sv2000 commented on a change in pull request #3011:
URL: https://github.com/apache/incubator-gobblin/pull/3011#discussion_r432873482



##########
File path: 
gobblin-modules/gobblin-kafka-09/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
##########
@@ -147,8 +165,8 @@ public void setup() throws Exception {
         ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
     this.gobblinServiceManager.start();
 
-    this.flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
-        this.gobblinServiceManager.getRestLiServer().getPort()));
+    this.flowConfigClient = new FlowConfigV2Client(String.format("http://localhost:%s/",

Review comment:
       Can we replace localhost with 127.0.0.1 to avoid flakiness on Travis?

##########
File path: 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
##########
@@ -289,6 +297,9 @@ public Spec getSpecWrapper(URI uri) {
   /**
    * Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
    * is set to true.
+   * If the {@link Spec} is a {@link FlowSpec} it is persisted if it can be compiled at the time this method received
+   * the spec. `explain` specs are not persisted. The logic of this method is tightly coupled with the logic of

Review comment:
       The Javadoc could explain how the coupling is achieved, e.g. we use 
condition variables to synchronize the NonScheduledJobRunner thread and the 
FlowCatalog thread, so that the FlowSpec is deleted only after the 
corresponding run-once flow has been submitted to the orchestrator.
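
To make the suggested Javadoc concrete, here is a minimal sketch of the pattern, assuming plain `Object` monitors with `wait`/`notifyAll` serve as the condition variables. The class `SpecSyncSketch` and all of its methods are hypothetical and are not taken from the PR; which thread waits and which one signals in the actual change is also an assumption here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these names come from the Gobblin code base.
class SpecSyncSketch {
  // One monitor object per spec URI, created when the spec is added.
  private final Map<String, Object> syncObjects = new ConcurrentHashMap<>();
  // Condition guarded by the monitor: has the flow been handed over yet?
  private final Map<String, Boolean> ready = new ConcurrentHashMap<>();
  private final Map<String, String> store = new ConcurrentHashMap<>();

  void put(String specUri, String spec) {
    syncObjects.put(specUri, new Object());
    ready.put(specUri, false);
    store.put(specUri, spec);
  }

  // Signalling side: flip the condition and wake up any waiting thread.
  void signalReady(String specUri) {
    Object sync = syncObjects.get(specUri);
    if (sync == null) {
      return; // nothing registered for this URI, nobody can be waiting
    }
    synchronized (sync) {
      ready.put(specUri, true);
      sync.notifyAll();
    }
  }

  // Deleting side: wait until the condition holds, then remove the spec.
  // Returns false if no sync object exists or the timeout expires first.
  boolean awaitReadyThenRemove(String specUri, long timeoutMillis) throws InterruptedException {
    Object sync = syncObjects.get(specUri);
    if (sync == null) {
      return false;
    }
    long deadline = System.currentTimeMillis() + timeoutMillis;
    synchronized (sync) {
      // Loop to guard against spurious wakeups.
      while (!Boolean.TRUE.equals(ready.get(specUri))) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          return false;
        }
        sync.wait(remaining);
      }
    }
    store.remove(specUri);
    ready.remove(specUri);
    syncObjects.remove(specUri);
    return true;
  }

  boolean contains(String specUri) {
    return store.containsKey(specUri);
  }

  // Small two-thread demo: one thread signals, the caller waits and deletes.
  static boolean demo() {
    SpecSyncSketch catalog = new SpecSyncSketch();
    String uri = "flow:/group/name";
    catalog.put(uri, "spec-body");
    Thread runner = new Thread(() -> catalog.signalReady(uri));
    runner.start();
    try {
      boolean removed = catalog.awaitReadyThenRemove(uri, 5000);
      runner.join();
      return removed && !catalog.contains(uri);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The condition is checked in a loop under the monitor, so the deletion is safe even if the signal arrives before the waiting thread reaches `wait`. A Javadoc that names the two threads and the condition they agree on would document the same contract.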

##########
File path: 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
##########
@@ -435,10 +452,22 @@ public void run() {
       try {
         GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener);
         if (flowCatalog.isPresent() && removeSpec) {
+          if (GobblinServiceJobScheduler.this.flowCatalog.get().getSpecSyncObjects().containsKey(specUri.toString())) {
+            // if the sync object does not exist, this job must be set to run due to job submission at service restart
+            Object syncObject = GobblinServiceJobScheduler.this.flowCatalog.get().getSpecSyncObjects().get(specUri.toString());

Review comment:
       This looks very cumbersome! Let's add a method in FlowCatalog that 
returns the syncObject for a given specUri, and avoid exposing the 
specSyncObjects map outside FlowCatalog.
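
Something along these lines is what I have in mind. This is only a sketch: `FlowCatalogSketch`, `registerSpec` and `getSyncObject` are hypothetical names, and returning `null` for an unknown URI is an assumption made so the caller can replace the `containsKey` plus `get` pair with a single lookup.

```java
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the catalog; only the accessor shape matters here.
class FlowCatalogSketch {
  // The map stays private, so callers can no longer mutate or iterate it.
  private final Map<String, Object> specSyncObjects = new ConcurrentHashMap<>();

  // Hypothetical registration hook, called when a spec is added.
  void registerSpec(URI specUri) {
    specSyncObjects.putIfAbsent(specUri.toString(), new Object());
  }

  // Returns the sync object for the given spec URI, or null if none exists.
  Object getSyncObject(URI specUri) {
    return specSyncObjects.get(specUri.toString());
  }
}
```

The scheduler could then fetch the object once, check it for `null`, and synchronize on it, without touching the map directly.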




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

