This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b3781ec  [GOBBLIN-1561]Improve error message when flow compilation 
fails (#3412)
b3781ec is described below

commit b3781ec6025a3acc27bd65d05595326cc2ddd27d
Author: Zihan Li <[email protected]>
AuthorDate: Thu Oct 14 15:02:54 2021 -0700

    [GOBBLIN-1561]Improve error message when flow compilation fails (#3412)
    
    * [GOBBLIN-1561]Improve error message when flow compilation fails
    
    * address comments
---
 .../service/FlowConfigV2ResourceLocalHandler.java  | 30 ++++++++++++++++++----
 .../org/apache/gobblin/runtime/api/FlowSpec.java   | 25 ++++++++++++++++--
 .../service/modules/flow/MultiHopFlowCompiler.java | 15 ++++++-----
 .../flowgraph/pathfinder/AbstractPathFinder.java   |  2 +-
 .../modules/flow/MultiHopFlowCompilerTest.java     |  9 ++++---
 5 files changed, 63 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 3c64e47..60dfe7b 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,8 +16,12 @@
  */
 package org.apache.gobblin.service;
 
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map;
 
+import java.util.Set;
 import org.apache.commons.lang3.StringEscapeUtils;
 
 import com.linkedin.data.template.StringMap;
@@ -89,16 +93,32 @@ public class FlowConfigV2ResourceLocalHandler extends 
FlowConfigResourceLocalHan
     } else if 
(Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL,
 new AddSpecResponse<>("false")).getValue().toString())) {
       httpStatus = HttpStatus.S_201_CREATED;
     } else {
-      String message = "Flow was not compiled successfully.";
-      if (!flowSpec.getCompilationErrors().isEmpty()) {
-        message = message + " Compilation errors encountered: " + 
flowSpec.getCompilationErrors();
-      }
-      return new CreateKVResponse<>(new 
RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, message));
+      throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, 
getErrorMessage(flowSpec));
     }
 
     return new CreateKVResponse<>(new ComplexResourceKey<>(flowConfig.getId(), 
flowStatusId), flowConfig, httpStatus);
   }
 
+  private String getErrorMessage(FlowSpec flowSpec) {
+    StringBuilder message = new StringBuilder("Flow was not compiled 
successfully.");
+    if (!flowSpec.getCompilationErrors().isEmpty()) {
+      message.append(" Compilation errors encountered (Sorted by relevance): 
");
+      Object[] errors = flowSpec.getCompilationErrors().toArray();
+      Arrays.sort(errors, Comparator.comparingInt(c -> 
((FlowSpec.CompilationError)c).errorPriority));
+      // Track messages already emitted so the same error is not printed multiple times.
+      Set<String> errorSet = new HashSet<>();
+      int errorId = 0;
+      for (Object er: errors) {
+        FlowSpec.CompilationError error = (FlowSpec.CompilationError)er;
+        if (!errorSet.contains(error.errorMessage)) {
+          message.append("\n").append(String.format("ERROR[%s]", 
errorId)).append(error.errorMessage);
+          errorSet.add(error.errorMessage);
+          errorId++;
+        }
+      }
+    }
+    return message.toString();
+  }
   /**
    * Note: this method is only implemented for testing, normally partial 
update would be called in
    * GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index c66a531..37eb584 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -19,8 +19,9 @@ package org.apache.gobblin.runtime.api;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -91,7 +92,7 @@ public class FlowSpec implements Configurable, Spec {
   final Optional<List<Spec>> childSpecs;
 
   /** List of exceptions that occurred during compilation of this FlowSpec **/
-  final Set<String> compilationErrors = new HashSet<>();
+  final List<CompilationError> compilationErrors = new ArrayList<>();
 
   public static FlowSpec.Builder builder(URI flowSpecUri) {
     return new FlowSpec.Builder(flowSpecUri);
@@ -124,6 +125,26 @@ public class FlowSpec implements Configurable, Spec {
       throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
     }
   }
+  public CompilationError getCompilationError(String src, String dst, String 
errorMessage) {
+    return new CompilationError(src, dst, errorMessage);
+  }
+
+
+  public class CompilationError {
+    public int errorPriority;
+    public String errorMessage;
+    CompilationError(String src, String dst, String errorMessage) {
+      errorPriority = 0;
+      if (!src.equals(ConfigUtils.getString(getConfig(), 
ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
+        errorPriority++;
+      }
+      if (!ConfigUtils.getStringList(getConfig(), 
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
+          .containsAll(Arrays.asList(StringUtils.split(dst, ",")))){
+        errorPriority++;
+      }
+      this.errorMessage = errorMessage;
+    }
+  }
 
   public String toShortString() {
     return getUri().toString() + "/" + getVersion();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index cf99332..a55d656 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -192,14 +192,14 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
     DataNode sourceNode = this.flowGraph.getNode(source);
     if (sourceNode == null) {
       flowSpec.getCompilationErrors()
-          .add(String.format("Flowgraph does not have a node with id %s", 
source));
+          .add(flowSpec.getCompilationError(source, destination, 
String.format("Flowgraph does not have a node with id %s", source)));
       return null;
     }
     List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(), 
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
     List<DataNode> destNodes = 
destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
     if (destNodes.contains(null)) {
       flowSpec.getCompilationErrors()
-          .add(String.format("Flowgraph does not have a node with id %s", 
destNodeIds.get(destNodes.indexOf(null))));
+          .add(flowSpec.getCompilationError(source, destination, 
String.format("Flowgraph does not have a node with id %s", 
destNodeIds.get(destNodes.indexOf(null)))));
       return null;
     }
     log.info(String.format("Compiling flow for source: %s and destination: 
%s", source, destination));
@@ -218,12 +218,12 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
               String message = String.format("Data movement is not authorized 
for flow: %s, source: %s, destination: %s",
                   flowSpec.getUri().toString(), source, destination);
               log.error(message);
-              datasetFlowSpec.getCompilationErrors().add(message);
+              
datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source,
 destination, message));
               return null;
             }
           } catch (Exception e) {
             Instrumented.markMeter(flowCompilationFailedMeter);
-            
datasetFlowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(e));
+            
datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source,
 destination, Throwables.getStackTraceAsString(e)));
             return null;
           }
         }
