[
https://issues.apache.org/jira/browse/GOBBLIN-1697?focusedWorklogId=807622&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-807622
]
ASF GitHub Bot logged work on GOBBLIN-1697:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 10/Sep/22 00:28
Start Date: 10/Sep/22 00:28
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #3549:
URL: https://github.com/apache/gobblin/pull/3549#discussion_r967547782
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdate.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.base.Charsets;
+import com.typesafe.config.Config;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+public class MysqlSpecStoreWithUpdate extends MysqlSpecStore{
+ // In this case, when we try to insert but key is existed, we will throw
exception
+ protected static final String INSERT_STATEMENT_WITHOUT_UPDATE = "INSERT INTO
%s (spec_uri, flow_group, flow_name, template_uri, "
+ + "user_to_proxy, source_identifier, destination_identifier, schedule,
tag, isRunImmediately, owning_group, spec, spec_json) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+ public MysqlSpecStoreWithUpdate(Config config, SpecSerDe specSerDe) throws
IOException {
+ super(config, specSerDe);
+ }
+
+ /** Bundle all changes following from schema differences against the base
class. */
+ protected class SpecificSqlStatementsWithUpdate extends
SpecificSqlStatements {
+ public void completeUpdatePreparedStatement(PreparedStatement statement,
Spec spec, long version) throws
+
SQLException {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ URI specUri = flowSpec.getUri();
+
+ int i = 0;
+
+ statement.setBlob(++i, new
ByteArrayInputStream(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec)));
+ statement.setString(++i, new
String(MysqlSpecStoreWithUpdate.this.specSerDe.serialize(flowSpec),
Charsets.UTF_8));
+ statement.setString(++i, specUri.toString());
+ statement.setLong(++i, version);
+ }
+
+ @Override
+ protected String getTablelessInsertStatement() { return
INSERT_STATEMENT_WITHOUT_UPDATE; }
+ }
+
+ @Override
+ protected SqlStatements createSqlStatements() {
+ return new SpecificSqlStatementsWithUpdate();
+ }
+
+ @Override
+ // TODO: fix to obey the `SpecStore` contract of returning the *updated*
`Spec`
+ public Spec updateSpecImpl(Spec spec) throws IOException {
+ updateSpecImpl(spec, Long.MAX_VALUE);
+ return spec;
+ }
+
+ @Override
+ // TODO: fix to obey the `SpecStore` contract of returning the *updated*
`Spec`
+ public Spec updateSpecImpl(Spec spec, long version) throws IOException {
+ withPreparedStatement(this.sqlStatements.updateStatement, statement -> {
+
((SpecificSqlStatementsWithUpdate)this.sqlStatements).completeUpdatePreparedStatement(statement,
spec, version);
Review Comment:
is the last value supposed to be `timestamp` or `version`? In the insert
statement it looks like it is expecting a `timestamp`, maybe we can make name
consistent or add a comment that says timestamp is also acting as versioning.
##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.spec_serde.GsonFlowSpecSerDe;
+import org.apache.gobblin.service.FlowId;
+
+import static
org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static
org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+
+
+public class MysqlSpecStoreWithUpdateTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "spec_store";
+
+ private MysqlSpecStoreWithUpdate specStore;
+ private MysqlSpecStore oldSpecStore;
+ private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+ private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+ private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+ private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
+ private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4,
flowSpec4_update;
+
+ public MysqlSpecStoreWithUpdateTest()
+ throws URISyntaxException { // (based on `uri1` and other
initializations just above)
+ }
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.specStore = new MysqlSpecStoreWithUpdate(config, new TestSpecSerDe());
+ this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
+
+ flowSpec1 = FlowSpec.builder(this.uri1)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive("key", "value")
+ .addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive("config.with.dot", "value4")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn1").build())
+ .withDescription("Test flow spec")
+ .withVersion("Test version")
+ .build();
+ flowSpec2 = FlowSpec.builder(this.uri2)
+ .withConfig(ConfigBuilder.create().addPrimitive("converter",
"value1,value2,value3")
+ .addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn2").build())
+ .withDescription("Test flow spec 2")
+ .withVersion("Test version 2")
+ .build();
+ flowSpec3 = FlowSpec.builder(this.uri3)
+ .withConfig(ConfigBuilder.create().addPrimitive("key3", "value3")
+ .addPrimitive("filter.this.flow", true)
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg3")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn3").build())
+ .withDescription("Test flow spec 3")
+ .withVersion("Test version 3")
+ .build();
+
+ flowSpec4 = FlowSpec.builder(this.uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4", "value4")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
+ flowSpec4_update = FlowSpec.builder(this.uri4)
+ .withConfig(ConfigBuilder.create().addPrimitive("key4",
"value4_update")
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg4")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn4")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup4").build())
+ .withDescription("Test flow spec 4")
+ .withVersion("Test version 4")
+ .build();
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void testSpecSearch() throws Exception {
+ // empty FlowSpecSearchObject should throw an error
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().build();
+ flowSpecSearchObject.augmentBaseGetStatement("SELECT * FROM Dummy WHERE ");
+ }
+
+ @Test
+ public void testAddSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ this.specStore.addSpec(this.flowSpec2);
+ this.specStore.addSpec(this.flowSpec4);
+
+ Assert.assertEquals(this.specStore.getSize(), 3);
+ Assert.assertTrue(this.specStore.exists(this.uri1));
+ Assert.assertTrue(this.specStore.exists(this.uri2));
+ Assert.assertTrue(this.specStore.exists(this.uri4));
+ Assert.expectThrows(Exception.class, () ->
this.specStore.addSpec(this.flowSpec1));
+ Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+ }
+
+ @Test (dependsOnMethods = "testAddSpec")
+ public void testGetSpec() throws Exception {
+ FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+ Assert.assertEquals(result, this.flowSpec1);
+
+ Collection<Spec> specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ Iterator<URI> uris = this.specStore.getSpecURIs();
+ Assert.assertTrue(Iterators.contains(uris, this.uri1));
+ Assert.assertTrue(Iterators.contains(uris, this.uri2));
+
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().flowGroup("fg1").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().flowName("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().flowName("fg1").flowGroup("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 0);
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("key=value").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("converter=value2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("key3").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().propertyFilter("config.with.dot=value4").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().owningGroup("owningGroup4").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetSpecWithTag() throws Exception {
+
+ //Creating and inserting flowspecs with tags
+ URI uri5 = URI.create("flowspec5");
+ FlowSpec flowSpec5 = FlowSpec.builder(uri5)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
+ .addPrimitive("key5", "value5").build())
+ .withDescription("Test flow spec 5")
+ .withVersion("Test version 5")
+ .build();
+
+ URI uri6 = URI.create("flowspec6");
+ FlowSpec flowSpec6 = FlowSpec.builder(uri6)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
+ .addPrimitive("key6", "value6").build())
+ .withDescription("Test flow spec 6")
+ .withVersion("Test version 6")
+ .build();
+
+ this.specStore.addSpec(flowSpec5, "dr");
+ this.specStore.addSpec(flowSpec6, "dr");
+
+ Assert.assertTrue(this.specStore.exists(uri5));
+ Assert.assertTrue(this.specStore.exists(uri6));
+ List<URI> result = new ArrayList<>();
+ this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+ Assert.assertEquals(result.size(), 2);
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetFilterSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2.
+ * flowSpec3 is not included as it is a 'corrupted' flowspec
+ * flowSpec4 is not included as it doesn't have the 'filter.this.flow'
property
+ * Start is the offset of the first configuration to return
+ * Count is the total number of configurations to return
+ * PropertyFilter is the property to filter by
+ */
+
+ // Start of 0 and count of 1 means start from index 0, and return one
configuration only
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+ Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertFalse(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ // Start of 1 and count of 1 means start from index 1, and return one
configuration only
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(1).count(1).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertFalse(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+
+ /**
+ * Start of 0 and count of 5 means start from index 0, and return five
configuration only
+ * Total of 3 flowSpecs in the DB, but flowSpec4 doesn't have
'filter.this.flow' filter so only returns 2 flowSpecs
+ * flowSpec1 and flowSpec2 match all the criteria
+ */
+ flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(5).propertyFilter("filter.this.flow").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec4));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testUpdate() throws Exception{
+ long version = System.currentTimeMillis() /1000;
+ this.specStore.updateSpec(this.flowSpec4_update);
+ Assert.assertEquals(((FlowSpec) this.specStore.getSpec(this.uri4)),
flowSpec4_update);
+ Assert.expectThrows(IOException.class, () ->
this.specStore.updateSpec(flowSpec4, version));
+ }
+
+ @Test (dependsOnMethods = "testGetSpec")
+ public void testGetAllSpecPaginate() throws Exception {
+ /**
+ * Sorted order of the specStore configurations is flowSpec1, flowSpec2,
flowSpec4
+ */
+ // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so
return all 3 flowSpecs
+ Collection<Spec> specs = this.specStore.getSpecs(0,10);
+ for (Spec spec: specs) {
+ System.out.println("test" + spec.getUri());
+ }
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs using the default get all specs function. Testing
default functionality of returning everything
+ specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 3);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertTrue(specs.contains(this.flowSpec4));
+
+ // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only
return first two.
Review Comment:
the index specification is a bit confusing here, are you sayign `2-1`
because the end is not inclusive?
Issue Time Tracking
-------------------
Worklog Id: (was: 807622)
Time Spent: 0.5h (was: 20m)
> Have a separate resource handler to rely on CDC stream to do message
> forwarding
> -------------------------------------------------------------------------------
>
> Key: GOBBLIN-1697
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1697
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Zihan Li
> Priority: Major
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)