This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new aabb8b97c [GOBBLIN-2092] updated all the columns during flow_spec
update (#3978)
aabb8b97c is described below
commit aabb8b97c00cc530122b1c1eb941609c86a8c0ce
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Jul 11 20:50:49 2024 +0530
[GOBBLIN-2092] updated all the columns during flow_spec update (#3978)
* updated all the columns during flow_spec update
---
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 16 +-
.../runtime/spec_store/MysqlSpecStoreTest.java | 208 +++++++++++++++++++--
2 files changed, 207 insertions(+), 17 deletions(-)
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 3e9cd4351..0cf6c34e7 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -61,9 +61,23 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
// Historical Note: the `spec_json` column didn't always exist and was
introduced for GOBBLIN-1150; the impl. thus allows that not every
// record may contain data there... though in practice, all should, given
the passage of time (amidst the usual retention expiry).
+
+ // ON DUPLICATE KEY UPDATE ... VALUES() is deprecated; use a row alias instead. See
https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
protected static final String SPECIFIC_INSERT_STATEMENT = "INSERT INTO %s
(spec_uri, flow_group, flow_name, template_uri, "
+ "user_to_proxy, source_identifier, destination_identifier, schedule,
tag, isRunImmediately, owning_group, spec, spec_json) "
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY
UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS flowSpec "
+ + "ON DUPLICATE KEY UPDATE "
+ + "template_uri = flowSpec.template_uri, "
+ + "user_to_proxy = flowSpec.user_to_proxy, "
+ + "source_identifier = flowSpec.source_identifier, "
+ + "destination_identifier = flowSpec.destination_identifier, "
+ + "schedule = flowSpec.schedule, "
+ + "tag = flowSpec.tag, "
+ + "isRunImmediately = flowSpec.isRunImmediately, "
+ + "owning_group = flowSpec.owning_group, "
+ + "spec = flowSpec.spec, "
+ + "spec_json = flowSpec.spec_json";
+
private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri,
spec, spec_json FROM %s WHERE ";
private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri,
spec, spec_json, modified_time FROM %s";
private static final String SPECIFIC_GET_SPECS_BATCH_STATEMENT = "SELECT
spec_uri, spec, spec_json, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ?
OFFSET ?";
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 425f8aad7..cc45da2d4 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -27,12 +27,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
+
import org.apache.commons.lang3.ArrayUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import org.testng.internal.collections.Pair;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
@@ -65,7 +68,9 @@ public class MysqlSpecStoreTest {
private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
- private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
+ private final URI uri5 = FlowSpec.Utils.createFlowSpecUri(new
FlowId().setFlowName("fg5").setFlowGroup("fn5"));
+
+ private FlowSpec flowSpec1, flowSpec2, flowSpec3,
flowSpec4,flowSpec5,flowSpec5Updated;
public MysqlSpecStoreTest()
throws URISyntaxException { // (based on `uri1` and other
initializations just above)
@@ -130,6 +135,35 @@ public class MysqlSpecStoreTest {
.withDescription("Test flow spec 4")
.withVersion("Test version 4")
.build();
+ flowSpec5=
+ FlowSpec.builder(this.uri5)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source5")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination5")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 8 * * ?
*")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroup")
+ .addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "true")
+ .addPrimitive("user.to.proxy", "userA")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5").build())
+ .withDescription("Test flow spec 5")
+ .withVersion("Test version 5")
+ .build();
+
+ flowSpec5Updated=
+ FlowSpec.builder(this.uri5)
+ .withConfig(ConfigBuilder.create()
+ .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source5Updated")
+ .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY,
"destination5Updated")
+ .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 9 * * ?
*")
+ .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY,
"owningGroupUpdated")
+ .addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
"true") // not updated; kept the same as in flowSpec5
+ .addPrimitive("user.to.proxy", "userAUpdated")
+ .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+ .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5").build())
+ .withDescription("Test flow spec 5")
+ .withVersion("Test version 5")
+ .build();
}
@AfterClass(alwaysRun = true)
@@ -160,8 +194,8 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
}
- @Test (dependsOnMethods = "testAddSpec")
- public void testGetSpec() throws Exception {
+ @Test(dependsOnMethods = "testAddSpec")
+ public void testGetSpecs() throws Exception {
FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
removeModificationTimestampFromSpecs(result);
Assert.assertEquals(result, this.flowSpec1);
@@ -216,7 +250,46 @@ public class MysqlSpecStoreTest {
Assert.assertTrue(specs.contains(this.flowSpec4));
}
- @Test (dependsOnMethods = "testGetSpec")
+ @Test
+ public void testGetSpecsAfterUpdate() throws Exception {
+ // Adding a flow spec to the spec store
+ this.specStore.addSpec(this.flowSpec5);
+
+ // Retrieving the flow spec from the store
+ FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri5);
+
+ // Modifying the retrieved result (removing modification timestamp) for
equality check
+ removeModificationTimestampFromSpecs(result);
+
+ Assert.assertEquals(result, this.flowSpec5);
+
+ List<Pair<FlowSpecSearchObject, Boolean>> preUpdateValidations = new
ArrayList<>();
+ List<Pair<FlowSpecSearchObject, Boolean>> postUpdateValidations = new
ArrayList<>();
+
+ populatePreAndPostUpdateFlowSpecSearchValidations(preUpdateValidations,
postUpdateValidations);
+
+ // Validating pre-update search results against flowSpec5
+ for (Pair<FlowSpecSearchObject, Boolean> searchObjectBooleanPair :
preUpdateValidations) {
+ validateSearchResult(searchObjectBooleanPair, this.flowSpec5);
+ }
+
+ // Updating the flow spec in the spec store with flowSpec5Updated
+ this.specStore.addSpec(this.flowSpec5Updated);
+
+ // Validating post-update search results against flowSpec5Updated
+ for (Pair<FlowSpecSearchObject, Boolean> searchObjectBooleanPair :
postUpdateValidations) {
+ validateSearchResult(searchObjectBooleanPair, this.flowSpec5Updated);
+ }
+
+ //Deleting the spec
+ this.specStore.deleteSpec(this.flowSpec5Updated);
+
+ FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().flowName("fn5").flowGroup("fg5").build();
+ Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 0);
+ }
+
+ @Test(dependsOnMethods = "testGetSpecs")
public void testGetSpecWithTag() throws Exception {
//Creating and inserting flowspecs with tags
@@ -227,7 +300,8 @@ public class MysqlSpecStoreTest {
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
- .addPrimitive("key5", "value5").build())
+ .addPrimitive("key5", "value5")
+ .build())
.withDescription("Test flow spec 5")
.withVersion("Test version 5")
.build();
@@ -239,7 +313,8 @@ public class MysqlSpecStoreTest {
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
.addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
- .addPrimitive("key6", "value6").build())
+ .addPrimitive("key6", "value6")
+ .build())
.withDescription("Test flow spec 6")
.withVersion("Test version 6")
.build();
@@ -254,7 +329,7 @@ public class MysqlSpecStoreTest {
Assert.assertEquals(result.size(), 2);
}
- @Test (dependsOnMethods = "testGetSpec")
+ @Test(dependsOnMethods = "testGetSpecs")
public void testGetFilterSpecPaginate() throws Exception {
/**
* Sorted order of the specStore configurations is flowSpec1, flowSpec2.
@@ -266,7 +341,8 @@ public class MysqlSpecStoreTest {
*/
// Start of 0 and count of 1 means start from index 0, and return one
configuration only
- FlowSpecSearchObject flowSpecSearchObject =
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+ FlowSpecSearchObject flowSpecSearchObject =
+
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
Assert.assertEquals(specs.size(), 1);
Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -304,13 +380,13 @@ public class MysqlSpecStoreTest {
((FlowSpec)
spec).getConfigAsProperties().remove(FlowSpec.MODIFICATION_TIME_KEY);
}
- @Test (dependsOnMethods = "testGetSpec")
+ @Test(dependsOnMethods = "testGetSpecs")
public void testGetAllSpecPaginate() throws Exception {
/**
* Sorted order of the specStore configurations is flowSpec1, flowSpec2,
flowSpec4
*/
// Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so
return all 3 flowSpecs
- Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+ Collection<Spec> specs = this.specStore.getSpecsPaginated(0, 10);
specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 3);
Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -326,7 +402,7 @@ public class MysqlSpecStoreTest {
Assert.assertTrue(specs.contains(this.flowSpec4));
// Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only
return first two.
- specs = this.specStore.getSpecsPaginated(0,2);
+ specs = this.specStore.getSpecsPaginated(0, 2);
specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -334,11 +410,11 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(specs.contains(this.flowSpec4));
// Return 0 flowSpecs when batch size is 0.
- specs = this.specStore.getSpecsPaginated(2,0);
+ specs = this.specStore.getSpecsPaginated(2, 0);
Assert.assertEquals(specs.size(), 0);
// Return 0 flowSpecs when start offset is past the end
- specs = this.specStore.getSpecsPaginated(3,1);
+ specs = this.specStore.getSpecsPaginated(3, 1);
Assert.assertEquals(specs.size(), 0);
// Check that we throw an error for incorrect inputs
@@ -346,12 +422,12 @@ public class MysqlSpecStoreTest {
Assert.assertThrows(IllegalArgumentException.class, () ->
this.specStore.getSpecsPaginated(2, -4));
}
- @Test (expectedExceptions = {IOException.class})
+ @Test(expectedExceptions = {IOException.class})
public void testGetCorruptedSpec() throws Exception {
this.specStore.addSpec(this.flowSpec3);
}
- @Test (dependsOnMethods = "testGetSpecWithTag")
+ @Test(dependsOnMethods = "testGetSpecWithTag")
public void testDeleteSpec() throws Exception {
Assert.assertEquals(this.specStore.getSize(), 5);
this.specStore.deleteSpec(this.uri1);
@@ -359,7 +435,7 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(this.specStore.exists(this.uri1));
}
- @Test (dependsOnMethods = "testDeleteSpec")
+ @Test(dependsOnMethods = "testDeleteSpec")
public void testReadOldColumn() throws Exception {
this.oldSpecStore.addSpec(this.flowSpec1);
@@ -392,6 +468,106 @@ public class MysqlSpecStoreTest {
}
}
+ /**
+ * Validates the search result for a given FlowSpec against expected
conditions.
+ *
+ * @param searchObjectBooleanPair A Pair of the search object and a Boolean: true if the search must return
exactly one spec, namely flowSpec; false if it must neither return exactly one spec nor contain flowSpec.
+ * @param flowSpec The FlowSpec to validate against the search results.
+ * @throws IOException If there is an issue accessing data from the spec
store.
+ */
+ private void validateSearchResult(Pair<FlowSpecSearchObject, Boolean>
searchObjectBooleanPair,
+ FlowSpec flowSpec) throws IOException {
+ Collection<Spec> specs =
this.specStore.getSpecs(searchObjectBooleanPair.first());
+ Assert.assertEquals(Optional.of(specs.size() == 1).get(),
searchObjectBooleanPair.second());
+ Assert.assertEquals(Optional.of(specs.contains(flowSpec)).get(),
searchObjectBooleanPair.second());
+ }
+
+ /**
+ * Populates pre-update and post-update validations for FlowSpecs based on
various search criteria.
+ *
+ * @param preUpdateValidations List to store pairs of FlowSpecSearchObject
and expected Boolean results
+ * before updating.
+ * @param postUpdateValidations List to store pairs of FlowSpecSearchObject
and expected Boolean results
+ * after updating.
+ */
+ private void populatePreAndPostUpdateFlowSpecSearchValidations(
+ List<Pair<FlowSpecSearchObject, Boolean>> preUpdateValidations,
+ List<Pair<FlowSpecSearchObject, Boolean>> postUpdateValidations) {
+
+ FlowSpecSearchObject flowGroupFlowNameSearchOb =
+
FlowSpecSearchObject.builder().flowName("fn5").flowGroup("fg5").build();
+ preUpdateValidations.add(new Pair<>(flowGroupFlowNameSearchOb, true));
+ postUpdateValidations.add(new Pair<>(flowGroupFlowNameSearchOb, true));
+
+ FlowSpecSearchObject preUpdateSourceIdentifierSearchOb =
+ FlowSpecSearchObject.builder().sourceIdentifier("source5").build();
+ preUpdateValidations.add(new Pair<>(preUpdateSourceIdentifierSearchOb,
true));
+ postUpdateValidations.add(new Pair<>(preUpdateSourceIdentifierSearchOb,
false));
+
+ FlowSpecSearchObject postUpdateSourceIdentifierSearchOb =
+
FlowSpecSearchObject.builder().sourceIdentifier("source5Updated").build();
+ preUpdateValidations.add(new Pair<>(postUpdateSourceIdentifierSearchOb,
false));
+ postUpdateValidations.add(new Pair<>(postUpdateSourceIdentifierSearchOb,
true));
+
+ FlowSpecSearchObject preUpdateDestinationIdentifierSearchOb =
+
FlowSpecSearchObject.builder().destinationIdentifier("destination5").build();
+ preUpdateValidations.add(new
Pair<>(preUpdateDestinationIdentifierSearchOb, true));
+ postUpdateValidations.add(new
Pair<>(preUpdateDestinationIdentifierSearchOb, false));
+ FlowSpecSearchObject postUpdateDestinationIdentifierSearchOb =
+
FlowSpecSearchObject.builder().destinationIdentifier("destination5Updated").build();
+ preUpdateValidations.add(new
Pair<>(postUpdateDestinationIdentifierSearchOb, false));
+ postUpdateValidations.add(new
Pair<>(postUpdateDestinationIdentifierSearchOb, true));
+
+ //search with cronSchedule and runImmediately Filter
+ FlowSpecSearchObject preUpdateJobScheduleSearchOb =
+ FlowSpecSearchObject.builder().schedule("0 0 8 * * ?
*").isRunImmediately(true).build();
+ preUpdateValidations.add(new Pair<>(preUpdateJobScheduleSearchOb, true));
+ postUpdateValidations.add(new Pair<>(preUpdateJobScheduleSearchOb, false));
+ FlowSpecSearchObject postUpdateJobScheduleSearchOb =
+ FlowSpecSearchObject.builder().schedule("0 0 9 * * ?
*").isRunImmediately(true).build();
+ preUpdateValidations.add(new Pair<>(postUpdateJobScheduleSearchOb, false));
+ postUpdateValidations.add(new Pair<>(postUpdateJobScheduleSearchOb, true));
+
+ //search with userToProxy and owningGroup Filter
+ FlowSpecSearchObject preUpdateUserToProxySearchOb =
+
FlowSpecSearchObject.builder().userToProxy("userA").owningGroup("owningGroup").build();
+ preUpdateValidations.add(new Pair<>(preUpdateUserToProxySearchOb, true));
+ postUpdateValidations.add(new Pair<>(preUpdateUserToProxySearchOb, false));
+ FlowSpecSearchObject postUpdateUserToProxySearchOb =
+
FlowSpecSearchObject.builder().userToProxy("userAUpdated").owningGroup("owningGroupUpdated").build();
+ preUpdateValidations.add(new Pair<>(postUpdateUserToProxySearchOb, false));
+ postUpdateValidations.add(new Pair<>(postUpdateUserToProxySearchOb, true));
+
+ //Search with all the columns present in flowSpec5
+ FlowSpecSearchObject preUpdateFullSearchValidator =
FlowSpecSearchObject.builder()
+ .flowGroup("fg5")
+ .flowName("fn5")
+ .sourceIdentifier("source5")
+ .destinationIdentifier("destination5")
+ .schedule("0 0 8 * * ? *")
+ .userToProxy("userA")
+ .owningGroup("owningGroup")
+ .isRunImmediately(true)
+ .build();
+ preUpdateValidations.add(new Pair<>(preUpdateFullSearchValidator, true));
+ postUpdateValidations.add(new Pair<>(preUpdateFullSearchValidator, false));
+
+ //Search with all the columns present in flowSpec5Updated
+ FlowSpecSearchObject postUpdateFullSearchValidator =
FlowSpecSearchObject.builder()
+ .flowGroup("fg5")
+ .flowName("fn5")
+ .sourceIdentifier("source5Updated")
+ .destinationIdentifier("destination5Updated")
+ .schedule("0 0 9 * * ? *")
+ .userToProxy("userAUpdated")
+ .owningGroup("owningGroupUpdated")
+ .isRunImmediately(true)
+ .build();
+
+ postUpdateValidations.add(new Pair<>(postUpdateFullSearchValidator, true));
+ preUpdateValidations.add(new Pair<>(postUpdateFullSearchValidator, false));
+ }
+
public class TestSpecSerDe extends GsonFlowSpecSerDe {
@Override
public byte[] serialize(Spec spec) throws SpecSerDeException {