This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a90d1a3b [GOBBLIN-1697]Have a separate resource handler to rely on 
CDC stream to do message forwarding (#3549)
1a90d1a3b is described below

commit 1a90d1a3b126b39a1b03f4c4d5dbddb1171fb399
Author: Zihan Li <[email protected]>
AuthorDate: Wed Sep 21 20:13:28 2022 -0700

    [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do 
message forwarding (#3549)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * fix test case to test orchestrator as one listener of flow spec
    
    * remove unintentional change
    
    * [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to 
do message forwarding
    
    * fix compilation error
    
    * address comments
    
    * address comments
    
    * address comments
    
    * update outdated javadoc
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../service/FlowConfigResourceLocalHandler.java    |   8 +-
 .../gobblin/runtime/api/InstrumentedSpecStore.java |   9 +
 .../org/apache/gobblin/runtime/api/SpecStore.java  |  10 +
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  26 +-
 .../runtime/spec_store/MysqlBaseSpecStore.java     |   5 +
 .../spec_store/MysqlSpecStoreWithUpdate.java       |  89 +++++
 .../spec_store/MysqlSpecStoreWithUpdateTest.java   | 399 +++++++++++++++++++++
 .../GobblinServiceFlowConfigResourceHandler.java   |   2 +-
 ...FlowConfigV2ResourceHandlerWithWarmStandby.java | 127 +++++++
 9 files changed, 667 insertions(+), 8 deletions(-)

diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 5837359d1..5c8dc8c41 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -161,10 +161,14 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
     return this.createFlowConfig(flowConfig, true);
   }
 
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, 
boolean triggerListener) {
+    // Set the max version to be the largest value so that we blindly update 
the flow spec in this case
+    return updateFlowConfig(flowId, flowConfig, triggerListener, 
Long.MAX_VALUE);
+  }
   /**
    * Update flowConfig locally and trigger all listeners iff @param 
triggerListener is set to true
    */
-  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, 
boolean triggerListener) {
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, 
boolean triggerListener, long modifiedWatermark) {
     log.info("[GAAS-REST] Update called with flowGroup {} flowName {}", 
flowId.getFlowGroup(), flowId.getFlowName());
 
     if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) || 
!flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
@@ -185,7 +189,7 @@ public class FlowConfigResourceLocalHandler implements 
FlowConfigsResourceHandle
       flowConfig = originalFlowConfig;
     }
     try {
-      this.flowCatalog.put(createFlowSpecForConfig(flowConfig), 
triggerListener);
+      this.flowCatalog.update(createFlowSpecForConfig(flowConfig), 
triggerListener, modifiedWatermark);
     } catch (QuotaExceededException e) {
       throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, 
e.getMessage());
     } catch (Throwable e) {
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index dfd643cd4..d2dfd7eb8 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -150,6 +150,11 @@ public abstract class InstrumentedSpecStore implements 
SpecStore {
     return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec));
   }
 
