This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 381b659 [GOBBLIN-1453] Improve error reporting on failed flow
compilations and fix bugs wher…
381b659 is described below
commit 381b659d3da7a241d6f038696396e6bba690120e
Author: William Lo <[email protected]>
AuthorDate: Fri Jun 4 15:46:57 2021 -0700
[GOBBLIN-1453] Improve error reporting on failed flow compilations and fix
bugs wher…
Closes #3291 from Will-Lo/modify-flow-compilation-error-reporting
---
.../org/apache/gobblin/service/FlowConfigTest.java | 10 ++++++-
.../apache/gobblin/service/FlowConfigV2Test.java | 10 ++++++-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 19 ++++---------
.../runtime/spec_catalog/FlowCatalogTest.java | 24 ++++++++++++++--
.../service/modules/flow/MultiHopFlowCompiler.java | 33 ++++++++++++++--------
.../flowgraph/pathfinder/AbstractPathFinder.java | 2 +-
.../scheduler/GobblinServiceJobScheduler.java | 6 ++--
.../service/modules/core/GitConfigMonitorTest.java | 14 +++++++++
.../modules/flow/MultiHopFlowCompilerTest.java | 9 +++---
.../modules/orchestration/OrchestratorTest.java | 13 +++++++++
.../scheduler/GobblinServiceJobSchedulerTest.java | 25 ++++++++++++----
11 files changed, 118 insertions(+), 47 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 2da8bdf..8876725 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -50,6 +52,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
@Test(groups = { "gobblin.service" }, singleThreaded = true)
public class FlowConfigTest {
@@ -78,7 +83,10 @@ public class FlowConfigTest {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+ final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
flowCatalog.awaitRunning();
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 9f645cc..0710a3e 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -23,6 +23,8 @@ import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.mortbay.jetty.HttpStatus;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -56,6 +58,9 @@ import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
@Test(groups = { "gobblin.service" }, singleThreaded = true)
public class FlowConfigV2Test {
@@ -93,7 +98,10 @@ public class FlowConfigV2Test {
Config config = configBuilder.build();
final FlowCatalog flowCatalog = new FlowCatalog(config);
-
+ final SpecCatalogListener mockListener = mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
flowCatalog.startAsync();
flowCatalog.awaitRunning();
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index be7b205..147a86b 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.runtime.spec_catalog;
-import com.google.common.base.Throwables;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -29,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +40,6 @@ import com.typesafe.config.ConfigException;
import javax.annotation.Nonnull;
import lombok.Getter;
-
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -341,16 +338,9 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener,
AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
- // If flow fails callbacks, need to prevent adding the flow to the
catalog
- if (!response.getValue().getFailures().isEmpty()) {
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry: response.getValue().getFailures().entrySet()) {
-
flowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(entry.getValue().getError()));
- responseMap.put(entry.getKey().getName(), new
AddSpecResponse(entry.getValue().getResult()));
- }
- } else {
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getSuccesses().entrySet()) {
- responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
- }
+ // If flow fails compilation, the result will have a non-empty string
with the error
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(),
entry.getValue().getResult());
}
}
@@ -378,8 +368,9 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
}
public static boolean isCompileSuccessful(Map<String, AddSpecResponse>
responseMap) {
+ // If we cannot get the response from the scheduler, assume that the flow
failed compilation
AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
- ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new
AddSpecResponse<>(""));
+ ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new
AddSpecResponse<>(null));
return isCompileSuccessful(addSpecResponse.getValue());
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 91fee52..d69d4e5 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -208,15 +208,33 @@ public class FlowCatalogTest {
FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(),
"badFlow");
// Assume that spec is rejected
- when(this.mockListener.onAddSpec(any())).thenThrow(new
RuntimeException("Could not compile flow"));
+ when(this.mockListener.onAddSpec(any())).thenReturn(new
AddSpecResponse(null));
Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
// Spec should be rejected from being stored
specs = flowCatalog.getSpecs();
Assert.assertEquals(specs.size(), 0);
+ }
+
+ @Test (dependsOnMethods = "testRejectBadFlow")
+ public void testRejectMissingListener() {
+ flowCatalog.removeListener(this.mockListener);
+ Collection<Spec> specs = flowCatalog.getSpecs();
+ logger.info("[Before Create] Number of specs: " + specs.size());
+ int i=0;
+ for (Spec spec : specs) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ logger.info("[Before Create] Spec " + i++ + ": " +
gson.toJson(flowSpec));
+ }
+ Assert.assertTrue(specs.size() == 0, "Spec store should be empty before
addition");
+
+ // Create and add Spec
- // Add compilation errors to spec so that it will print it back to user
- Assert.assertEquals(badSpec.getCompilationErrors().size(), 1);
+ Map<String, AddSpecResponse> response = this.flowCatalog.put(flowSpec);
+
+ // Spec should be rejected from being stored
+ specs = flowCatalog.getSpecs();
+ Assert.assertEquals(specs.size(), 0);
}
public static URI computeFlowSpecURI() {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 2d0874e..e343e5e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ServiceManager;
import com.typesafe.config.Config;
@@ -201,13 +202,19 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
for (FlowSpec datasetFlowSpec : flowSpecs) {
for (DataNode destNode : destNodes) {
long authStartTime = System.nanoTime();
- boolean authorized =
this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode,
destNode);
- Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime() -
authStartTime, TimeUnit.NANOSECONDS);
- if (!authorized) {
- String message = String.format("Data movement is not authorized
for flow: %s, source: %s, destination: %s",
- flowSpec.getUri().toString(), source, destination);
- log.error(message);
- datasetFlowSpec.getCompilationErrors().add(message);
+ try {
+ boolean authorized =
this.dataMovementAuthorizer.isMovementAuthorized(flowSpec, sourceNode,
destNode);
+ Instrumented.updateTimer(dataAuthorizationTimer, System.nanoTime()
- authStartTime, TimeUnit.NANOSECONDS);
+ if (!authorized) {
+ String message = String.format("Data movement is not authorized
for flow: %s, source: %s, destination: %s",
+ flowSpec.getUri().toString(), source, destination);
+ log.error(message);
+ datasetFlowSpec.getCompilationErrors().add(message);
+ return null;
+ }
+ } catch (Exception e) {
+ Instrumented.markMeter(flowCompilationFailedMeter);
+
datasetFlowSpec.getCompilationErrors().add(Throwables.getStackTraceAsString(e));
return null;
}
}
@@ -222,14 +229,16 @@ public class MultiHopFlowCompiler extends
BaseFlowToJobSpecCompiler {
if (jobExecutionPlanDag.isEmpty()) {
Instrumented.markMeter(flowCompilationFailedMeter);
- log.info(String.format("No path found from source: %s and destination:
%s", source, destination));
- return jobExecutionPlanDag;
+ String message = String.format("No path found from source: %s and
destination: %s", source, destination);
+ log.info(message);
+ flowSpec.getCompilationErrors().add(message);
+ return null;
}
} catch (PathFinder.PathFinderException | SpecNotFoundException |
JobTemplate.TemplateException | URISyntaxException |
ReflectiveOperationException e) {
Instrumented.markMeter(flowCompilationFailedMeter);
- log.error(String
- .format("Exception encountered while compiling flow for source:
%s and destination: %s", source, destination),
- e);
+ String message = String.format("Exception encountered while compiling
flow for source: %s and destination: %s, %s", source, destination,
Throwables.getStackTraceAsString(e));
+ log.error(message, e);
+ flowSpec.getCompilationErrors().add(message);
return null;
} finally {
this.rwLock.readLock().unlock();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
index cc0a9a2..334b750 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/pathfinder/AbstractPathFinder.java
@@ -210,7 +210,7 @@ public abstract class AbstractPathFinder implements
PathFinder {
try {
flowEdge.getFlowTemplate().tryResolving(mergedConfig,
datasetDescriptorPair.getLeft(), datasetDescriptorPair.getRight());
} catch (JobTemplate.TemplateException | ConfigException |
SpecNotFoundException e) {
- this.flowSpec.getCompilationErrors().add(e.toString());
+ this.flowSpec.getCompilationErrors().add("Error compiling edge "
+ flowEdge.toString() + ": " + e.toString());
continue;
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index cc9e1e6..53366ae 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.service.modules.scheduler;
import java.io.IOException;
import java.net.URI;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
@@ -264,7 +263,7 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
/**
*
* @param addedSpec spec to be added
- * @return add spec response
+ * @return add spec response, which contains <code>null</code> if there is
an error
*/
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
@@ -289,10 +288,9 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag =
this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ // If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
- } else if (!flowSpec.getCompilationErrors().isEmpty()) {
- response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
}
boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
index 1912d15..3fddd4a 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GitConfigMonitorTest.java
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.ResetCommand;
import org.eclipse.jgit.api.errors.GitAPIException;
@@ -55,6 +58,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+
public class GitConfigMonitorTest {
private static final Logger logger =
LoggerFactory.getLogger(GitConfigMonitorTest.class);
private Repository remoteRepo;
@@ -73,6 +80,7 @@ public class GitConfigMonitorTest {
private RefSpec masterRefSpec = new RefSpec("master");
private FlowCatalog flowCatalog;
+ private SpecCatalogListener mockListener;
private Config config;
private GitConfigMonitor gitConfigMonitor;
@@ -101,6 +109,12 @@ public class GitConfigMonitorTest {
.build();
this.flowCatalog = new FlowCatalog(config);
+
+ this.mockListener = mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+ this.flowCatalog.addListener(mockListener);
this.flowCatalog.startAsync().awaitRunning();
this.gitConfigMonitor = new GitConfigMonitor(this.config,
this.flowCatalog);
this.gitConfigMonitor.setActive(true);
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index a99928e..14f2078 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -33,7 +33,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
@@ -528,7 +527,7 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
//Ensure no path to destination.
- Assert.assertTrue(jobDag.isEmpty());
+ Assert.assertEquals(jobDag, null);
}
@Test (dependsOnMethods = "testCompileFlowAfterSecondEdgeDeletion")
@@ -647,9 +646,9 @@ public class MultiHopFlowCompilerTest {
Dag<JobExecutionPlan> dag = specCompiler.compileFlow(spec);
- Assert.assertTrue(dag.isEmpty());
- Assert.assertEquals(spec.getCompilationErrors().size(), 1);
-
Assert.assertTrue(spec.getCompilationErrors().iterator().next().contains(AzkabanProjectConfig.USER_TO_PROXY));
+ Assert.assertEquals(dag, null);
+ Assert.assertEquals(spec.getCompilationErrors().size(), 2);
+ spec.getCompilationErrors().stream().anyMatch(s ->
s.contains(AzkabanProjectConfig.USER_TO_PROXY));
}
@Test (dependsOnMethods = "testUnresolvedFlow")
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index bdacbb8..7087c1e 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,7 +17,9 @@
package org.apache.gobblin.service.modules.orchestration;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import java.io.File;
import java.net.URI;
@@ -27,6 +29,7 @@ import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +53,9 @@ import
org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
public class OrchestratorTest {
private static final Logger logger =
LoggerFactory.getLogger(TopologyCatalog.class);
@@ -67,6 +73,7 @@ public class OrchestratorTest {
private TopologySpec topologySpec;
private FlowCatalog flowCatalog;
+ private SpecCatalogListener mockListener;
private FlowSpec flowSpec;
private Orchestrator orchestrator;
@@ -92,6 +99,12 @@ public class OrchestratorTest {
this.flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties),
Optional.of(logger));
+
+ this.mockListener = mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+
+ this.flowCatalog.addListener(mockListener);
this.serviceLauncher.addService(flowCatalog);
this.orchestrator = new
Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
diff --git
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index b7dc76f..cc84632 100644
---
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -28,11 +28,13 @@ import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
@@ -46,6 +48,8 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
public class GobblinServiceJobSchedulerTest {
@@ -65,6 +69,10 @@ public class GobblinServiceJobSchedulerTest {
Properties properties = new Properties();
properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
FlowCatalog flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
serviceLauncher.addService(flowCatalog);
@@ -73,8 +81,8 @@ public class GobblinServiceJobSchedulerTest {
FlowSpec flowSpec0 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
- flowCatalog.put(flowSpec0, false);
- flowCatalog.put(flowSpec1, false);
+ flowCatalog.put(flowSpec0, true);
+ flowCatalog.put(flowSpec1, true);
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
@@ -143,6 +151,12 @@ public class GobblinServiceJobSchedulerTest {
FlowCatalog flowCatalog = new
FlowCatalog(ConfigUtils.propertiesToConfig(properties));
ServiceBasedAppLauncher serviceLauncher = new
ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+ // Assume that the catalog can store corrupted flows
+ SpecCatalogListener mockListener = Mockito.mock(SpecCatalogListener.class);
+
when(mockListener.getName()).thenReturn(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS);
+ when(mockListener.onAddSpec(any())).thenReturn(new AddSpecResponse(""));
+ flowCatalog.addListener(mockListener);
+
serviceLauncher.addService(flowCatalog);
serviceLauncher.start();
@@ -151,11 +165,10 @@ public class GobblinServiceJobSchedulerTest {
FlowSpec flowSpec1 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
FlowSpec flowSpec2 =
FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec2"));
- // Assume that the catalog can store corrupted flows
- flowCatalog.put(flowSpec0, false);
// Ensure that these flows are scheduled
- flowCatalog.put(flowSpec1, false);
- flowCatalog.put(flowSpec2, false);
+ flowCatalog.put(flowSpec0, true);
+ flowCatalog.put(flowSpec1, true);
+ flowCatalog.put(flowSpec2, true);
Assert.assertEquals(flowCatalog.getSpecs().size(), 3);