@@ -240,14 +240,17 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
         Instrumented.markMeter(flowCompilationFailedMeter);
         String message = String.format("No path found from source: %s and 
destination: %s", source, destination);
         log.info(message);
-        flowSpec.getCompilationErrors().add(message);
+
+        if 
(!flowSpec.getCompilationErrors().stream().anyMatch(compilationError -> 
compilationError.errorPriority == 0)) {
+          
flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, 
destination, message));
+        }
         return null;
       }
     } catch (PathFinder.PathFinderException | SpecNotFoundException | 
JobTemplate.TemplateException | URISyntaxException | 
ReflectiveOperationException e) {
       Instrumented.markMeter(flowCompilationFailedMeter);
       String message = String.format("Exception encountered while compiling 
flow for source: %s and destination: %s, %s", source, destination, 
Throwables.getStackTraceAsString(e));
       log.error(message, e);
-      flowSpec.getCompilationErrors().add(message);
+      flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source, 
destination, message));
       return null;
     } finally {
       this.rwLock.readLock().unlock();
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 334b750..8c9fa8f 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements 
PathFinder {
             try {
               flowEdge.getFlowTemplate().tryResolving(mergedConfig, 
datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
             } catch (JobTemplate.TemplateException | ConfigException | 
SpecNotFoundException e) {
-              this.flowSpec.getCompilationErrors().add("Error compiling edge " 
+ flowEdge.toString() + ": " + e.toString());
+              
this.flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(flowEdge.getSrc(),
 flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " + 
e.toString()));
               continue;
             }
 
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 831ca18..7e67549 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -647,8 +648,8 @@ public class MultiHopFlowCompilerTest {
     Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
 
     Assert.assertNull(dag);
-    Assert.assertEquals(spec.getCompilationErrors().size(), 2);
-    spec.getCompilationErrors().stream().anyMatch(s -> 
s.contains(AzkabanProjectConfig.USER_TO_PROXY));
+    Assert.assertEquals(spec.getCompilationErrors().stream().map(c -> 
c.errorMessage).collect(Collectors.toSet()).size(), 1);
+    spec.getCompilationErrors().stream().anyMatch(s -> 
s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY));
   }
 
   @Test (dependsOnMethods = "testUnresolvedFlow")
@@ -659,7 +660,7 @@ public class MultiHopFlowCompilerTest {
 
     Assert.assertEquals(dag, null);
     Assert.assertEquals(spec.getCompilationErrors().size(), 1);
-    spec.getCompilationErrors().stream().anyMatch(s -> s.contains("Flowgraph 
does not have a node with id"));
+    spec.getCompilationErrors().stream().anyMatch(s -> 
s.errorMessage.contains("Flowgraph does not have a node with id"));
   }
 
   @Test (dependsOnMethods = "testMissingSourceNodeError")
@@ -670,7 +671,7 @@ public class MultiHopFlowCompilerTest {
 
     Assert.assertNull(dag);
     Assert.assertEquals(spec.getCompilationErrors().size(), 1);
-    spec.getCompilationErrors().stream().anyMatch(s -> s.contains("Flowgraph 
does not have a node with id"));
+    spec.getCompilationErrors().stream().anyMatch(s -> 
s.errorMessage.contains("Flowgraph does not have a node with id"));
   }
 
   @Test (dependsOnMethods = "testMissingDestinationNodeError")

Reply via email to