[
https://issues.apache.org/jira/browse/GOBBLIN-1697?focusedWorklogId=808490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-808490
]
ASF GitHub Bot logged work on GOBBLIN-1697:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Sep/22 02:06
Start Date: 14/Sep/22 02:06
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3549:
URL: https://github.com/apache/gobblin/pull/3549#discussion_r970233775
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
* @return a map of listeners and their {@link AddSpecResponse}s
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener)
throws Throwable {
+ Map<String, AddSpecResponse> responseMap = updateHelper(spec,
triggerListener);
+ AddSpecResponse<String> compileResponse =
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+ // Check that the flow configuration is valid and matches to a
corresponding edge
+ if (isCompileSuccessful(compileResponse.getValue())) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Object syncObject = getSyncObject(flowSpec.getUri().toString());
+ synchronized (syncObject) {
+ try {
+ if (!flowSpec.isExplain()) {
+ long startTime = System.currentTimeMillis();
+ specStore.addSpec(spec);
+ metrics.updatePutSpecTime(startTime);
+ }
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("true"));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " +
flowSpec, e);
+ } finally {
+ syncObject.notifyAll();
+ this.specSyncObjects.remove(flowSpec.getUri().toString());
+ }
+ }
+ } else {
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("false"));
+ }
+
+ return responseMap;
+ }
Review Comment:
This seems very similar to update besides that it calls
specStore.addSpec(spec) vs specStore.updateSpec(spec, version). Is it possible
for us to parameterize this or move logic around? e.g.
`validateFlowCompilation(Flowspec spec, Callable<Void> specStoreCallable)` and
send in a as a parameter. It's only used twice though so current still works,
but this helps with repetition.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java:
##########
@@ -364,6 +364,64 @@ public Spec getSpecWrapper(URI uri) {
* @return a map of listeners and their {@link AddSpecResponse}s
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener)
throws Throwable {
+ Map<String, AddSpecResponse> responseMap = updateHelper(spec,
triggerListener);
+ AddSpecResponse<String> compileResponse =
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+ // Check that the flow configuration is valid and matches to a
corresponding edge
+ if (isCompileSuccessful(compileResponse.getValue())) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Object syncObject = getSyncObject(flowSpec.getUri().toString());
+ synchronized (syncObject) {
+ try {
+ if (!flowSpec.isExplain()) {
+ long startTime = System.currentTimeMillis();
+ specStore.addSpec(spec);
+ metrics.updatePutSpecTime(startTime);
+ }
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("true"));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " +
flowSpec, e);
+ } finally {
+ syncObject.notifyAll();
+ this.specSyncObjects.remove(flowSpec.getUri().toString());
+ }
+ }
+ } else {
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("false"));
+ }
+
+ return responseMap;
+ }
+
+ public Map<String, AddSpecResponse> update(Spec spec, boolean
triggerListener, long version) throws Throwable {
+ Map<String, AddSpecResponse> responseMap = updateHelper(spec,
triggerListener);
+ AddSpecResponse<String> compileResponse =
responseMap.get(ServiceConfigKeys.COMPILATION_RESPONSE);
+ // Check that the flow configuration is valid and matches to a
corresponding edge
+ if (isCompileSuccessful(compileResponse.getValue())) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ Object syncObject = getSyncObject(flowSpec.getUri().toString());
+ synchronized (syncObject) {
+ try {
+ if (!flowSpec.isExplain()) {
+ long startTime = System.currentTimeMillis();
+ specStore.updateSpec(spec, version);
+ metrics.updatePutSpecTime(startTime);
+ }
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("true"));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " +
flowSpec, e);
+ } finally {
+ syncObject.notifyAll();
+ this.specSyncObjects.remove(flowSpec.getUri().toString());
+ }
+ }
+ } else {
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("false"));
+ }
+
+ return responseMap;
+ }
+
+ private Map<String, AddSpecResponse> updateHelper(Spec spec, boolean
triggerListener) throws Throwable {
Review Comment:
Maybe more direct naming as to what this is doing? e.g.
`getListenerResponses` and add a javadoc as to which listeners its currently
listening on.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java:
##########
@@ -78,6 +78,16 @@ public interface SpecStore {
*/
Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;
+ /***
+ * Update {@link Spec} in the {@link SpecStore} when current version is
smaller than {@link version}.
+ * @param spec {@link Spec} to be updated.
+ * @param version largest version that current spec should be
+ * @throws IOException Exception in updating the {@link Spec}.
+ * @return Updated {@link Spec}.
+ * @throws SpecNotFoundException If {@link Spec} being updated is not
present in store.
+ */
+ default Spec updateSpec(Spec spec, long version) throws IOException,
SpecNotFoundException {return updateSpec(spec);};
Review Comment:
I'm confused about the purpose of the version parameter here. What's the
intention of it if we set it as the max integer?
Issue Time Tracking
-------------------
Worklog Id: (was: 808490)
Time Spent: 1h (was: 50m)
> Have a separate resource handler to rely on CDC stream to do message
> forwarding
> -------------------------------------------------------------------------------
>
> Key: GOBBLIN-1697
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1697
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)