+  @Override
+  public Spec updateSpec(Spec spec, long modifiedWatermark) throws 
IOException, SpecNotFoundException {
+    return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec, 
modifiedWatermark));
+  }
+
   @Override
   public Collection<Spec> getSpecs() throws IOException {
     return this.getAllTimer.invokeMayThrowIO(() -> getSpecsImpl());
@@ -175,6 +180,10 @@ public abstract class InstrumentedSpecStore implements 
SpecStore {
     return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
   }
 
+  public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws 
IOException, SpecNotFoundException{
+    return updateSpecImpl(spec);
+  }
+
   public abstract void addSpecImpl(Spec spec) throws IOException;
   public abstract Spec updateSpecImpl(Spec spec) throws IOException, 
SpecNotFoundException;
   public abstract boolean existsImpl(URI specUri) throws IOException;
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 3642d5a02..072ce2195 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -78,6 +78,16 @@ public interface SpecStore {
    */
   Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;
 
+  /***
+   * Update {@link Spec} in the {@link SpecStore} when modification time of 
current entry is smaller than {@link modifiedWatermark}.
+   * @param spec {@link Spec} to be updated.
+   * @param modifiedWatermark largest modifiedWatermark that current spec 
should be
+   * @throws IOException Exception in updating the {@link Spec}.
+   * @return Updated {@link Spec}.
+   * @throws SpecNotFoundException If {@link Spec} being updated is not 
present in store.
+   */
+  default Spec updateSpec(Spec spec, long modifiedWatermark) throws 
IOException, SpecNotFoundException {return updateSpec(spec);};
+
   /***
    * Retrieve the latest version of the {@link Spec} by URI from the {@link 
SpecStore}.
    * @param specUri URI for the {@link Spec} to be retrieved.
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 6a64ae9bb..2ab61e14d 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -349,21 +349,33 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
     return spec;
   }
 
+
+  public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+    return updateOrAddSpecHelper(spec, triggerListener, false, Long.MAX_VALUE);
+  }
+
+  public Map<String, AddSpecResponse> update(Spec spec, boolean 
triggerListener, long modifiedWatermark) throws Throwable {
+    return updateOrAddSpecHelper(spec, triggerListener, true, 
modifiedWatermark);
+  }
+
   /**
    * Persist {@link Spec} into {@link SpecStore} and notify {@link 
SpecCatalogListener} if triggerListener
    * is set to true.
    * If the {@link Spec} is a {@link FlowSpec} it is persisted if it can be 
compiled at the time this method received
    * the spec. `explain` specs are not persisted. The logic of this method is 
tightly coupled with the logic of
-   * {@link GobblinServiceJobScheduler#onAddSpec()}, which is one of the 
listener of {@link FlowCatalog}.
+   * {@link GobblinServiceJobScheduler#onAddSpec()} or {@link 
Orchestrator#onAddSpec()} in warm standby mode,
+   * which is one of the listener of {@link FlowCatalog}.
    * We use condition variables {@link #specSyncObjects} to achieve 
synchronization between
    * {@link GobblinServiceJobScheduler#NonScheduledJobRunner} thread and this 
thread to ensure deletion of
    * {@link FlowSpec} happens after the corresponding run once flow is 
submitted to the orchestrator.
    *
    * @param spec The Spec to be added
    * @param triggerListener True if listeners should be notified.
+   * @param isUpdate Whether this is update or add operation, it will call 
different method in spec store to persist the spec
+   * @param modifiedWatermark If it's update operation, the largest 
modifiedWatermark that it can modify, or in other word, the timestamp which old 
spec should be modified before
    * @return a map of listeners and their {@link AddSpecResponse}s
    */
-  public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) 
throws Throwable {
+  private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec, 
boolean triggerListener, boolean isUpdate, long modifiedWatermark) throws 
Throwable {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
     FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is 
not running.", this.getClass().getName()));
@@ -384,7 +396,6 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
         for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> 
entry : response.getValue().getFailures().entrySet()) {
           throw entry.getValue().getError().getCause();
         }
-        return responseMap;
       }
     }
     AddSpecResponse<String> compileResponse;
