This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b3781ec [GOBBLIN-1561]Improve error message when flow compilation
fails (#3412)
b3781ec is described below
commit b3781ec6025a3acc27bd65d05595326cc2ddd27d
Author: Zihan Li <[email protected]>
AuthorDate: Thu Oct 14 15:02:54 2021 -0700
[GOBBLIN-1561]Improve error message when flow compilation fails (#3412)
* [GOBBLIN-1561]Improve error message when flow compilation fails
* address comments
---
.../service/FlowConfigV2ResourceLocalHandler.java | 30 ++++++++++++++++++----
.../org/apache/gobblin/runtime/api/FlowSpec.java | 25 ++++++++++++++++--
.../service/modules/flow/MultiHopFlowCompiler.java | 15 ++++++-----
.../flowgraph/pathfinder/AbstractPathFinder.java | 2 +-
.../modules/flow/MultiHopFlowCompilerTest.java | 9 ++++---
5 files changed, 63 insertions(+), 18 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 3c64e47..60dfe7b 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -16,8 +16,12 @@
*/
package org.apache.gobblin.service;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.StringEscapeUtils;
import com.linkedin.data.template.StringMap;
@@ -89,16 +93,32 @@ public class FlowConfigV2ResourceLocalHandler extends
FlowConfigResourceLocalHan
} else if
(Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL,
new AddSpecResponse<>("false")).getValue().toString())) {
httpStatus = HttpStatus.S_201_CREATED;
} else {
- String message = "Flow was not compiled successfully.";
- if (!flowSpec.getCompilationErrors().isEmpty()) {
- message = message + " Compilation errors encountered: " +
flowSpec.getCompilationErrors();
- }
- return new CreateKVResponse<>(new
RestLiServiceException(HttpStatus.S_400_BAD_REQUEST, message));
+ throw new RestLiServiceException(HttpStatus.S_400_BAD_REQUEST,
getErrorMessage(flowSpec));
}
return new CreateKVResponse<>(new ComplexResourceKey<>(flowConfig.getId(),
flowStatusId), flowConfig, httpStatus);
}
+ private String getErrorMessage(FlowSpec flowSpec) {
+ StringBuilder message = new StringBuilder("Flow was not compiled
successfully.");
+ if (!flowSpec.getCompilationErrors().isEmpty()) {
+ message.append(" Compilation errors encountered (Sorted by relevance):
");
+ Object[] errors = flowSpec.getCompilationErrors().toArray();
+ Arrays.sort(errors, Comparator.comparingInt(c ->
((FlowSpec.CompilationError)c).errorPriority));
+ // Track messages already emitted so the same error is not printed multiple times.
+ Set<String> errorSet = new HashSet<>();
+ int errorId = 0;
+ for (Object er: errors) {
+ FlowSpec.CompilationError error = (FlowSpec.CompilationError)er;
+ if (!errorSet.contains(error.errorMessage)) {
+ message.append("\n").append(String.format("ERROR[%s]",
errorId)).append(error.errorMessage);
+ errorSet.add(error.errorMessage);
+ errorId++;
+ }
+ }
+ }
+ return message.toString();
+ }
/**
* Note: this method is only implemented for testing, normally partial
update would be called in
* GobblinServiceFlowConfigResourceHandler.partialUpdateFlowConfig
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index c66a531..37eb584 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -19,8 +19,9 @@ package org.apache.gobblin.runtime.api;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -91,7 +92,7 @@ public class FlowSpec implements Configurable, Spec {
final Optional<List<Spec>> childSpecs;
/** List of exceptions that occurred during compilation of this FlowSpec **/
- final Set<String> compilationErrors = new HashSet<>();
+ final List<CompilationError> compilationErrors = new ArrayList<>();
public static FlowSpec.Builder builder(URI flowSpecUri) {
return new FlowSpec.Builder(flowSpecUri);
@@ -124,6 +125,26 @@ public class FlowSpec implements Configurable, Spec {
throw new RuntimeException("Unable to create a FlowSpec URI: " + e, e);
}
}
+ public CompilationError getCompilationError(String src, String dst, String
errorMessage) {
+ return new CompilationError(src, dst, errorMessage);
+ }
+
+
+ public class CompilationError {
+ public int errorPriority;
+ public String errorMessage;
+ CompilationError(String src, String dst, String errorMessage) {
+ errorPriority = 0;
+ if (!src.equals(ConfigUtils.getString(getConfig(),
ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))){
+ errorPriority++;
+ }
+ if (!ConfigUtils.getStringList(getConfig(),
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY)
+ .containsAll(Arrays.asList(StringUtils.split(dst, ",")))){
+ errorPriority++;
+ }
+ this.errorMessage = errorMessage;
+ }
+ }
public String toShortString() {
return getUri().toString() + "/" + getVersion();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index cf99332..a55d656 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -192,14 +192,14 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
DataNode sourceNode = this.flowGraph.getNode(source);
if (sourceNode == null) {
flowSpec.getCompilationErrors()
- .add(String.format("Flowgraph does not have a node with id %s",
source));
+ .add(flowSpec.getCompilationError(source, destination,
String.format("Flowgraph does not have a node with id %s", source)));
return null;
}
List<String> destNodeIds = ConfigUtils.getStringList(flowSpec.getConfig(),
ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY);
List<DataNode> destNodes =
destNodeIds.stream().map(this.flowGraph::getNode).collect(Collectors.toList());
if (destNodes.contains(null)) {
flowSpec.getCompilationErrors()
- .add(String.format("Flowgraph does not have a node with id %s",
destNodeIds.get(destNodes.indexOf(null))));
+ .add(flowSpec.getCompilationError(source, destination,
String.format("Flowgraph does not have a node with id %s",
destNodeIds.get(destNodes.indexOf(null)))));
return null;
}
log.info(String.format("Compiling flow for source: %s and destination:
%s", source, destination));
@@ -218,12 +218,12 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
String message = String.format("Data movement is not authorized
for flow: %s, source: %s, destination: %s",
flowSpec.getUri().toString(), source, destination);
log.error(message);
- datasetFlowSpec.getCompilationErrors().add(message);
+
datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source,
destination, message));
return null;
}
} catch (Exception e) {
Instrumented.markMeter(flowCompilationFailedMeter);
-
datasetFlowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(e));
+
datasetFlowSpec.getCompilationErrors().add(datasetFlowSpec.getCompilationError(source,
destination, Throwables.getStackTraceAsString(e)));
return null;
}
}
@@ -240,14 +240,17 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
Instrumented.markMeter(flowCompilationFailedMeter);
String message = String.format("No path found from source: %s and
destination: %s", source, destination);
log.info(message);
- flowSpec.getCompilationErrors().add(message);
+
+ if
(!flowSpec.getCompilationErrors().stream().anyMatch(compilationError ->
compilationError.errorPriority == 0)) {
+
flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source,
destination, message));
+ }
return null;
}
} catch (PathFinder.PathFinderException | SpecNotFoundException |
JobTemplate.TemplateException | URISyntaxException |
ReflectiveOperationException e) {
Instrumented.markMeter(flowCompilationFailedMeter);
String message = String.format("Exception encountered while compiling
flow for source: %s and destination: %s, %s", source, destination,
Throwables.getStackTraceAsString(e));
log.error(message, e);
- flowSpec.getCompilationErrors().add(message);
+ flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(source,
destination, message));
return null;
} finally {
this.rwLock.readLock().unlock();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index 334b750..8c9fa8f 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements
PathFinder {
try {
flowEdge.getFlowTemplate().tryResolving(mergedConfig,
datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
} catch (JobTemplate.TemplateException | ConfigException |
SpecNotFoundException e) {
- this.flowSpec.getCompilationErrors().add("Error compiling edge "
+ flowEdge.toString() + ": " + e.toString());
+
this.flowSpec.getCompilationErrors().add(flowSpec.getCompilationError(flowEdge.getSrc(),
flowEdge.getDest(), "Error compiling edge " + flowEdge.toString() + ": " +
e.toString()));
continue;
}
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 831ca18..7e67549 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -647,8 +648,8 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
Assert.assertNull(dag);
- Assert.assertEquals(spec.getCompilationErrors().size(), 2);
- spec.getCompilationErrors().stream().anyMatch(s ->
s.contains(AzkabanProjectConfig.USER_TO_PROXY));
+ Assert.assertEquals(spec.getCompilationErrors().stream().map(c ->
c.errorMessage).collect(Collectors.toSet()).size(), 1);
+ spec.getCompilationErrors().stream().anyMatch(s ->
s.errorMessage.contains(AzkabanProjectConfig.USER_TO_PROXY));
}
@Test (dependsOnMethods = "testUnresolvedFlow")
@@ -659,7 +660,7 @@ public class MultiHopFlowCompilerTest {
Assert.assertEquals(dag, null);
Assert.assertEquals(spec.getCompilationErrors().size(), 1);
- spec.getCompilationErrors().stream().anyMatch(s -> s.contains("Flowgraph
does not have a node with id"));
+ spec.getCompilationErrors().stream().anyMatch(s ->
s.errorMessage.contains("Flowgraph does not have a node with id"));
}
@Test (dependsOnMethods = "testMissingSourceNodeError")
@@ -670,7 +671,7 @@ public class MultiHopFlowCompilerTest {
Assert.assertNull(dag);
Assert.assertEquals(spec.getCompilationErrors().size(), 1);
- spec.getCompilationErrors().stream().anyMatch(s -> s.contains("Flowgraph
does not have a node with id"));
+ spec.getCompilationErrors().stream().anyMatch(s ->
s.errorMessage.contains("Flowgraph does not have a node with id"));
}
@Test (dependsOnMethods = "testMissingDestinationNodeError")