This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 14ff4e5 [GOBBLIN-909] Return error message for unresolved
substitutions in explain query
14ff4e5 is described below
commit 14ff4e537810f48115f71c27210f0881a45fdbe6
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Oct 22 16:05:40 2019 -0700
[GOBBLIN-909] Return error message for unresolved substitutions in explain
query
Closes #2763 from jack-moseley/explain-unresolved
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../service/FlowConfigV2ResourceLocalHandler.java | 2 +-
.../org/apache/gobblin/runtime/api/FlowSpec.java | 6 ++++
.../flowgraph/pathfinder/AbstractPathFinder.java | 14 ++++++++-
.../scheduler/GobblinServiceJobScheduler.java | 14 ++++-----
.../service/modules/template/FlowTemplate.java | 14 ++++-----
.../modules/template/StaticFlowTemplate.java | 34 ++++++++++++----------
.../modules/flow/MultiHopFlowCompilerTest.java | 13 ++++++++-
.../FSFlowTemplateCatalogTest.java | 2 +-
gobblin-service/src/test/resources/flow/flow5.conf | 9 ++++++
10 files changed, 75 insertions(+), 34 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 20a0d72..cd5b21f 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -906,6 +906,7 @@ public class ConfigurationKeys {
public static final String DATASET_BASE_INPUT_PATH_KEY =
"gobblin.flow.dataset.baseInputPath";
public static final String DATASET_BASE_OUTPUT_PATH_KEY =
"gobblin.flow.dataset.baseOutputPath";
public static final String DATASET_COMBINE_KEY =
"gobblin.flow.dataset.combine";
+ public static final String WHITELISTED_EDGE_IDS =
"gobblin.flow.whitelistedEdgeIds";
/***
* Configuration properties related to TopologySpec Store
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 9fcfec1..c4a3e45 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -69,7 +69,7 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
StringMap props = flowConfig.getProperties();
AddSpecResponse<String> addSpecResponse =
responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
props.put("gobblin.flow.compiled",
- addSpecResponse != null ?
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
+ addSpecResponse != null && addSpecResponse.getValue() != null ?
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
flowConfig.setProperties(props);
//Return response with 200 status code, since no resource is actually
created.
httpStatus = HttpStatus.S_200_OK;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 59a5025..3708cc6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -38,6 +39,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import lombok.Data;
+import lombok.EqualsAndHashCode;
/**
@@ -49,6 +51,7 @@ import lombok.Data;
*/
@Alpha
@Data
+@EqualsAndHashCode(exclude={"compilationErrors"})
public class FlowSpec implements Configurable, Spec {
/** An URI identifying the flow. */
final URI uri;
@@ -74,6 +77,9 @@ public class FlowSpec implements Configurable, Spec {
// Note that a FlowSpec can be materialized into multiple FlowSpec or
JobSpec hierarchy
final Optional<List<Spec>> childSpecs;
+ /** Set of error messages (stringified exceptions) collected during compilation of this FlowSpec **/
+ final Set<String> compilationErrors = new HashSet<>();
+
public static FlowSpec.Builder builder(URI flowSpecUri) {
return new FlowSpec.Builder(flowSpecUri);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 880532f..674c187 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.tuple.Pair;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
import com.typesafe.config.ConfigValue;
import com.typesafe.config.ConfigValueFactory;
@@ -170,7 +171,11 @@ public abstract class AbstractPathFinder implements
PathFinder {
List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor
currentDatasetDescriptor,
DatasetDescriptor destDatasetDescriptor) {
List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+ List<String> edgeIds = ConfigUtils.getStringList(this.flowConfig,
ConfigurationKeys.WHITELISTED_EDGE_IDS);
for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+ if (!edgeIds.isEmpty() && !edgeIds.contains(flowEdge.getId())) {
+ continue;
+ }
try {
DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
//Base condition: Skip this FLowEdge, if it is inactive or if the
destination of this edge is inactive.
@@ -183,11 +188,18 @@ public abstract class AbstractPathFinder implements
PathFinder {
for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
Config mergedConfig = getMergedConfig(flowEdge);
List<Pair<DatasetDescriptor, DatasetDescriptor>>
datasetDescriptorPairs =
-
flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+ flowEdge.getFlowTemplate().getDatasetDescriptors(mergedConfig,
false);
for (Pair<DatasetDescriptor, DatasetDescriptor>
datasetDescriptorPair : datasetDescriptorPairs) {
DatasetDescriptor inputDatasetDescriptor =
datasetDescriptorPair.getLeft();
DatasetDescriptor outputDatasetDescriptor =
datasetDescriptorPair.getRight();
+ try {
+ flowEdge.getFlowTemplate().tryResolving(mergedConfig,
datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
+ } catch (JobTemplate.TemplateException | ConfigException |
SpecNotFoundException e) {
+ this.flowSpec.getCompilationErrors().add(e.toString());
+ continue;
+ }
+
if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
DatasetDescriptor edgeOutputDescriptor =
makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge,
currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index baac7d6..798f7a3 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.scheduler;
import java.io.IOException;
import java.net.URI;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -282,7 +283,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
}
boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(),
ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
- String compiledFlow = null;
+ String response = null;
if (!isExplain) {
this.scheduledFlowSpecs.put(addedSpec.getUri().toString(),
addedSpec);
@@ -299,14 +300,11 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
}
} else {
//Return a compiled flow.
- try {
- this.orchestrator.getSpecCompiler().awaitHealthy();
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
Dag<JobExecutionPlan> dag =
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
if (dag != null && !dag.isEmpty()) {
- compiledFlow = dag.toString();
+ response = dag.toString();
+ } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+ response =
Arrays.toString(flowSpec.getCompilationErrors().toArray());
}
_log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN
request", this.serviceName, addedSpec);
@@ -315,7 +313,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(),
false);
}
}
- return new AddSpecResponse(compiledFlow);
+ return new AddSpecResponse(response);
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName,
addedSpec, je);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index a8350fd..bd78e48 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -51,21 +51,21 @@ public interface FlowTemplate extends Spec {
/**
* @param userConfig a list of user customized attributes.
- * @return list of input/output {@link DatasetDescriptor}s that fully
resolve the {@link FlowTemplate} using the
- * provided userConfig.
+ * @param resolvable if true, only return descriptors that resolve the
{@link FlowTemplate}
+ * @return list of input/output {@link DatasetDescriptor}s corresponding to
the provided userConfig.
*/
- List<Pair<DatasetDescriptor, DatasetDescriptor>>
getResolvingDatasetDescriptors(Config userConfig)
+ List<Pair<DatasetDescriptor, DatasetDescriptor>>
getDatasetDescriptors(Config userConfig, boolean resolvable)
throws IOException, ReflectiveOperationException, SpecNotFoundException,
JobTemplate.TemplateException;
/**
- * Checks if the {@link FlowTemplate} is resolvable using the provided
{@link Config} object. A {@link FlowTemplate}
- * is resolvable only if each of the {@link JobTemplate}s in the flow is
resolvable
+ * Try to resolve the {@link FlowTemplate} using the provided {@link Config}
object. A {@link FlowTemplate}
+ * is resolvable only if each of the {@link JobTemplate}s in the flow is
resolvable. Throws an exception if the flow is
+ * not resolvable.
* @param userConfig User supplied Config
* @param inputDescriptor input {@link DatasetDescriptor}
* @param outputDescriptor output {@link DatasetDescriptor}
- * @return true if the {@link FlowTemplate} is resolvable
*/
- boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor,
DatasetDescriptor outputDescriptor)
+ void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor,
DatasetDescriptor outputDescriptor)
throws SpecNotFoundException, JobTemplate.TemplateException;
/**
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 53c8e37..e833897 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -102,12 +102,13 @@ public class StaticFlowTemplate implements FlowTemplate {
/**
* Generate the input/output dataset descriptors for the {@link
FlowTemplate}.
- * @param userConfig
- * @return a List of Input/Output DatasetDescriptors that resolve this
{@link FlowTemplate}.
+ * @param userConfig User supplied Config
+ * @param resolvable Whether to return only resolvable dataset descriptors
+ * @return a List of Input/Output DatasetDescriptors that correspond to this
{@link FlowTemplate}. If resolvable is true,
+ * only return descriptors that fully resolve it.
*/
@Override
- public List<Pair<DatasetDescriptor, DatasetDescriptor>>
getResolvingDatasetDescriptors(Config userConfig)
- throws IOException, SpecNotFoundException, JobTemplate.TemplateException
{
+ public List<Pair<DatasetDescriptor, DatasetDescriptor>>
getDatasetDescriptors(Config userConfig, boolean resolvable) throws IOException
{
Config config =
this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
if
(!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX)
@@ -127,7 +128,14 @@ public class StaticFlowTemplate implements FlowTemplate {
Config outputDescriptorConfig = config.getConfig(outputPrefix);
DatasetDescriptor outputDescriptor =
getDatasetDescriptor(outputDescriptorConfig);
- if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) {
+ if (resolvable) {
+ try {
+ tryResolving(userConfig, inputDescriptor, outputDescriptor);
+ result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+ } catch (JobTemplate.TemplateException | ConfigException |
SpecNotFoundException e) {
+ // Dataset descriptor cannot be resolved so don't add it to result
+ }
+ } else {
result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
}
} catch (ReflectiveOperationException e) {
@@ -159,13 +167,14 @@ public class StaticFlowTemplate implements FlowTemplate {
}
/**
- * Checks if the {@link FlowTemplate} is resolvable using the provided
{@link Config} object. A {@link FlowTemplate}
- * is resolvable only if each of the {@link JobTemplate}s in the flow is
resolvable
+ * Try to resolve the {@link FlowTemplate} using the provided {@link Config}
object. A {@link FlowTemplate}
+ * is resolvable only if each of the {@link JobTemplate}s in the flow is
resolvable. Throws an exception if the flow is
+ * not resolvable.
* @param userConfig User supplied Config
- * @return true if the {@link FlowTemplate} is resolvable
*/
@Override
- public boolean isResolvable(Config userConfig, DatasetDescriptor
inputDescriptor, DatasetDescriptor outputDescriptor) {
+ public void tryResolving(Config userConfig, DatasetDescriptor
inputDescriptor, DatasetDescriptor outputDescriptor)
+ throws SpecNotFoundException, JobTemplate.TemplateException {
Config inputDescriptorConfig =
inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
Config outputDescriptorConfig =
outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
userConfig =
userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
@@ -173,13 +182,8 @@ public class StaticFlowTemplate implements FlowTemplate {
JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
for (JobTemplate template: this.jobTemplates) {
- try {
-
this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
- } catch (JobTemplate.TemplateException | ConfigException |
SpecNotFoundException exc) {
- return false;
- }
+
this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
}
- return true;
}
@Override
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 176c2d6..644b75a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -640,7 +640,18 @@ public class MultiHopFlowCompilerTest {
Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
}
- @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
+ @Test (dependsOnMethods = "testCompileCombinedDatasetFlow")
+ public void testUnresolvedFlow() throws Exception {
+ FlowSpec spec = createFlowSpec("flow/flow5.conf", "HDFS-1", "HDFS-3",
false, false);
+
+ Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+ Assert.assertTrue(dag.isEmpty());
+ Assert.assertEquals(spec.getCompilationErrors().size(), 1);
+
Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains("user.to.proxy"));
+ }
+
+ @Test (dependsOnMethods = "testUnresolvedFlow")
public void testGitFlowGraphMonitorService()
throws IOException, GitAPIException, URISyntaxException,
InterruptedException {
File remoteDir = new File(TESTDIR + "/remote");
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index f8cdf79..9307086 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -77,7 +77,7 @@ public class FSFlowTemplateCatalogTest {
Config flowConfig = ConfigFactory.empty().withValue("team.name",
ConfigValueFactory.fromAnyRef("test-team"))
.withValue("dataset.name",
ConfigValueFactory.fromAnyRef("test-dataset"));
- List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors =
flowTemplate.getResolvingDatasetDescriptors(flowConfig);
+ List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors =
flowTemplate.getDatasetDescriptors(flowConfig, true);
Assert.assertTrue(inputOutputDescriptors.size() == 2);
List<String> dirs = Lists.newArrayList("inbound", "outbound");
for (int i = 0; i < 2; i++) {
diff --git a/gobblin-service/src/test/resources/flow/flow5.conf
b/gobblin-service/src/test/resources/flow/flow5.conf
new file mode 100644
index 0000000..1131b47
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow5.conf
@@ -0,0 +1,9 @@
+// Leaving user.to.proxy unresolved
+
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/input/dataset1
+
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.output.dataset.descriptor.path=/data/output/dataset1
\ No newline at end of file