@@ -402,12 +413,17 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
         try {
           if (!flowSpec.isExplain()) {
             long startTime = System.currentTimeMillis();
-            specStore.addSpec(spec);
+            if (isUpdate) {
+              specStore.updateSpec(spec, modifiedWatermark);
+            } else {
+              specStore.addSpec(spec);
+            }
             metrics.updatePutSpecTime(startTime);
           }
           responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new 
AddSpecResponse<>("true"));
         } catch (IOException e) {
-          throw new RuntimeException("Cannot add Spec to Spec store: " + 
flowSpec, e);
+          String operation = isUpdate ? "update" : "add";
+          throw new RuntimeException("Cannot " + operation + " Spec to Spec 
store: " + flowSpec, e);
         } finally {
           syncObject.notifyAll();
           this.specSyncObjects.remove(flowSpec.getUri().toString());
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index df3d65912..f5d6dcebf 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -75,6 +75,9 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore 
{
   private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE spec_uri = ?)";
   protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, 
tag, spec) "
       + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+  // Keep previous syntax that update only update spec and spec_json
+  //todo: do we need change this behavior so that everything can be updated
+  protected static final String UPDATE_STATEMENT = "UPDATE %s SET 
spec=?,spec_json=? WHERE spec_uri=? AND UNIX_TIMESTAMP(modified_time) < ?";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
spec_uri = ?";
   private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM 
%s WHERE ";
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM 
%s";
@@ -91,6 +94,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore 
{
    * between statements, collect them within this inner class that enables 
selective, per-statement override, and delivers them as a unit.
    */
   protected class SqlStatements {
+    public final String updateStatement = 
String.format(getTablelessUpdateStatement(), MysqlBaseSpecStore.this.tableName);
     public final String existsStatement = 
String.format(getTablelessExistsStatement(), MysqlBaseSpecStore.this.tableName);
     public final String insertStatement = 
String.format(getTablelessInsertStatement(), MysqlBaseSpecStore.this.tableName);
     public final String deleteStatement = 
String.format(getTablelessDeleteStatement(), MysqlBaseSpecStore.this.tableName);
@@ -115,6 +119,7 @@ public class MysqlBaseSpecStore extends 
InstrumentedSpecStore {
     }
 
     protected String getTablelessExistsStatement() { return 
MysqlBaseSpecStore.EXISTS_STATEMENT; }
+    protected String getTablelessUpdateStatement() { return 
MysqlBaseSpecStore.UPDATE_STATEMENT; }
     protected String getTablelessInsertStatement() { return 
MysqlBaseSpecStore.INSERT_STATEMENT; }
     protected String getTablelessDeleteStatement() { return 
MysqlBaseSpecStore.DELETE_STATEMENT; }
     protected String getTablelessGetStatementBase() { return 
MysqlBaseSpecStore.GET_STATEMENT_BASE; }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
new file mode 100644
index 000000000..158b99659
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+public class MysqlSpecStoreWithUpdate extends MysqlSpecStore{
+  // In this case, when we try to insert but key is existed, we will throw 
exception
+  protected static final String INSERT_STATEMENT_WITHOUT_UPDATE = "INSERT INTO 
%s (spec_uri, flow_group, flow_name, template_uri, "
+      + "user_to_proxy, source_identifier, destination_identifier, schedule, 
tag, isRunImmediately, owning_group, spec, spec_json) "
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+  public MysqlSpecStoreWithUpdate(Config config, SpecSerDe specSerDe) throws 
IOException {
+    super(config, specSerDe);
+  }
+
+  /** Bundle all changes following from schema differences against the base 
class. */
+  protected class SpecificSqlStatementsWithUpdate extends 
SpecificSqlStatements {
+    public void completeUpdatePreparedStatement(PreparedStatement statement, 
Spec spec, long modifiedWatermark) throws
+                                                                               
                          SQLException {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      URI specUri = flowSpec.getUri();
+
+      int i = 0;
+
+      statement.setBlob(++i, new 
ByteArrayInputStream(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec)));
+      statement.setString(++i, new 
String(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec), 
Charsets.UTF_8));
+      statement.setString(++i, specUri.toString());
+      statement.setLong(++i, modifiedWatermark);
+    }
+
+    @Override
+    protected String getTablelessInsertStatement() { return 
INSERT_STATEMENT_WITHOUT_UPDATE; }
+  }
+
+  @Override
+  protected SqlStatements createSqlStatements() {
+    return new SpecificSqlStatementsWithUpdate();
+  }
+
+  @Override
+  // TODO: fix to obey the `SpecStore` contract of returning the *updated* 
`Spec`
+  public Spec updateSpecImpl(Spec spec) throws IOException {
+    updateSpecImpl(spec, Long.MAX_VALUE);
+    return spec;
+  }
+
+  @Override
+  // TODO: fix to obey the `SpecStore` contract of returning the *updated* 
`Spec`
+  // Update {@link Spec} in the {@link SpecStore} when current modification 
time is smaller than {@link modifiedWatermark}.
+  public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws 
IOException {
+    withPreparedStatement(this.sqlStatements.updateStatement, statement -> {
+      
((SpecificSqlStatementsWithUpdate)this.sqlStatements).completeUpdatePreparedStatement(statement,
 spec, modifiedWatermark);
+      int i = statement.executeUpdate();
+      if (i == 0) {
+        throw new IOException("Spec does not exist or concurrent update 
happens, please check current spec and update again");
+      }
+      return null; // (type: `Void`)
+    }, true);
+    return spec;
+  }
+
+
+
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
new file mode 100644
index 000000000..621fa3771
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
+import org.apache.gobblin.service.FlowId;
+
+import static 
org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static 
org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+
+
+public class MysqlSpecStoreWithUpdateTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "spec_store";
+
+  private MysqlSpecStoreWithUpdate specStore;
+  private MysqlSpecStore oldSpecStore;
+  private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+  private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+  private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+  private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
+  private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4, 
flowSpec4_update;
+
+  public MysqlSpecStoreWithUpdateTest()
+      throws URISyntaxException { // (based on `uri1` and other 
initializations just above)
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.specStore = new MysqlSpecStoreWithUpdate(config, new TestSpecSerDe());
+    this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
+
+    flowSpec1 = FlowSpec.builder(this.uri1)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key", "value")
+            .addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
+            .addPrimitive("config.with.dot", "value4")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn1").build())
+        .withDescription("Test flow spec")
+        .withVersion("Test version")
+        .build();
+    flowSpec2 = FlowSpec.builder(this.uri2)
+        .withConfig(ConfigBuilder.create().addPrimitive("converter", 
"value1,value2,value3")
+            .addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn2").build())
+        .withDescription("Test flow spec 2")
+        .withVersion("Test version 2")
+        .build();
+    flowSpec3 = FlowSpec.builder(this.uri3)
+        .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+            .addPrimitive("filter.this.flow", true)
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn3").build())
+        .withDescription("Test flow spec 3")
+        .withVersion("Test version 3")
+        .build();
+
+    flowSpec4 = FlowSpec.builder(this.uri4)
+        .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+            .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
"owningGroup4").build())
+        .withDescription("Test flow spec 4")
+        .withVersion("Test version 4")
+        .build();
+    flowSpec4_update = FlowSpec.builder(this.uri4)
+        .withConfig(ConfigBuilder.create().addPrimitive("key4", 
"value4_update")
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+            .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
"owningGroup4").build())
+        .withDescription("Test flow spec 4")
+        .withVersion("Test version 4")
+        .build();
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void testSpecSearch() throws Exception {
+    // empty FlowSpecSearchObject should throw an error
+    FlowSpecSearchObject flowSpecSearchObject = 
FlowSpecSearchObject.builder().build();
+    flowSpecSearchObject.augmentBaseGetStatement("SELECT * FROM Dummy WHERE ");
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec1);
+    this.specStore.addSpec(this.flowSpec2);
+    this.specStore.addSpec(this.flowSpec4);
+
+    Assert.assertEquals(this.specStore.getSize(), 3);
+    Assert.assertTrue(this.specStore.exists(this.uri1));
+    Assert.assertTrue(this.specStore.exists(this.uri2));
+    Assert.assertTrue(this.specStore.exists(this.uri4));
+    Assert.expectThrows(Exception.class, () -> 
this.specStore.addSpec(this.flowSpec1));
+    Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testGetSpec() throws Exception {
+    FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(result, this.flowSpec1);
+
+    Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 3);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    Iterator<URI> uris = this.specStore.getSpecURIs();
+    Assert.assertTrue(Iterators.contains(uris, this.uri1));
+    Assert.assertTrue(Iterators.contains(uris, this.uri2));
+
+    FlowSpecSearchObject flowSpecSearchObject = 
FlowSpecSearchObject.builder().flowGroup("fg1").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().flowName("fn2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().flowName("fg1").flowGroup("fn2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 0);
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().propertyFilter("key=value").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().propertyFilter("converter=value2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().propertyFilter("key3").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().propertyFilter("config.with.dot=value4").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = 
FlowSpecSearchObject.builder().owningGroup("owningGroup4").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec4));
+  }
+
+  @Test  (dependsOnMethods = "testGetSpec")
+  public void testGetSpecWithTag() throws Exception {
+
+    //Creating and inserting flowspecs with tags
+    URI uri5 = URI.create("flowspec5");
+    FlowSpec flowSpec5 = FlowSpec.builder(uri5)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
+            .addPrimitive("key5", "value5").build())
+        .withDescription("Test flow spec 5")
+        .withVersion("Test version 5")
+        .build();
+
+    URI uri6 = URI.create("flowspec6");
+    FlowSpec flowSpec6 = FlowSpec.builder(uri6)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+            .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+            .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
+            .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
+            .addPrimitive("key6", "value6").build())
+        .withDescription("Test flow spec 6")
+        .withVersion("Test version 6")
+        .build();
+
+    this.specStore.addSpec(flowSpec5, "dr");
+    this.specStore.addSpec(flowSpec6, "dr");
+
+    Assert.assertTrue(this.specStore.exists(uri5));
+    Assert.assertTrue(this.specStore.exists(uri6));
+    List<URI> result = new ArrayList<>();
+    this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+    Assert.assertEquals(result.size(), 2);
+  }
+
+  @Test (dependsOnMethods = "testGetSpec")
+  public void testGetFilterSpecPaginate() throws Exception {
+    // Specs expected to match the property filter, in sorted order: flowSpec1, flowSpec2.
+    //   - flowSpec3 is not included as it is a 'corrupted' flowspec
+    //   - flowSpec4 is not included as it doesn't have the 'filter.this.flow' property
+    // In every query below:
+    //   start          = offset of the first configuration to return
+    //   count          = total number of configurations to return
+    //   propertyFilter = the property to filter by
+    final String filterProperty = "filter.this.flow";
+
+    // start=0, count=1: only the first matching configuration comes back
+    FlowSpecSearchObject firstOnlyQuery =
+        FlowSpecSearchObject.builder().start(0).count(1).propertyFilter(filterProperty).build();
+    Collection<Spec> firstOnly = this.specStore.getSpecs(firstOnlyQuery);
+    Assert.assertEquals(firstOnly.size(), 1);
+    Assert.assertTrue(firstOnly.contains(this.flowSpec1));
+    Assert.assertFalse(firstOnly.contains(this.flowSpec2));
+    Assert.assertFalse(firstOnly.contains(this.flowSpec4));
+
+    // start=1, count=1: only the second matching configuration comes back
+    FlowSpecSearchObject secondOnlyQuery =
+        FlowSpecSearchObject.builder().start(1).count(1).propertyFilter(filterProperty).build();
+    Collection<Spec> secondOnly = this.specStore.getSpecs(secondOnlyQuery);
+    Assert.assertEquals(secondOnly.size(), 1);
+    Assert.assertFalse(secondOnly.contains(this.flowSpec1));
+    Assert.assertTrue(secondOnly.contains(this.flowSpec2));
+    Assert.assertFalse(secondOnly.contains(this.flowSpec4));
+
+    // start=0, count=5: up to five configurations are requested, but of the three flowSpecs in the DB
+    // only flowSpec1 and flowSpec2 carry 'filter.this.flow', so exactly those two are returned
+    FlowSpecSearchObject allMatchesQuery =
+        FlowSpecSearchObject.builder().start(0).count(5).propertyFilter(filterProperty).build();
+    Collection<Spec> allMatches = this.specStore.getSpecs(allMatchesQuery);
+    Assert.assertEquals(allMatches.size(), 2);
+    Assert.assertTrue(allMatches.contains(this.flowSpec1));
+    Assert.assertTrue(allMatches.contains(this.flowSpec2));
+    Assert.assertFalse(allMatches.contains(this.flowSpec4));
+  }
+
+  @Test (dependsOnMethods =  "testGetSpec")
+  public void testUpdate() throws Exception{
+    // Epoch-seconds timestamp taken BEFORE the spec is modified below
+    long watermarkBeforeUpdate = System.currentTimeMillis() / 1000;
+
+    // An update without an explicit watermark must be applied and be readable afterwards
+    this.specStore.updateSpec(this.flowSpec4_update);
+    FlowSpec storedSpec = (FlowSpec) this.specStore.getSpec(this.uri4);
+    Assert.assertEquals(storedSpec, flowSpec4_update);
+
+    // Updating again with the earlier watermark is expected to be rejected with an IOException
+    Assert.expectThrows(IOException.class,
+        () -> this.specStore.updateSpec(flowSpec4, watermarkBeforeUpdate));
+  }
+
+  @Test (dependsOnMethods =  "testGetSpec")
+  public void testGetAllSpecPaginate() throws Exception {
+    // Sorted order of the specStore configurations is flowSpec1, flowSpec2, flowSpec4
+
+    // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so return all 3 flowSpecs
+    Collection<Spec> specs = this.specStore.getSpecs(0,10);
+    Assert.assertEquals(specs.size(), 3);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertTrue(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs using the default get all specs function.
+    // Testing default functionality of returning everything
+    specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 3);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertTrue(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
+    specs = this.specStore.getSpecs(0,2);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+
+    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
+    // Check that passing a negative start value behaves the same as a start value of 0
+    specs = this.specStore.getSpecs(-1, 2);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec4));
+  }
+
+  /**
+   * Adding flowSpec3 to the store must fail with an {@link IOException}.
+   * NOTE(review): flowSpec3 is presumably the spec built on uri3, whose serialized bytes
+   * {@link TestSpecSerDe} reverses -- confirm against the test setup.
+   */
+  @Test (expectedExceptions = IOException.class)
+  public void testGetCorruptedSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec3);
+  }
+
+  @Test (dependsOnMethods = "testGetSpecWithTag")
+  public void testDeleteSpec() throws Exception {
+    // Precondition: the store holds five specs when this test starts
+    Assert.assertEquals(this.specStore.getSize(), 5);
+
+    this.specStore.deleteSpec(this.uri1);
+
+    // The deleted URI is gone and the store shrank by exactly one entry
+    Assert.assertFalse(this.specStore.exists(this.uri1));
+    Assert.assertEquals(this.specStore.getSize(), 4);
+  }
+
+  @Test (dependsOnMethods = "testDeleteSpec")
+  public void testReadOldColumn() throws Exception {
+    // Write flowSpec1 through the old-style store
+    // (presumably an OldSpecStore that leaves the spec_json column empty -- confirm in setup) ...
+    this.oldSpecStore.addSpec(this.flowSpec1);
+
+    // ... then make sure the regular store still reads the very same spec back
+    FlowSpec readBack = (FlowSpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(readBack, this.flowSpec1);
+  }
+
+  /**
+   * A {@link MysqlSpecStore} that simulates a table holding old data: rows it inserts
+   * do not get a value in the new spec_json column.
+   */
+  public static class OldSpecStore extends MysqlSpecStore {
+
+    public OldSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+      super(config, specSerDe);
+    }
+
+    /**
+     * Inserts {@code spec} with the regular insert statement, but with statement parameter 4
+     * overwritten with {@code null} before execution
+     * (presumably parameter 4 is the spec_json column -- verify against the insert statement).
+     *
+     * @throws IOException wrapping any {@link SQLException} or {@link SpecSerDeException}
+     */
+    @Override
+    public void addSpec(Spec spec, String tagValue) throws IOException {
+      try (Connection conn = this.dataSource.getConnection();
+          PreparedStatement insert = conn.prepareStatement(this.sqlStatements.insertStatement)) {
+        // Populate every parameter the normal way first ...
+        this.sqlStatements.completeInsertPreparedStatement(insert, spec, tagValue);
+        // ... then blank out parameter 4 so the row looks like it was written by older code
+        insert.setString(4, null);
+        insert.executeUpdate();
+        conn.commit();
+      } catch (SQLException | SpecSerDeException cause) {
+        throw new IOException(cause);
+      }
+    }
+  }
+
+  /**
+   * A {@link GsonFlowSpecSerDe} that simulates a corrupted Spec: the serialized bytes of the
+   * spec whose URI equals {@code uri3} are reversed; every other spec is serialized unchanged.
+   */
+  public class TestSpecSerDe extends GsonFlowSpecSerDe {
+    @Override
+    public byte[] serialize(Spec spec) throws SpecSerDeException {
+      byte[] serialized = super.serialize(spec);
+      boolean shouldCorrupt = spec.getUri().equals(uri3);
+      if (!shouldCorrupt) {
+        return serialized;
+      }
+      // Reverse bytes (in place) to simulate corrupted Spec
+      ArrayUtils.reverse(serialized);
+      return serialized;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 8045ae171..f0db8b582 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -63,7 +63,7 @@ public class GobblinServiceFlowConfigResourceHandler 
implements FlowConfigsResou
   @Getter
   private String serviceName;
   private boolean flowCatalogLocalCommit;
-  private FlowConfigResourceLocalHandler localHandler;
+  protected FlowConfigResourceLocalHandler localHandler;
   private Optional<HelixManager> helixManager;
   private GobblinServiceJobScheduler jobScheduler;
   private boolean forceLeader;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
new file mode 100644
index 000000000..7bfd93a4c
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import com.google.common.base.Optional;
+import com.linkedin.data.transform.DataProcessingException;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
+import java.util.Properties;
+import javax.inject.Inject;
+import javax.inject.Named;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.helix.HelixManager;
+
+/**
+ * A {@link GobblinServiceFlowConfigV2ResourceHandler} whose create, update and delete operations
+ * go straight to the local handler on whichever host receives the request.
+ * Instead of helix messages, forwarding the change to other hosts is left to the change (CDC)
+ * stream of the spec store.
+ */
+@Slf4j
+public class GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby
+    extends GobblinServiceFlowConfigV2ResourceHandler {
+
+  @Inject
+  public GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby(
+      @Named(InjectionNames.SERVICE_NAME) String serviceName,
+      @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
+      FlowConfigV2ResourceLocalHandler handler,
+      Optional<HelixManager> manager,
+      GobblinServiceJobScheduler jobScheduler,
+      @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
+    super(serviceName, flowCatalogLocalCommit, handler, manager, jobScheduler, forceLeader);
+  }
+
+  /**
+   * Deletes the flow config by delegating directly to the local handler.
+   */
+  @Override
+  public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header)
+      throws FlowConfigLoggedException {
+    return this.localHandler.deleteFlowConfig(flowId, header);
+  }
+
+  /**
+   * Applies {@code flowConfigPatch} to the currently stored config of {@code flowId} and writes
+   * the result back through {@link #updateFlowConfig(FlowId, FlowConfig, long)}.
+   *
+   * @throws FlowConfigLoggedException with status 400 when the patch cannot be applied
+   */
+  @Override
+  public UpdateResponse partialUpdateFlowConfig(FlowId flowId, PatchRequest<FlowConfig> flowConfigPatch)
+      throws FlowConfigLoggedException {
+    // The watermark (epoch seconds) is taken before the current config is read, and is handed
+    // to the update together with the patched config.
+    long watermark = System.currentTimeMillis() / 1000;
+    FlowConfig patchedConfig = getFlowConfig(flowId);
+
+    try {
+      PatchApplier.applyPatch(patchedConfig, flowConfigPatch);
+    } catch (DataProcessingException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to apply partial update", e);
+    }
+
+    return updateFlowConfig(flowId, patchedConfig, watermark);
+  }
+
+  /**
+   * Updates the flow config, using the current time (epoch seconds) as the modified watermark.
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig)
+      throws FlowConfigLoggedException {
+    // We have modifiedWatermark here to avoid update config happens at the same time on different
+    // hosts overwrite each other; the timestamp here will be treated as the largest
+    // modifiedWatermark that we can update
+    long currentEpochSeconds = System.currentTimeMillis() / 1000;
+    return updateFlowConfig(flowId, flowConfig, currentEpochSeconds);
+  }
+
+  /**
+   * Updates the flow config through the local handler, passing {@code modifiedWatermark} along.
+   *
+   * @param flowId id of the flow being updated
+   * @param flowConfig the new config; its id must carry the same flow group and flow name as {@code flowId}
+   * @param modifiedWatermark watermark forwarded to the local handler unchanged
+   * @throws FlowConfigLoggedException with status 400 when the flow name or flow group differs
+   */
+  public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig, long modifiedWatermark)
+      throws FlowConfigLoggedException {
+    String expectedName = flowId.getFlowName();
+    String expectedGroup = flowId.getFlowGroup();
+
+    boolean idUnchanged = expectedGroup.equals(flowConfig.getId().getFlowGroup())
+        && expectedName.equals(flowConfig.getId().getFlowName());
+    if (!idUnchanged) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    // We directly call localHandler to update the flow config and put it in spec store.
+    // Instead of helix message, forwarding message is done by change stream of spec store.
+    return this.localHandler.updateFlowConfig(flowId, flowConfig, true, modifiedWatermark);
+  }
+
+  /**
+   * Adds a {@link FlowConfig} by calling the {@link FlowConfigResourceLocalHandler} directly,
+   * no matter whether this host is active or standby; the CDC stream of the spec store is relied
+   * on to forward the change to other hosts.
+   *
+   * @throws FlowConfigLoggedException with status 400 when the properties already contain
+   *         {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY}
+   */
+  @Override
+  public CreateResponse createFlowConfig(FlowConfig flowConfig)
+      throws FlowConfigLoggedException {
+    if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          String.format("%s cannot be set by the user", ConfigurationKeys.FLOW_EXECUTION_ID_KEY), null);
+    }
+
+    // We directly call localHandler to create flow config and put it in spec store.
+    // Instead of helix message, forwarding message is done by change stream of spec store.
+    CreateResponse localResponse = this.localHandler.createFlowConfig(flowConfig, true);
+    if (localResponse != null) {
+      return localResponse;
+    }
+
+    // The local handler returned nothing: answer with a plain 201 keyed on the flow id
+    return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()),
+        HttpStatus.S_201_CREATED);
+  }
+}


Reply via email to