This is an automated email from the ASF dual-hosted git repository.

arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new aabb8b97c [GOBBLIN-2092] updated all the columns during flow_spec 
update (#3978)
aabb8b97c is described below

commit aabb8b97c00cc530122b1c1eb941609c86a8c0ce
Author: pratapaditya04 <[email protected]>
AuthorDate: Thu Jul 11 20:50:49 2024 +0530

    [GOBBLIN-2092] updated all the columns during flow_spec update (#3978)
    
    * updated all the columns during flow_spec update
---
 .../gobblin/runtime/spec_store/MysqlSpecStore.java |  16 +-
 .../runtime/spec_store/MysqlSpecStoreTest.java     | 208 +++++++++++++++++++--
 2 files changed, 207 insertions(+), 17 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 3e9cd4351..0cf6c34e7 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -61,9 +61,23 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
 
   // Historical Note: the `spec_json` column didn't always exist and was 
introduced for GOBBLIN-1150; the impl. thus allows that not every
   // record may contain data there... though in practice, all should, given 
the passage of time (amidst the usual retention expiry).
+
+  // The VALUES() function in ON DUPLICATE KEY UPDATE is deprecated; use a row alias instead (needs MySQL 8.0.19+). See 
https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html
   protected static final String SPECIFIC_INSERT_STATEMENT = "INSERT INTO %s 
(spec_uri, flow_group, flow_name, template_uri, "
       + "user_to_proxy, source_identifier, destination_identifier, schedule, 
tag, isRunImmediately, owning_group, spec, spec_json) "
-      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY 
UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) AS flowSpec "
+      + "ON DUPLICATE KEY UPDATE "
+      + "template_uri = flowSpec.template_uri, "
+      + "user_to_proxy = flowSpec.user_to_proxy, "
+      + "source_identifier = flowSpec.source_identifier, "
+      + "destination_identifier = flowSpec.destination_identifier, "
+      + "schedule = flowSpec.schedule, "
+      + "tag = flowSpec.tag, "
+      + "isRunImmediately = flowSpec.isRunImmediately, "
+      + "owning_group = flowSpec.owning_group, "
+      + "spec = flowSpec.spec, "
+      + "spec_json = flowSpec.spec_json";
+
   private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri, 
spec, spec_json FROM %s WHERE ";
   private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, 
spec, spec_json, modified_time FROM %s";
   private static final String SPECIFIC_GET_SPECS_BATCH_STATEMENT = "SELECT 
spec_uri, spec, spec_json, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ? 
OFFSET ?";
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 425f8aad7..cc45da2d4 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -27,12 +27,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
+
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
+import org.testng.internal.collections.Pair;
 
 import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
@@ -65,7 +68,9 @@ public class MysqlSpecStoreTest {
   private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg2").setFlowGroup("fn2"));
   private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg3").setFlowGroup("fn3"));
   private final URI uri4 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg4").setFlowGroup("fn4"));
-  private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
+  private final URI uri5 = FlowSpec.Utils.createFlowSpecUri(new 
FlowId().setFlowName("fg5").setFlowGroup("fn5"));
+
+  private FlowSpec flowSpec1, flowSpec2, flowSpec3, 
flowSpec4,flowSpec5,flowSpec5Updated;
 
   public MysqlSpecStoreTest()
       throws URISyntaxException { // (based on `uri1` and other 
initializations just above)
@@ -130,6 +135,35 @@ public class MysqlSpecStoreTest {
         .withDescription("Test flow spec 4")
         .withVersion("Test version 4")
         .build();
+    flowSpec5=
+        FlowSpec.builder(this.uri5)
+            .withConfig(ConfigBuilder.create()
+                .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source5")
+                .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination5")
+                .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 8 * * ? 
*")
+                .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
"owningGroup")
+                .addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "true")
+                .addPrimitive("user.to.proxy", "userA")
+                .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+                .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5").build())
+            .withDescription("Test flow spec 5")
+            .withVersion("Test version 5")
+            .build();
+
+    flowSpec5Updated=
+        FlowSpec.builder(this.uri5)
+            .withConfig(ConfigBuilder.create()
+                .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source5Updated")
+                .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, 
"destination5Updated")
+                .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 0 9 * * ? 
*")
+                .addPrimitive(ConfigurationKeys.FLOW_OWNING_GROUP_KEY, 
"owningGroupUpdated")
+                .addPrimitive(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, 
"true") // not updated; kept the same as in flowSpec5
+                .addPrimitive("user.to.proxy", "userAUpdated")
+                .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
+                .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5").build())
+            .withDescription("Test flow spec 5")
+            .withVersion("Test version 5")
+            .build();
   }
 
   @AfterClass(alwaysRun = true)
@@ -160,8 +194,8 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
   }
 
