This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1a90d1a3b [GOBBLIN-1697]Have a separate resource handler to rely on
CDC stream to do message forwarding (#3549)
1a90d1a3b is described below
commit 1a90d1a3b126b39a1b03f4c4d5dbddb1171fb399
Author: Zihan Li <[email protected]>
AuthorDate: Wed Sep 21 20:13:28 2022 -0700
[GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to do
message forwarding (#3549)
* address comments
* use a ConnectionManager when the HttpClient is not closeable
* fix test case to test the orchestrator as one listener of the flow spec
* remove unintentional change
* [GOBBLIN-1697]Have a separate resource handler to rely on CDC stream to
do message forwarding
* fix compilation error
* address comments
* address comments
* address comments
* update outdated javadoc
Co-authored-by: Zihan Li <[email protected]>
---
.../service/FlowConfigResourceLocalHandler.java | 8 +-
.../gobblin/runtime/api/InstrumentedSpecStore.java | 9 +
.../org/apache/gobblin/runtime/api/SpecStore.java | 10 +
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 26 +-
.../runtime/spec_store/MysqlBaseSpecStore.java | 5 +
.../spec_store/MysqlSpecStoreWithUpdate.java | 89 +++++
.../spec_store/MysqlSpecStoreWithUpdateTest.java | 399 +++++++++++++++++++++
.../GobblinServiceFlowConfigResourceHandler.java | 2 +-
...FlowConfigV2ResourceHandlerWithWarmStandby.java | 127 +++++++
9 files changed, 667 insertions(+), 8 deletions(-)
diff --git
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 5837359d1..5c8dc8c41 100644
---
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -161,10 +161,14 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
return this.createFlowConfig(flowConfig, true);
}
+ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig,
boolean triggerListener) {
+ // Set the max version to be the largest value so that we blindly update
the flow spec in this case
+ return updateFlowConfig(flowId, flowConfig, triggerListener,
Long.MAX_VALUE);
+ }
/**
* Update flowConfig locally and trigger all listeners iff @param
triggerListener is set to true
*/
- public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig,
boolean triggerListener) {
+ public UpdateResponse updateFlowConfig(FlowId flowId, FlowConfig flowConfig,
boolean triggerListener, long modifiedWatermark) {
log.info("[GAAS-REST] Update called with flowGroup {} flowName {}",
flowId.getFlowGroup(), flowId.getFlowName());
if (!flowId.getFlowGroup().equals(flowConfig.getId().getFlowGroup()) ||
!flowId.getFlowName().equals(flowConfig.getId().getFlowName())) {
@@ -185,7 +189,7 @@ public class FlowConfigResourceLocalHandler implements
FlowConfigsResourceHandle
flowConfig = originalFlowConfig;
}
try {
- this.flowCatalog.put(createFlowSpecForConfig(flowConfig),
triggerListener);
+ this.flowCatalog.update(createFlowSpecForConfig(flowConfig),
triggerListener, modifiedWatermark);
} catch (QuotaExceededException e) {
throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE,
e.getMessage());
} catch (Throwable e) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index dfd643cd4..d2dfd7eb8 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -150,6 +150,11 @@ public abstract class InstrumentedSpecStore implements
SpecStore {
return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec));
}
+ @Override
+ public Spec updateSpec(Spec spec, long modifiedWatermark) throws
IOException, SpecNotFoundException {
+ return this.updateTimer.invokeMayThrowBoth(() -> updateSpecImpl(spec,
modifiedWatermark));
+ }
+
@Override
public Collection<Spec> getSpecs() throws IOException {
return this.getAllTimer.invokeMayThrowIO(() -> getSpecsImpl());
@@ -175,6 +180,10 @@ public abstract class InstrumentedSpecStore implements
SpecStore {
return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
}
+ public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws
IOException, SpecNotFoundException{
+ return updateSpecImpl(spec);
+ }
+
public abstract void addSpecImpl(Spec spec) throws IOException;
public abstract Spec updateSpecImpl(Spec spec) throws IOException,
SpecNotFoundException;
public abstract boolean existsImpl(URI specUri) throws IOException;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 3642d5a02..072ce2195 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -78,6 +78,16 @@ public interface SpecStore {
*/
Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException;
+ /***
+ * Update {@link Spec} in the {@link SpecStore} when modification time of
current entry is smaller than {@link modifiedWatermark}.
+ * @param spec {@link Spec} to be updated.
+ * @param modifiedWatermark largest modifiedWatermark that current spec
should be
+ * @throws IOException Exception in updating the {@link Spec}.
+ * @return Updated {@link Spec}.
+ * @throws SpecNotFoundException If {@link Spec} being updated is not
present in store.
+ */
+ default Spec updateSpec(Spec spec, long modifiedWatermark) throws
IOException, SpecNotFoundException {return updateSpec(spec);};
+
/***
* Retrieve the latest version of the {@link Spec} by URI from the {@link
SpecStore}.
* @param specUri URI for the {@link Spec} to be retrieved.
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 6a64ae9bb..2ab61e14d 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -349,21 +349,33 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
return spec;
}
+
+ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener)
throws Throwable {
+ return updateOrAddSpecHelper(spec, triggerListener, false, Long.MAX_VALUE);
+ }
+
+ public Map<String, AddSpecResponse> update(Spec spec, boolean
triggerListener, long modifiedWatermark) throws Throwable {
+ return updateOrAddSpecHelper(spec, triggerListener, true,
modifiedWatermark);
+ }
+
/**
* Persist {@link Spec} into {@link SpecStore} and notify {@link
SpecCatalogListener} if triggerListener
* is set to true.
* If the {@link Spec} is a {@link FlowSpec} it is persisted if it can be
compiled at the time this method received
* the spec. `explain` specs are not persisted. The logic of this method is
tightly coupled with the logic of
- * {@link GobblinServiceJobScheduler#onAddSpec()}, which is one of the
listener of {@link FlowCatalog}.
+ * {@link GobblinServiceJobScheduler#onAddSpec()} or {@link
Orchestrator#onAddSpec()} in warm standby mode,
+ * which is one of the listener of {@link FlowCatalog}.
* We use condition variables {@link #specSyncObjects} to achieve
synchronization between
* {@link GobblinServiceJobScheduler#NonScheduledJobRunner} thread and this
thread to ensure deletion of
* {@link FlowSpec} happens after the corresponding run once flow is
submitted to the orchestrator.
*
* @param spec The Spec to be added
* @param triggerListener True if listeners should be notified.
+ * @param isUpdate Whether this is update or add operation, it will call
different method in spec store to persist the spec
+ * @param modifiedWatermark If it's update operation, the largest
modifiedWatermark that it can modify, or in other word, the timestamp which old
spec should be modified before
* @return a map of listeners and their {@link AddSpecResponse}s
*/
- public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener)
throws Throwable {
+ private Map<String, AddSpecResponse> updateOrAddSpecHelper(Spec spec,
boolean triggerListener, boolean isUpdate, long modifiedWatermark) throws
Throwable {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is
not running.", this.getClass().getName()));
@@ -384,7 +396,6 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>>
entry : response.getValue().getFailures().entrySet()) {
throw entry.getValue().getError().getCause();
}
- return responseMap;
}
}
AddSpecResponse<String> compileResponse;
@@ -402,12 +413,17 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
try {
if (!flowSpec.isExplain()) {
long startTime = System.currentTimeMillis();
- specStore.addSpec(spec);
+ if (isUpdate) {
+ specStore.updateSpec(spec, modifiedWatermark);
+ } else {
+ specStore.addSpec(spec);
+ }
metrics.updatePutSpecTime(startTime);
}
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new
AddSpecResponse<>("true"));
} catch (IOException e) {
- throw new RuntimeException("Cannot add Spec to Spec store: " +
flowSpec, e);
+ String operation = isUpdate ? "update" : "add";
+ throw new RuntimeException("Cannot " + operation + " Spec to Spec
store: " + flowSpec, e);
} finally {
syncObject.notifyAll();
this.specSyncObjects.remove(flowSpec.getUri().toString());
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index df3d65912..f5d6dcebf 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -75,6 +75,9 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore
{
private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM
%s WHERE spec_uri = ?)";
protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri,
tag, spec) "
+ "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+ // Keep previous syntax that update only update spec and spec_json
+ //todo: do we need change this behavior so that everything can be updated
+ protected static final String UPDATE_STATEMENT = "UPDATE %s SET
spec=?,spec_json=? WHERE spec_uri=? AND UNIX_TIMESTAMP(modified_time) < ?";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE
spec_uri = ?";
private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM
%s WHERE ";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM
%s";
@@ -91,6 +94,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore
{
* between statements, collect them within this inner class that enables
selective, per-statement override, and delivers them as a unit.
*/
protected class SqlStatements {
+ public final String updateStatement =
String.format(getTablelessUpdateStatement(), MysqlBaseSpecStore.this.tableName);
public final String existsStatement =
String.format(getTablelessExistsStatement(), MysqlBaseSpecStore.this.tableName);
public final String insertStatement =
String.format(getTablelessInsertStatement(), MysqlBaseSpecStore.this.tableName);
public final String deleteStatement =
String.format(getTablelessDeleteStatement(), MysqlBaseSpecStore.this.tableName);
@@ -115,6 +119,7 @@ public class MysqlBaseSpecStore extends
InstrumentedSpecStore {
}
protected String getTablelessExistsStatement() { return
MysqlBaseSpecStore.EXISTS_STATEMENT; }
+ protected String getTablelessUpdateStatement() { return
MysqlBaseSpecStore.UPDATE_STATEMENT; }
protected String getTablelessInsertStatement() { return
MysqlBaseSpecStore.INSERT_STATEMENT; }
protected String getTablelessDeleteStatement() { return
MysqlBaseSpecStore.DELETE_STATEMENT; }
protected String getTablelessGetStatementBase() { return
MysqlBaseSpecStore.GET_STATEMENT_BASE; }
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
new file mode 100644
index 000000000..158b99659
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+public class MysqlSpecStoreWithUpdate extends MysqlSpecStore{
+ // In this case, when we try to insert but key is existed, we will throw
exception
+ protected static final String INSERT_STATEMENT_WITHOUT_UPDATE = "INSERT INTO
%s (spec_uri, flow_group, flow_name, template_uri, "
+ + "user_to_proxy, source_identifier, destination_identifier, schedule,
tag, isRunImmediately, owning_group, spec, spec_json) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ public MysqlSpecStoreWithUpdate(Config config, SpecSerDe specSerDe) throws
IOException {
+ super(config, specSerDe);
+ }
+
+ /** Bundle all changes following from schema differences against the base
class. */
+ protected class SpecificSqlStatementsWithUpdate extends
SpecificSqlStatements {
+ public void completeUpdatePreparedStatement(PreparedStatement statement,
Spec spec, long modifiedWatermark) throws
+
SQLException {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ URI specUri = flowSpec.getUri();
+
+ int i = 0;
+
+ statement.setBlob(++i, new
ByteArrayInputStream(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec)));
+ statement.setString(++i, new
String(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec),
Charsets.UTF_8));
+ statement.setString(++i, specUri.toString());
+ statement.setLong(++i, modifiedWatermark);
+ }
+
+ @Override
+ protected String getTablelessInsertStatement() { return
INSERT_STATEMENT_WITHOUT_UPDATE; }
+ }
+
+ @Override
+ protected SqlStatements createSqlStatements() {
+ return new SpecificSqlStatementsWithUpdate();
+ }
+
+ @Override
+ // TODO: fix to obey the `SpecStore` contract of returning the *updated*
`Spec`
+ public Spec updateSpecImpl(Spec spec) throws IOException {
+ updateSpecImpl(spec, Long.MAX_VALUE);
+ return spec;
+ }
+
+ @Override
+ // TODO: fix to obey the `SpecStore` contract of returning the *updated*
`Spec`
+ // Update {@link Spec} in the {@link SpecStore} when current modification
time is smaller than {@link modifiedWatermark}.
+ public Spec updateSpecImpl(Spec spec, long modifiedWatermark) throws
IOException {
+ withPreparedStatement(this.sqlStatements.updateStatement, statement -> {
+
((SpecificSqlStatementsWithUpdate)this.sqlStatements).completeUpdatePreparedStatement(statement,
spec, modifiedWatermark);
+ int i = statement.executeUpdate();
+ if (i == 0) {
+ throw new IOException("Spec does not exist or concurrent update
happens, please check current spec and update again");
+ }
+ return null; // (type: `Void`)
+ }, true);
+ return spec;
+ }
+
+
+
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
new file mode 100644
index 000000000..621fa3771
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
+import org.apache.gobblin.service.FlowId;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static
org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+
+
+public class MysqlSpecStoreWithUpdateTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "spec_store";
+
+ private MysqlSpecStoreWithUpdate specStore;
+ private MysqlSpecStore oldSpecStore;
+ private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+ private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+ private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+ private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
+ private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4,
flowSpec4_update;
+
+ public MysqlSpecStoreWithUpdateTest()
+ throws URISyntaxException { // (based on `uri1` and other
initializations just above)
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.specStore = new MysqlSpecStoreWithUpdate(config, new TestSpecSerDe());
+ this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
+
+ flowSpec1 = FlowSpec.builder(this.uri1)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive("key", "value")
+ .addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive("config.with.dot", "value4")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn1").build())
+ .withDescription("Test flow spec")
+ .withVersion("Test version")
+ .build();
+ flowSpec2 = FlowSpec.builder(this.uri2)
+ .withConfig(ConfigBuilder.create().addPrimitive("converter",
"value1,value2,value3")
+ .addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn2").build())
+ .withDescription("Test flow spec 2")
+ .withVersion("Test version 2")
+ .build();
+ flowSpec3 = FlowSpec.builder(this.uri3)
+ .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn3").build())
+ .withDescription("Test flow spec 3")
+ .withVersion("Test version 3")
+ .build();
+
+ flowSpec4 = FlowSpec.builder(this.uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
+ flowSpec4_update = FlowSpec.builder(this.uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4",
"value4_update")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void testSpecSearch() throws Exception {
+ // empty FlowSpecSearchObject should throw an error
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().build();
+ flowSpecSearchObject.augmentBaseGetStatement("SELECT * FROM Dummy WHERE ");
+ }
+
+ @Test
+ public void testAddSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ this.specStore.addSpec(this.flowSpec2);
+ this.specStore.addSpec(this.flowSpec4);
+
+ Assert.assertEquals(this.specStore.getSize(), 3);
+ Assert.assertTrue(this.specStore.exists(this.uri1));
+ Assert.assertTrue(this.specStore.exists(this.uri2));
+ Assert.assertTrue(this.specStore.exists(this.uri4));
+ Assert.expectThrows(Exception.class, () ->
this.specStore.addSpec(this.flowSpec1));
+ Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+ }
+
+ @Test (dependsOnMethods = "testAddSpec")
+ public void testGetSpec() throws Exception {
+ FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+ Assert.assertEquals(result, this.flowSpec1);
+
+ Collection<Spec> specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ Iterator<URI> uris = this.specStore.getSpecURIs();
+ Assert.assertTrue(Iterators.contains(uris, this.uri1));
+ Assert.assertTrue(Iterators.contains(uris, this.uri2));
+
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().flowGroup("fg1").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().flowName("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().flowName("fg1").flowGroup("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 0);
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("key=value").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("converter=value2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("key3").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("config.with.dot=value4").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().owningGroup("owningGroup4").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetSpecWithTag() throws Exception {
+
+ //Creating and inserting flowspecs with tags
+ URI uri5 = URI.create("flowspec5");
+ FlowSpec flowSpec5 = FlowSpec.builder(uri5)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
+ .addPrimitive("key5", "value5").build())
+ .withDescription("Test flow spec 5")
+ .withVersion("Test version 5")
+ .build();
+
+ URI uri6 = URI.create("flowspec6");
+ FlowSpec flowSpec6 = FlowSpec.builder(uri6)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
+ .addPrimitive("key6", "value6").build())
+ .withDescription("Test flow spec 6")
+ .withVersion("Test version 6")
+ .build();
+
+ this.specStore.addSpec(flowSpec5, "dr");
+ this.specStore.addSpec(flowSpec6, "dr");
+
+ Assert.assertTrue(this.specStore.exists(uri5));
+ Assert.assertTrue(this.specStore.exists(uri6));
+ List<URI> result = new ArrayList<>();
+ this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+ Assert.assertEquals(result.size(), 2);
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetFilterSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2.
+ * flowSpec3 is not included as it is a 'corrupted' flowspec
+ * flowSpec4 is not included as it doesn't have the 'filter.this.flow'
property
+ * Start is the offset of the first configuration to return
+ * Count is the total number of configurations to return
+ * PropertyFilter is the property to filter by
+ */
+
+ // Start of 0 and count of 1 means start from index 0, and return one
configuration only
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+ Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertFalse(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ // Start of 1 and count of 1 means start from index 1, and return one
configuration only
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(1).count(1).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertFalse(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ /**
+ * Start of 0 and count of 5 means start from index 0, and return five
configuration only
+ * Total of 3 flowSpecs in the DB, but flowSpec4 doesn't have
'filter.this.flow' filter so only returns 2 flowSpecs
+ * flowSpec1 and flowSpec2 match all the criteria
+ */
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(5).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testUpdate() throws Exception{
+ long version = System.currentTimeMillis() /1000;
+ this.specStore.updateSpec(this.flowSpec4_update);
+ Assert.assertEquals(((FlowSpec) this.specStore.getSpec(this.uri4)),
flowSpec4_update);
+ Assert.expectThrows(IOException.class, () ->
this.specStore.updateSpec(flowSpec4, version));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetAllSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2,
flowSpec4
+ */
+ // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so
return all 3 flowSpecs
+ Collection<Spec> specs = this.specStore.getSpecs(0,10);
+ for (Spec spec: specs) {
+ System.out.println("test" + spec.getUri());
+ }
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs using the default get all specs function. Testing
default functionality of returning everything
+ specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return
first two.
+ specs = this.specStore.getSpecs(0,2);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return
first two.
+ // Check that functionality for not including a start value is the same as
including start value of 0
+ specs = this.specStore.getSpecs(-1, 2);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+ }
+
+ @Test (expectedExceptions = {IOException.class})
+ public void testGetCorruptedSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec3);
+ }
+
+ @Test (dependsOnMethods = "testGetSpecWithTag")
+ public void testDeleteSpec() throws Exception {
+ Assert.assertEquals(this.specStore.getSize(), 5);
+ this.specStore.deleteSpec(this.uri1);
+ Assert.assertEquals(this.specStore.getSize(), 4);
+ Assert.assertFalse(this.specStore.exists(this.uri1));
+ }
+
+ @Test (dependsOnMethods = "testDeleteSpec")
+ public void testReadOldColumn() throws Exception {
+ this.oldSpecStore.addSpec(this.flowSpec1);
+
+ FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+ Assert.assertEquals(spec, this.flowSpec1);
+ }
+
+ /**
+ * A {@link MysqlSpecStore} which does not write into the new spec_json
column
+ * to simulate behavior of a table with old data.
+ */
+ public static class OldSpecStore extends MysqlSpecStore {
+
+ public OldSpecStore(Config config, SpecSerDe specSerDe) throws IOException
{
+ super(config, specSerDe);
+ }
+
+ @Override
+ public void addSpec(Spec spec, String tagValue) throws IOException {
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement =
connection.prepareStatement(this.sqlStatements.insertStatement)) {
+ this.sqlStatements.completeInsertPreparedStatement(statement, spec,
tagValue);
+ statement.setString(4, null);
+ statement.executeUpdate();
+ connection.commit();
+ } catch (SQLException | SpecSerDeException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ public class TestSpecSerDe extends GsonFlowSpecSerDe {
+ @Override
+ public byte[] serialize(Spec spec) throws SpecSerDeException {
+ byte[] bytes = super.serialize(spec);
+ // Reverse bytes to simulate corrupted Spec
+ if (spec.getUri().equals(uri3)) {
+ ArrayUtils.reverse(bytes);
+ }
+ return bytes;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 8045ae171..f0db8b582 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -63,7 +63,7 @@ public class GobblinServiceFlowConfigResourceHandler
implements FlowConfigsResou
@Getter
private String serviceName;
private boolean flowCatalogLocalCommit;
- private FlowConfigResourceLocalHandler localHandler;
+ protected FlowConfigResourceLocalHandler localHandler;
private Optional<HelixManager> helixManager;
private GobblinServiceJobScheduler jobScheduler;
private boolean forceLeader;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
new file mode 100644
index 000000000..7bfd93a4c
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import com.google.common.base.Optional;
+import com.linkedin.data.transform.DataProcessingException;
+import com.linkedin.restli.common.ComplexResourceKey;
+import com.linkedin.restli.common.EmptyRecord;
+import com.linkedin.restli.common.HttpStatus;
+import com.linkedin.restli.common.PatchRequest;
+import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.util.PatchApplier;
+import java.util.Properties;
+import javax.inject.Inject;
+import javax.inject.Named;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowConfig;
+import org.apache.gobblin.service.FlowConfigLoggedException;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.runtime.util.InjectionNames;
+import org.apache.helix.HelixManager;
+
+/**
+ * A {@link GobblinServiceFlowConfigV2ResourceHandler} for warm-standby deployments: create, update
+ * and delete requests are applied directly through the local handler on whichever host receives
+ * them, whether it is active or standby. Instead of sending Helix messages, this handler relies on
+ * the spec store's change stream to forward changes to other hosts.
+ */
+@Slf4j
+public class GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby extends GobblinServiceFlowConfigV2ResourceHandler {
+  @Inject
+  public GobblinServiceFlowConfigV2ResourceHandlerWithWarmStandby(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+      @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
+      FlowConfigV2ResourceLocalHandler handler, Optional<HelixManager> manager, GobblinServiceJobScheduler jobScheduler,
+      @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
+    super(serviceName, flowCatalogLocalCommit, handler, manager, jobScheduler, forceLeader);
+  }
+
+  /**
+   * Deletes the flow config by delegating directly to the local handler on this host.
+   */
+  @Override
+  public UpdateResponse deleteFlowConfig(FlowId flowId, Properties header)
+      throws FlowConfigLoggedException {
+    return this.localHandler.deleteFlowConfig(flowId, header);
+  }
+
+  /**
+   * Applies {@code flowConfigPatch} to the current flow config and stores the result.
+   *
+   * <p>The modified watermark is captured before the current config is read. The update is
+   * therefore bounded by the time this request started, not by the time the patch was applied.
+   *
+   * @throws FlowConfigLoggedException with status 400 if the patch cannot be applied
+   */
+  @Override
+  public UpdateResponse partialUpdateFlowConfig(FlowId flowId,
+      PatchRequest<FlowConfig> flowConfigPatch) throws FlowConfigLoggedException {
+    long modifiedWatermark = currentWatermark();
+    FlowConfig flowConfig = getFlowConfig(flowId);
+
+    try {
+      PatchApplier.applyPatch(flowConfig, flowConfigPatch);
+    } catch (DataProcessingException e) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "Failed to apply partial update", e);
+    }
+
+    return updateFlowConfig(flowId, flowConfig, modifiedWatermark);
+  }
+
+  /**
+   * Updates the flow config, using the current time (epoch seconds) as the modified watermark.
+   */
+  @Override
+  public UpdateResponse updateFlowConfig(FlowId flowId,
+      FlowConfig flowConfig) throws FlowConfigLoggedException {
+    return updateFlowConfig(flowId, flowConfig, currentWatermark());
+  }
+
+  /**
+   * Updates the flow config through the local handler, guarded by {@code modifiedWatermark}.
+   *
+   * <p>The watermark keeps concurrent updates of the same config on different hosts from
+   * overwriting each other. It is treated as the largest modified watermark this update is
+   * allowed to replace. The spec store's change stream forwards the update to other hosts;
+   * no Helix message is sent.
+   *
+   * @param modifiedWatermark expected to be epoch seconds, as computed by the callers in this class
+   * @throws FlowConfigLoggedException with status 400 if the flow group or flow name in
+   *     {@code flowConfig} differs from {@code flowId}
+   */
+  public UpdateResponse updateFlowConfig(FlowId flowId,
+      FlowConfig flowConfig, long modifiedWatermark) throws FlowConfigLoggedException {
+    String flowName = flowId.getFlowName();
+    String flowGroup = flowId.getFlowGroup();
+
+    if (!flowGroup.equals(flowConfig.getId().getFlowGroup()) || !flowName.equals(flowConfig.getId().getFlowName())) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          "flowName and flowGroup cannot be changed in update", null);
+    }
+
+    return this.localHandler.updateFlowConfig(flowId, flowConfig, true, modifiedWatermark);
+  }
+
+  /**
+   * Creates a {@link FlowConfig} by calling
+   * {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig, boolean)} directly, whether
+   * this host is active or standby. The spec store's change stream forwards the new config to
+   * other hosts.
+   *
+   * @throws FlowConfigLoggedException with status 400 if the config sets
+   *     {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY}, which users are not allowed to set
+   */
+  @Override
+  public CreateResponse createFlowConfig(FlowConfig flowConfig)
+      throws FlowConfigLoggedException {
+    if (flowConfig.getProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST,
+          String.format("%s cannot be set by the user", ConfigurationKeys.FLOW_EXECUTION_ID_KEY), null);
+    }
+
+    CreateResponse response = this.localHandler.createFlowConfig(flowConfig, true);
+    // Defensive fallback: report 201 Created if the local handler returned no response.
+    return response == null
+        ? new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED)
+        : response;
+  }
+
+  /** Returns the current wall-clock time in epoch seconds, the unit this handler uses for modified watermarks. */
+  private static long currentWatermark() {
+    return System.currentTimeMillis() / 1000;
+  }
+}