This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 14ff4e5  [GOBBLIN-909] Return error message for unresolved substitutions in explain query
14ff4e5 is described below

commit 14ff4e537810f48115f71c27210f0881a45fdbe6
Author: Jack Moseley <[email protected]>
AuthorDate: Tue Oct 22 16:05:40 2019 -0700

    [GOBBLIN-909] Return error message for unresolved substitutions in explain query
    
    Closes #2763 from jack-moseley/explain-unresolved
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../service/FlowConfigV2ResourceLocalHandler.java  |  2 +-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  6 ++++
 .../flowgraph/pathfinder/AbstractPathFinder.java   | 14 ++++++++-
 .../scheduler/GobblinServiceJobScheduler.java      | 14 ++++-----
 .../service/modules/template/FlowTemplate.java     | 14 ++++-----
 .../modules/template/StaticFlowTemplate.java       | 34 ++++++++++++----------
 .../modules/flow/MultiHopFlowCompilerTest.java     | 13 ++++++++-
 .../FSFlowTemplateCatalogTest.java                 |  2 +-
 gobblin-service/src/test/resources/flow/flow5.conf |  9 ++++++
 10 files changed, 75 insertions(+), 34 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 20a0d72..cd5b21f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -906,6 +906,7 @@ public class ConfigurationKeys {
   public static final String DATASET_BASE_INPUT_PATH_KEY = 
"gobblin.flow.dataset.baseInputPath";
   public static final String DATASET_BASE_OUTPUT_PATH_KEY = 
"gobblin.flow.dataset.baseOutputPath";
   public static final String DATASET_COMBINE_KEY = 
"gobblin.flow.dataset.combine";
+  public static final String WHITELISTED_EDGE_IDS = 
"gobblin.flow.whitelistedEdgeIds";
 
   /***
    * Configuration properties related to TopologySpec Store
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 9fcfec1..c4a3e45 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -69,7 +69,7 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
       StringMap props = flowConfig.getProperties();
       AddSpecResponse<String> addSpecResponse = 
responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
       props.put("gobblin.flow.compiled",
-          addSpecResponse != null ? 
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
+          addSpecResponse != null && addSpecResponse.getValue() != null ? 
StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
       flowConfig.setProperties(props);
       //Return response with 200 status code, since no resource is actually 
created.
       httpStatus = HttpStatus.S_200_OK;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 59a5025..3708cc6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.runtime.api;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -38,6 +39,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 
 /**
@@ -49,6 +51,7 @@ import lombok.Data;
  */
 @Alpha
 @Data
+@EqualsAndHashCode(exclude={"compilationErrors"})
 public class FlowSpec implements Configurable, Spec {
   /** An URI identifying the flow. */
   final URI uri;
@@ -74,6 +77,9 @@ public class FlowSpec implements Configurable, Spec {
   // Note that a FlowSpec can be materialized into multiple FlowSpec or 
JobSpec hierarchy
   final Optional<List<Spec>> childSpecs;
 
+  /** List of exceptions that occurred during compilation of this FlowSpec **/
+  final Set<String> compilationErrors = new HashSet<>();
+
   public static FlowSpec.Builder builder(URI flowSpecUri) {
     return new FlowSpec.Builder(flowSpecUri);
   }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 880532f..674c187 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
 import com.typesafe.config.ConfigValue;
 import com.typesafe.config.ConfigValueFactory;
 
@@ -170,7 +171,11 @@ public abstract class AbstractPathFinder implements 
PathFinder {
   List<FlowEdgeContext> getNextEdges(DataNode dataNode, DatasetDescriptor 
currentDatasetDescriptor,
       DatasetDescriptor destDatasetDescriptor) {
     List<FlowEdgeContext> prioritizedEdgeList = new LinkedList<>();
+    List<String> edgeIds = ConfigUtils.getStringList(this.flowConfig, 
ConfigurationKeys.WHITELISTED_EDGE_IDS);
     for (FlowEdge flowEdge : this.flowGraph.getEdges(dataNode)) {
+      if (!edgeIds.isEmpty() && !edgeIds.contains(flowEdge.getId())) {
+        continue;
+      }
       try {
         DataNode edgeDestination = this.flowGraph.getNode(flowEdge.getDest());
         //Base condition: Skip this FLowEdge, if it is inactive or if the 
destination of this edge is inactive.
@@ -183,11 +188,18 @@ public abstract class AbstractPathFinder implements 
PathFinder {
         for (SpecExecutor specExecutor : flowEdge.getExecutors()) {
           Config mergedConfig = getMergedConfig(flowEdge);
           List<Pair<DatasetDescriptor, DatasetDescriptor>> 
datasetDescriptorPairs =
-              
flowEdge.getFlowTemplate().getResolvingDatasetDescriptors(mergedConfig);
+              flowEdge.getFlowTemplate().getDatasetDescriptors(mergedConfig, 
false);
           for (Pair<DatasetDescriptor, DatasetDescriptor> 
datasetDescriptorPair : datasetDescriptorPairs) {
             DatasetDescriptor inputDatasetDescriptor = 
datasetDescriptorPair.getLeft();
             DatasetDescriptor outputDatasetDescriptor = 
datasetDescriptorPair.getRight();
 
+            try {
+              flowEdge.getFlowTemplate().tryResolving(mergedConfig, 
datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
+            } catch (JobTemplate.TemplateException | ConfigException | 
SpecNotFoundException e) {
+              this.flowSpec.getCompilationErrors().add(e.toString());
+              continue;
+            }
+
             if (inputDatasetDescriptor.contains(currentDatasetDescriptor)) {
               DatasetDescriptor edgeOutputDescriptor = 
makeOutputDescriptorSpecific(currentDatasetDescriptor, outputDatasetDescriptor);
               FlowEdgeContext flowEdgeContext = new FlowEdgeContext(flowEdge, 
currentDatasetDescriptor, edgeOutputDescriptor, mergedConfig,
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index baac7d6..798f7a3 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.scheduler;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
@@ -282,7 +283,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
               
flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
         }
         boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), 
ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
-        String compiledFlow = null;
+        String response = null;
         if (!isExplain) {
           this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), 
addedSpec);
 
@@ -299,14 +300,11 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
           }
         } else {
           //Return a compiled flow.
-          try {
-            this.orchestrator.getSpecCompiler().awaitHealthy();
-          } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-          }
           Dag<JobExecutionPlan> dag = 
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
           if (dag != null && !dag.isEmpty()) {
-            compiledFlow = dag.toString();
+            response = dag.toString();
+          } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+            response = 
Arrays.toString(flowSpec.getCompilationErrors().toArray());
           }
           _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN 
request", this.serviceName, addedSpec);
 
@@ -315,7 +313,7 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
             this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), 
false);
           }
         }
-        return new AddSpecResponse(compiledFlow);
+        return new AddSpecResponse(response);
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, 
addedSpec, je);
       }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
index a8350fd..bd78e48 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/FlowTemplate.java
@@ -51,21 +51,21 @@ public interface FlowTemplate extends Spec {
 
   /**
    * @param userConfig a list of user customized attributes.
-   * @return list of input/output {@link DatasetDescriptor}s that fully 
resolve the {@link FlowTemplate} using the
-   * provided userConfig.
+   * @param resolvable if true, only return descriptors that resolve the 
{@link FlowTemplate}
+   * @return list of input/output {@link DatasetDescriptor}s corresponding to 
the provied userConfig.
    */
-  List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getResolvingDatasetDescriptors(Config userConfig)
+  List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getDatasetDescriptors(Config userConfig, boolean resolvable)
       throws IOException, ReflectiveOperationException, SpecNotFoundException, 
JobTemplate.TemplateException;
 
   /**
-   * Checks if the {@link FlowTemplate} is resolvable using the provided 
{@link Config} object. A {@link FlowTemplate}
-   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable
+   * Try to resolve the {@link FlowTemplate} using the provided {@link Config} 
object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable. Throws an exception if the flow is
+   * not resolvable.
    * @param userConfig User supplied Config
    * @param inputDescriptor input {@link DatasetDescriptor}
    * @param outputDescriptor output {@link DatasetDescriptor}
-   * @return true if the {@link FlowTemplate} is resolvable
    */
-  boolean isResolvable(Config userConfig, DatasetDescriptor inputDescriptor, 
DatasetDescriptor outputDescriptor)
+  void tryResolving(Config userConfig, DatasetDescriptor inputDescriptor, 
DatasetDescriptor outputDescriptor)
       throws SpecNotFoundException, JobTemplate.TemplateException;
 
   /**
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 53c8e37..e833897 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -102,12 +102,13 @@ public class StaticFlowTemplate implements FlowTemplate {
 
   /**
    * Generate the input/output dataset descriptors for the {@link 
FlowTemplate}.
-   * @param userConfig
-   * @return a List of Input/Output DatasetDescriptors that resolve this 
{@link FlowTemplate}.
+   * @param userConfig User supplied Config
+   * @param resolvable Whether to return only resolvable dataset descriptors
+   * @return a List of Input/Output DatasetDescriptors that correspond to this 
{@link FlowTemplate}. If resolvable is true,
+   * only return descriptors that fully resolve it.
    */
   @Override
-  public List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getResolvingDatasetDescriptors(Config userConfig)
-      throws IOException, SpecNotFoundException, JobTemplate.TemplateException 
{
+  public List<Pair<DatasetDescriptor, DatasetDescriptor>> 
getDatasetDescriptors(Config userConfig, boolean resolvable) throws IOException 
{
     Config config = 
this.getResolvedFlowConfig(userConfig).resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
     if 
(!config.hasPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX)
@@ -127,7 +128,14 @@ public class StaticFlowTemplate implements FlowTemplate {
         Config outputDescriptorConfig = config.getConfig(outputPrefix);
         DatasetDescriptor outputDescriptor = 
getDatasetDescriptor(outputDescriptorConfig);
 
-        if (isResolvable(userConfig, inputDescriptor, outputDescriptor)) {
+        if (resolvable) {
+          try {
+            tryResolving(userConfig, inputDescriptor, outputDescriptor);
+            result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
+          } catch (JobTemplate.TemplateException | ConfigException | 
SpecNotFoundException e) {
+            // Dataset descriptor cannot be resolved so don't add it to result
+          }
+        } else {
           result.add(ImmutablePair.of(inputDescriptor, outputDescriptor));
         }
       } catch (ReflectiveOperationException e) {
@@ -159,13 +167,14 @@ public class StaticFlowTemplate implements FlowTemplate {
   }
 
   /**
-   * Checks if the {@link FlowTemplate} is resolvable using the provided 
{@link Config} object. A {@link FlowTemplate}
-   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable
+   * Try to resolve the {@link FlowTemplate} using the provided {@link Config} 
object. A {@link FlowTemplate}
+   * is resolvable only if each of the {@link JobTemplate}s in the flow is 
resolvable. Throws an exception if the flow is
+   * not resolvable.
    * @param userConfig User supplied Config
-   * @return true if the {@link FlowTemplate} is resolvable
    */
   @Override
-  public boolean isResolvable(Config userConfig, DatasetDescriptor 
inputDescriptor, DatasetDescriptor outputDescriptor) {
+  public void tryResolving(Config userConfig, DatasetDescriptor 
inputDescriptor, DatasetDescriptor outputDescriptor)
+      throws SpecNotFoundException, JobTemplate.TemplateException {
     Config inputDescriptorConfig = 
inputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_INPUT_DATASET_DESCRIPTOR_PREFIX);
     Config outputDescriptorConfig = 
outputDescriptor.getRawConfig().atPath(DatasetDescriptorConfigKeys.FLOW_EDGE_OUTPUT_DATASET_DESCRIPTOR_PREFIX);
     userConfig = 
userConfig.withFallback(inputDescriptorConfig).withFallback(outputDescriptorConfig);
@@ -173,13 +182,8 @@ public class StaticFlowTemplate implements FlowTemplate {
     JobSpec.Builder jobSpecBuilder = JobSpec.builder().withConfig(userConfig);
 
     for (JobTemplate template: this.jobTemplates) {
-      try {
-        
this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
-      } catch (JobTemplate.TemplateException | ConfigException | 
SpecNotFoundException exc) {
-        return false;
-      }
+      
this.jobSpecResolver.resolveJobSpec(jobSpecBuilder.withTemplate(template).build());
     }
-    return true;
   }
 
   @Override
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 176c2d6..644b75a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -640,7 +640,18 @@ public class MultiHopFlowCompilerTest {
     
Assert.assertTrue(jobConfig2.getString(ConfigurableGlobDatasetFinder.DATASET_FINDER_PATTERN_KEY).endsWith("{dataset0,dataset1,dataset2}"));
   }
 
-  @Test (dependsOnMethods = "testCompileMultiDatasetFlow")
+  @Test (dependsOnMethods = "testCompileCombinedDatasetFlow")
+  public void testUnresolvedFlow() throws Exception {
+    FlowSpec spec = createFlowSpec("flow/flow5.conf", "HDFS-1", "HDFS-3", 
false, false);
+
+    Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
+
+    Assert.assertTrue(dag.isEmpty());
+    Assert.assertEquals(spec.getCompilationErrors().size(), 1);
+    
Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains("user.to.proxy"));
+  }
+
+  @Test (dependsOnMethods = "testUnresolvedFlow")
   public void testGitFlowGraphMonitorService()
       throws IOException, GitAPIException, URISyntaxException, 
InterruptedException {
     File remoteDir = new File(TESTDIR + "/remote");
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
index f8cdf79..9307086 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/template_catalog/FSFlowTemplateCatalogTest.java
@@ -77,7 +77,7 @@ public class FSFlowTemplateCatalogTest {
     Config flowConfig = ConfigFactory.empty().withValue("team.name", 
ConfigValueFactory.fromAnyRef("test-team"))
         .withValue("dataset.name", 
ConfigValueFactory.fromAnyRef("test-dataset"));
 
-    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = 
flowTemplate.getResolvingDatasetDescriptors(flowConfig);
+    List<Pair<DatasetDescriptor, DatasetDescriptor>> inputOutputDescriptors = 
flowTemplate.getDatasetDescriptors(flowConfig, true);
     Assert.assertTrue(inputOutputDescriptors.size() == 2);
     List<String> dirs = Lists.newArrayList("inbound", "outbound");
     for (int i = 0; i < 2; i++) {
diff --git a/gobblin-service/src/test/resources/flow/flow5.conf b/gobblin-service/src/test/resources/flow/flow5.conf
new file mode 100644
index 0000000..1131b47
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow5.conf
@@ -0,0 +1,9 @@
+// Leaving user.to.proxy unresolved
+
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/input/dataset1
+
+gobblin.flow.output.dataset.descriptor.class=${gobblin.flow.input.dataset.descriptor.class}
+gobblin.flow.output.dataset.descriptor.platform=${gobblin.flow.input.dataset.descriptor.platform}
+gobblin.flow.output.dataset.descriptor.path=/data/output/dataset1
\ No newline at end of file

Reply via email to