-  @Test (dependsOnMethods = "testAddSpec")
-  public void testGetSpec() throws Exception {
+  @Test(dependsOnMethods = "testAddSpec")
+  public void testGetSpecs() throws Exception {
     FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
     removeModificationTimestampFromSpecs(result);
     Assert.assertEquals(result, this.flowSpec1);
@@ -216,7 +250,46 @@ public class MysqlSpecStoreTest {
     Assert.assertTrue(specs.contains(this.flowSpec4));
   }
 
-  @Test  (dependsOnMethods = "testGetSpec")
+  @Test
+  public void testGetSpecsAfterUpdate() throws Exception {
+    // Adding a flow spec to the spec store
+    this.specStore.addSpec(this.flowSpec5);
+
+    // Retrieving the flow spec from the store
+    FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri5);
+
+    // Modifying the retrieved result (removing modification timestamp) for 
equality check
+    removeModificationTimestampFromSpecs(result);
+
+    Assert.assertEquals(result, this.flowSpec5);
+
+    List<Pair<FlowSpecSearchObject, Boolean>> preUpdateValidations = new 
ArrayList<>();
+    List<Pair<FlowSpecSearchObject, Boolean>> postUpdateValidations = new 
ArrayList<>();
+
+    populatePreAndPostUpdateFlowSpecSearchValidations(preUpdateValidations, 
postUpdateValidations);
+
+    // Validating pre-update search results against flowSpec5
+    for (Pair<FlowSpecSearchObject, Boolean> searchObjectBooleanPair : 
preUpdateValidations) {
+      validateSearchResult(searchObjectBooleanPair, this.flowSpec5);
+    }
+
+    // Updating the flow spec in the spec store with flowSpec5Updated
+    this.specStore.addSpec(this.flowSpec5Updated);
+
+    // Validating post-update search results against flowSpec5Updated
+    for (Pair<FlowSpecSearchObject, Boolean> searchObjectBooleanPair : 
postUpdateValidations) {
+      validateSearchResult(searchObjectBooleanPair, this.flowSpec5Updated);
+    }
+
+    //Deleting the spec
+    this.specStore.deleteSpec(this.flowSpec5Updated);
+
+    FlowSpecSearchObject flowSpecSearchObject = 
FlowSpecSearchObject.builder().flowName("fn5").flowGroup("fg5").build();
+    Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 0);
+  }
+
+  @Test(dependsOnMethods = "testGetSpecs")
   public void testGetSpecWithTag() throws Exception {
 
     //Creating and inserting flowspecs with tags
@@ -227,7 +300,8 @@ public class MysqlSpecStoreTest {
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg5")
             .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn5")
-            .addPrimitive("key5", "value5").build())
+            .addPrimitive("key5", "value5")
+            .build())
         .withDescription("Test flow spec 5")
         .withVersion("Test version 5")
         .build();
@@ -239,7 +313,8 @@ public class MysqlSpecStoreTest {
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg6")
             .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "fn6")
-            .addPrimitive("key6", "value6").build())
+            .addPrimitive("key6", "value6")
+            .build())
         .withDescription("Test flow spec 6")
         .withVersion("Test version 6")
         .build();
@@ -254,7 +329,7 @@ public class MysqlSpecStoreTest {
     Assert.assertEquals(result.size(), 2);
   }
 
