This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2304f11  [GOBBLIN-1002] Set state id when deserializing state from 
Gobblin state store[]
2304f11 is described below

commit 2304f11e033febd4ab3389937ef8ff48b27e99ed
Author: sv2000 <[email protected]>
AuthorDate: Tue Dec 10 15:56:13 2019 -0800

    [GOBBLIN-1002] Set state id when deserializing state from Gobblin state 
store[]
    
    Closes #2847 from sv2000/checkPointableWatermark
---
 .../src/main/java/org/apache/gobblin/metastore/FsStateStore.java | 2 ++
 .../main/java/org/apache/gobblin/metastore/MysqlStateStore.java  | 9 +++++----
 .../test/java/org/apache/gobblin/metastore/FsStateStoreTest.java | 9 +++++++++
 .../src/main/java/org/apache/gobblin/metastore/ZkStateStore.java | 1 +
 .../java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java | 4 ++++
 .../org/apache/gobblin/runtime/CheckpointableWatermarkState.java | 4 ++++
 .../org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java   | 8 +++++++-
 7 files changed, 32 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 34c0b8f..4ea9491 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -238,6 +238,7 @@ public class FsStateStore<T extends State> implements 
StateStore<T> {
         while (reader.next(key)) {
           state = (T)reader.getCurrentValue(state);
           if (key.toString().equals(stateId)) {
+            state.setId(stateId);
             return state;
           }
         }
@@ -272,6 +273,7 @@ public class FsStateStore<T extends State> implements 
StateStore<T> {
         T state = this.stateClass.newInstance();
         while (reader.next(key)) {
           state = (T)reader.getCurrentValue(state);
+          state.setId(key.toString());
           states.add(state);
           // We need a new object for each read state
           state = this.stateClass.newInstance();
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index a833a1d..de84f52 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -47,6 +47,8 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import javax.sql.DataSource;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
@@ -56,8 +58,6 @@ import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.io.StreamUtils;
 
-import javax.sql.DataSource;
-
 /**
  * An implementation of {@link StateStore} backed by MySQL.
  *
@@ -312,7 +312,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
 
               key.readFields(dis);
               state.readFields(dis);
-
+              state.setId(key.toString());
               if (key.toString().equals(stateId)) {
                 return state;
               }
@@ -395,8 +395,9 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
           // keep deserializing while we have data
           while (dis.available() > 0) {
             T state = this.stateClass.newInstance();
-            key.readString(dis);
+            String stateId = key.readString(dis);
             state.readFields(dis);
+            state.setId(stateId);
             states.add(state);
           }
         } catch (EOFException e) {
diff --git 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
index 70160c8..2f53c51 100644
--- 
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
+++ 
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
@@ -103,8 +103,11 @@ public class FsStateStoreTest {
     Assert.assertEquals(states.size(), 3);
 
     Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+    Assert.assertEquals(states.get(0).getId(),  "s1");
     Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+    Assert.assertEquals(states.get(1).getId(),  "s2");
     Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+    Assert.assertEquals(states.get(2).getId(),  "s3");
   }
 
   @Test(dependsOnMethods = { "testPut" })
@@ -119,8 +122,11 @@ public class FsStateStoreTest {
     Assert.assertEquals(states.size(), 3);
 
     Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+    Assert.assertEquals(states.get(0).getId(),  "s1");
     Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+    Assert.assertEquals(states.get(1).getId(),  "s2");
     Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+    Assert.assertEquals(states.get(2).getId(),  "s3");
   }
 
   @Test(dependsOnMethods = { "testGetAlias" })
@@ -152,8 +158,11 @@ public class FsStateStoreTest {
     Assert.assertEquals(states.size(), 3);
 
     Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+    Assert.assertEquals(states.get(0).getId(),  "s1");
     Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+    Assert.assertEquals(states.get(1).getId(),  "s2");
     Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+    Assert.assertEquals(states.get(2).getId(),  "s3");
   }
 
   @AfterClass
diff --git 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index c09c42a..e383060 100644
--- 
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++ 
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -322,6 +322,7 @@ public class ZkStateStore<T extends State> implements 
StateStore<T> {
 
           key.readFields(dis);
           state.readFields(dis);
+          state.setId(key.toString());
           states.add(state);
 
           if (stateId != null && key.toString().equals(stateId)) {
diff --git 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 2ee5f1d..71e5cbc 100644
--- 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -121,6 +121,7 @@ public class ZkDatasetStateStoreTest {
         zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
         TEST_JOB_ID);
 
+    Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
@@ -184,6 +185,7 @@ public class ZkDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getStartTime(), this.startTime);
     Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
     Assert.assertEquals(datasetState.getDuration(), 1000);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
 
     Assert.assertEquals(datasetState.getCompletedTasks(), 3);
     for (int i = 0; i < datasetState.getCompletedTasks(); i++) {
@@ -219,6 +221,7 @@ public class ZkDatasetStateStoreTest {
     Assert.assertEquals(datasetStatesByUrns.size(), 2);
 
     JobState.DatasetState datasetState = 
datasetStatesByUrns.get(TEST_DATASET_URN);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -228,6 +231,7 @@ public class ZkDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getDuration(), 1000);
 
     datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN2);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
index f7ce9d7..791f1b3 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
@@ -39,6 +39,10 @@ public class CheckpointableWatermarkState extends State {
     super.setId(watermark.getSource());
   }
 
+  public String getSource() {
+    return getId();
+  }
+
   /**
    * Needed for reflection based construction.
    */
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index e8ea281..956a875 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -141,6 +141,7 @@ public class MysqlDatasetStateStoreTest {
         dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
         TEST_JOB_ID);
 
+    Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getProp("foo"), "bar");
@@ -163,6 +164,7 @@ public class MysqlDatasetStateStoreTest {
         dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
         TEST_JOB_ID);
 
+    Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME_LOWER);
     Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
     Assert.assertEquals(jobState.getProp("foo"), "bar");
@@ -200,7 +202,7 @@ public class MysqlDatasetStateStoreTest {
 
     // persist a colliding lowercase dataset state to test that retrieval is 
case sensitive
     datasetState.setDatasetUrn(TEST_DATASET_URN_LOWER);
-    datasetState.setId(TEST_DATASET_URN_LOWER );
+    datasetState.setId(TEST_DATASET_URN_LOWER);
     datasetState.setDuration(3000);
 
     dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER, 
datasetState);
@@ -215,6 +217,7 @@ public class MysqlDatasetStateStoreTest {
     JobState.DatasetState datasetState =
         dbDatasetStateStore.getLatestDatasetState(TEST_JOB_NAME, 
TEST_DATASET_URN);
 
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -259,6 +262,7 @@ public class MysqlDatasetStateStoreTest {
     Assert.assertEquals(datasetStatesByUrns.size(), 3);
 
     JobState.DatasetState datasetState = 
datasetStatesByUrns.get(TEST_DATASET_URN);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -268,6 +272,7 @@ public class MysqlDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getDuration(), 1000);
 
     datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN2);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -277,6 +282,7 @@ public class MysqlDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getDuration(), 2000);
 
     datasetState = datasetStatesByUrns.get(TEST_DATASET_URN_LOWER);
+    Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN_LOWER);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN_LOWER);
     Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
     Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);

Reply via email to