[ 
https://issues.apache.org/jira/browse/GOBBLIN-1697?focusedWorklogId=808490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-808490
 ]

ASF GitHub Bot logged work on GOBBLIN-1697:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Sep/22 02:06
            Start Date: 14/Sep/22 02:06
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3549:
URL: https://github.com/apache/gobblin/pull/3549#discussion_r970233775


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.addSpec(spec);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }

Review Comment:
   This seems very similar to update besides that it calls 
specStore.addSpec(spec) vs specStore.updateSpec(spec, version). Is it possible 
for us to parameterize this or move logic around? e.g. 
`validateFlowCompilation(Flowspec spec, Callable<Void> specStoreCallable)` and 
send in a as a parameter. It's only used twice though so current still works, 
but this helps with repetition.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.addSpec(spec);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }
+
+  public Map<String, AddSpecResponse> update(Spec spec, boolean 
triggerListener, long version) throws Throwable {
+    Map<String, AddSpecResponse> responseMap = updateHelper(spec, 
triggerListener);
+    AddSpecResponse<String> compileResponse = 
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+    // Check that the flow configuration is valid and matches to a 
corresponding edge
+    if (isCompileSuccessful(compileResponse.getValue())) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      Object syncObject = getSyncObject(flowSpec.getUri().toString());
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.updateSpec(spec, version);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("false"));
+    }
+
+    return responseMap;
+  }
+
+  private Map<String, AddSpecResponse> updateHelper(Spec spec, boolean 
triggerListener) throws Throwable {

Review Comment:
   Maybe more direct naming as to what this is doing? e.g. 
`getListenerResponses` and add a javadoc as to which listeners its currently 
listening on.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java:
##########
@@ -78,6 +78,16 @@ public interface SpecStore {
    */
   Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;
 
+  /***
+   * Update {@link Spec} in the {@link SpecStore} when current version is 
smaller than {@link version}.
+   * @param spec {@link Spec} to be updated.
+   * @param version largest version that current spec should be
+   * @throws IOException Exception in updating the {@link Spec}.
+   * @return Updated {@link Spec}.
+   * @throws SpecNotFoundException If {@link Spec} being updated is not 
present in store.
+   */
+  default Spec updateSpec(Spec spec, long version) throws IOException, 
SpecNotFoundException {return updateSpec(spec);};

Review Comment:
   I'm confused about the purpose of the version parameter here. What's the 
intention of it if we set it as the max integer? 





Issue Time Tracking
-------------------

    Worklog Id:     (was: 808490)
    Time Spent: 1h  (was: 50m)

> Have a separate resource handler to rely on CDC stream to do message 
> forwarding
> -------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1697
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1697
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Zihan Li
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to