-  @Test (dependsOnMethods = "testGetSpec")
+  @Test(dependsOnMethods = "testGetSpecs")
   public void testGetFilterSpecPaginate() throws Exception {
     /**
      * Sorted order of the specStore configurations is flowSpec1, flowSpec2.
@@ -266,7 +341,8 @@ public class MysqlSpecStoreTest {
      */
 
     // Start of 0 and count of 1 means start from index 0, and return one 
configuration only
-    FlowSpecSearchObject flowSpecSearchObject = 
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
+    FlowSpecSearchObject flowSpecSearchObject =
+        
FlowSpecSearchObject.builder().start(0).count(1).propertyFilter("filter.this.flow").build();
     Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
     Assert.assertEquals(specs.size(), 1);
     Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -304,13 +380,13 @@ public class MysqlSpecStoreTest {
     ((FlowSpec) 
spec).getConfigAsProperties().remove(FlowSpec.MODIFICATION_TIME_KEY);
   }
 
-  @Test (dependsOnMethods =  "testGetSpec")
+  @Test(dependsOnMethods = "testGetSpecs")
   public void testGetAllSpecPaginate() throws Exception {
     /**
      * Sorted order of the specStore configurations is flowSpec1, flowSpec2, 
flowSpec4
      */
     // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so 
return all 3 flowSpecs
-    Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+    Collection<Spec> specs = this.specStore.getSpecsPaginated(0, 10);
     specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 3);
     Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -326,7 +402,7 @@ public class MysqlSpecStoreTest {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only 
return first two.
-    specs = this.specStore.getSpecsPaginated(0,2);
+    specs = this.specStore.getSpecsPaginated(0, 2);
     specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
@@ -334,11 +410,11 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
     // Return 0 flowSpecs when batch size is 0.
-    specs = this.specStore.getSpecsPaginated(2,0);
+    specs = this.specStore.getSpecsPaginated(2, 0);
     Assert.assertEquals(specs.size(), 0);
 
     // Return 0 flowSpecs when start offset is past the end
-    specs = this.specStore.getSpecsPaginated(3,1);
+    specs = this.specStore.getSpecsPaginated(3, 1);
     Assert.assertEquals(specs.size(), 0);
 
     // Check that we throw an error for incorrect inputs
@@ -346,12 +422,12 @@ public class MysqlSpecStoreTest {
     Assert.assertThrows(IllegalArgumentException.class, () -> 
this.specStore.getSpecsPaginated(2, -4));
   }
 
-  @Test (expectedExceptions = {IOException.class})
+  @Test(expectedExceptions = {IOException.class})
   public void testGetCorruptedSpec() throws Exception {
     this.specStore.addSpec(this.flowSpec3);
   }
 
-  @Test (dependsOnMethods = "testGetSpecWithTag")
+  @Test(dependsOnMethods = "testGetSpecWithTag")
   public void testDeleteSpec() throws Exception {
     Assert.assertEquals(this.specStore.getSize(), 5);
     this.specStore.deleteSpec(this.uri1);
@@ -359,7 +435,7 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(this.specStore.exists(this.uri1));
   }
 
-  @Test (dependsOnMethods = "testDeleteSpec")
+  @Test(dependsOnMethods = "testDeleteSpec")
   public void testReadOldColumn() throws Exception {
     this.oldSpecStore.addSpec(this.flowSpec1);
 
@@ -392,6 +468,106 @@ public class MysqlSpecStoreTest {
     }
   }
 
+  /**
+   * Validates the search result for a given FlowSpec against expected 
conditions.
+   *
+   * @param searchObjectBooleanPair A Pair containing the search object and a 
Boolean that is true when the search is expected to return exactly one spec, namely {@code flowSpec}, and false otherwise.
+   * @param flowSpec The FlowSpec to validate against the search results.
+   * @throws IOException If there is an issue accessing data from the spec 
store.
+   */
+  private void validateSearchResult(Pair<FlowSpecSearchObject, Boolean> 
searchObjectBooleanPair,
+      FlowSpec flowSpec) throws IOException {
+    Collection<Spec> specs = 
this.specStore.getSpecs(searchObjectBooleanPair.first());
+    Assert.assertEquals(Optional.of(specs.size() == 1).get(), 
searchObjectBooleanPair.second());
+    Assert.assertEquals(Optional.of(specs.contains(flowSpec)).get(), 
searchObjectBooleanPair.second());
+  }
+
+  /**
+   * Populates pre-update and post-update validations for FlowSpecs based on 
various search criteria.
+   *
+   * @param preUpdateValidations  List to store pairs of FlowSpecSearchObject 
and expected Boolean results
+   *                              before updating.
+   * @param postUpdateValidations List to store pairs of FlowSpecSearchObject 
and expected Boolean results
+   *                              after updating.
+   */
+  private void populatePreAndPostUpdateFlowSpecSearchValidations(
+      List<Pair<FlowSpecSearchObject, Boolean>> preUpdateValidations,
+      List<Pair<FlowSpecSearchObject, Boolean>> postUpdateValidations) {
+
+    FlowSpecSearchObject flowGroupFlowNameSearchOb =
+        
FlowSpecSearchObject.builder().flowName("fn5").flowGroup("fg5").build();
+    preUpdateValidations.add(new Pair<>(flowGroupFlowNameSearchOb, true));
+    postUpdateValidations.add(new Pair<>(flowGroupFlowNameSearchOb, true));
+
+    FlowSpecSearchObject preUpdateSourceIdentifierSearchOb =
+        FlowSpecSearchObject.builder().sourceIdentifier("source5").build();
+    preUpdateValidations.add(new Pair<>(preUpdateSourceIdentifierSearchOb, 
true));
+    postUpdateValidations.add(new Pair<>(preUpdateSourceIdentifierSearchOb, 
false));
+
+    FlowSpecSearchObject postUpdateSourceIdentifierSearchOb =
+        
FlowSpecSearchObject.builder().sourceIdentifier("source5Updated").build();
+    preUpdateValidations.add(new Pair<>(postUpdateSourceIdentifierSearchOb, 
false));
+    postUpdateValidations.add(new Pair<>(postUpdateSourceIdentifierSearchOb, 
true));
+
+    FlowSpecSearchObject preUpdateDestinationIdentifierSearchOb =
+        
FlowSpecSearchObject.builder().destinationIdentifier("destination5").build();
+    preUpdateValidations.add(new 
Pair<>(preUpdateDestinationIdentifierSearchOb, true));
+    postUpdateValidations.add(new 
Pair<>(preUpdateDestinationIdentifierSearchOb, false));
+    FlowSpecSearchObject postUpdateDestinationIdentifierSearchOb =
+        
FlowSpecSearchObject.builder().destinationIdentifier("destination5Updated").build();
+    preUpdateValidations.add(new 
Pair<>(postUpdateDestinationIdentifierSearchOb, false));
+    postUpdateValidations.add(new 
Pair<>(postUpdateDestinationIdentifierSearchOb, true));
+
+    //search with cronSchedule and runImmediately Filter
+    FlowSpecSearchObject preUpdateJobScheduleSearchOb =
+        FlowSpecSearchObject.builder().schedule("0 0 8 * * ? 
*").isRunImmediately(true).build();
+    preUpdateValidations.add(new Pair<>(preUpdateJobScheduleSearchOb, true));
+    postUpdateValidations.add(new Pair<>(preUpdateJobScheduleSearchOb, false));
+    FlowSpecSearchObject postUpdateJobScheduleSearchOb =
+        FlowSpecSearchObject.builder().schedule("0 0 9 * * ? 
*").isRunImmediately(true).build();
+    preUpdateValidations.add(new Pair<>(postUpdateJobScheduleSearchOb, false));
+    postUpdateValidations.add(new Pair<>(postUpdateJobScheduleSearchOb, true));
+
+    //search with userToProxy and owningGroup Filter
+    FlowSpecSearchObject preUpdateUserToProxySearchOb =
+        
FlowSpecSearchObject.builder().userToProxy("userA").owningGroup("owningGroup").build();
+    preUpdateValidations.add(new Pair<>(preUpdateUserToProxySearchOb, true));
+    postUpdateValidations.add(new Pair<>(preUpdateUserToProxySearchOb, false));
+    FlowSpecSearchObject postUpdateUserToProxySearchOb =
+        
FlowSpecSearchObject.builder().userToProxy("userAUpdated").owningGroup("owningGroupUpdated").build();
+    preUpdateValidations.add(new Pair<>(postUpdateUserToProxySearchOb, false));
+    postUpdateValidations.add(new Pair<>(postUpdateUserToProxySearchOb, true));
+
+    //Search with all the columns present in flowSpec5
+    FlowSpecSearchObject preUpdateFullSearchValidator = 
FlowSpecSearchObject.builder()
+        .flowGroup("fg5")
+        .flowName("fn5")
+        .sourceIdentifier("source5")
+        .destinationIdentifier("destination5")
+        .schedule("0 0 8 * * ? *")
+        .userToProxy("userA")
+        .owningGroup("owningGroup")
+        .isRunImmediately(true)
+        .build();
+    preUpdateValidations.add(new Pair<>(preUpdateFullSearchValidator, true));
+    postUpdateValidations.add(new Pair<>(preUpdateFullSearchValidator, false));
+
+    //Search with all the columns present in flowSpec5Updated
+    FlowSpecSearchObject postUpdateFullSearchValidator = 
FlowSpecSearchObject.builder()
+        .flowGroup("fg5")
+        .flowName("fn5")
+        .sourceIdentifier("source5Updated")
+        .destinationIdentifier("destination5Updated")
+        .schedule("0 0 9 * * ? *")
+        .userToProxy("userAUpdated")
+        .owningGroup("owningGroupUpdated")
+        .isRunImmediately(true)
+        .build();
+
+    postUpdateValidations.add(new Pair<>(postUpdateFullSearchValidator, true));
+    preUpdateValidations.add(new Pair<>(postUpdateFullSearchValidator, false));
+  }
+
   public class TestSpecSerDe extends GsonFlowSpecSerDe {
     @Override
     public byte[] serialize(Spec spec) throws SpecSerDeException {

Reply via email to