This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2304f11 [GOBBLIN-1002] Set state id when deserializing state from Gobblin state store[]
2304f11 is described below
commit 2304f11e033febd4ab3389937ef8ff48b27e99ed
Author: sv2000 <[email protected]>
AuthorDate: Tue Dec 10 15:56:13 2019 -0800
[GOBBLIN-1002] Set state id when deserializing state from Gobblin state store[]
Closes #2847 from sv2000/checkPointableWatermark
---
.../src/main/java/org/apache/gobblin/metastore/FsStateStore.java | 2 ++
.../main/java/org/apache/gobblin/metastore/MysqlStateStore.java | 9 +++++----
.../test/java/org/apache/gobblin/metastore/FsStateStoreTest.java | 9 +++++++++
.../src/main/java/org/apache/gobblin/metastore/ZkStateStore.java | 1 +
.../java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java | 4 ++++
.../org/apache/gobblin/runtime/CheckpointableWatermarkState.java | 4 ++++
.../org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java | 8 +++++++-
7 files changed, 32 insertions(+), 5 deletions(-)
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
index 34c0b8f..4ea9491 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/FsStateStore.java
@@ -238,6 +238,7 @@ public class FsStateStore<T extends State> implements
StateStore<T> {
while (reader.next(key)) {
state = (T)reader.getCurrentValue(state);
if (key.toString().equals(stateId)) {
+ state.setId(stateId);
return state;
}
}
@@ -272,6 +273,7 @@ public class FsStateStore<T extends State> implements
StateStore<T> {
T state = this.stateClass.newInstance();
while (reader.next(key)) {
state = (T)reader.getCurrentValue(state);
+ state.setId(key.toString());
states.add(state);
// We need a new object for each read state
state = this.stateClass.newInstance();
diff --git
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index a833a1d..de84f52 100644
---
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -47,6 +47,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import javax.sql.DataSource;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
@@ -56,8 +58,6 @@ import org.apache.gobblin.password.PasswordManager;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.io.StreamUtils;
-import javax.sql.DataSource;
-
/**
* An implementation of {@link StateStore} backed by MySQL.
*
@@ -312,7 +312,7 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
key.readFields(dis);
state.readFields(dis);
-
+ state.setId(key.toString());
if (key.toString().equals(stateId)) {
return state;
}
@@ -395,8 +395,9 @@ public class MysqlStateStore<T extends State> implements
StateStore<T> {
// keep deserializing while we have data
while (dis.available() > 0) {
T state = this.stateClass.newInstance();
- key.readString(dis);
+ String stateId = key.readString(dis);
state.readFields(dis);
+ state.setId(stateId);
states.add(state);
}
} catch (EOFException e) {
diff --git
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
index 70160c8..2f53c51 100644
---
a/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
+++
b/gobblin-metastore/src/test/java/org/apache/gobblin/metastore/FsStateStoreTest.java
@@ -103,8 +103,11 @@ public class FsStateStoreTest {
Assert.assertEquals(states.size(), 3);
Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+ Assert.assertEquals(states.get(0).getId(), "s1");
Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+ Assert.assertEquals(states.get(1).getId(), "s2");
Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+ Assert.assertEquals(states.get(2).getId(), "s3");
}
@Test(dependsOnMethods = { "testPut" })
@@ -119,8 +122,11 @@ public class FsStateStoreTest {
Assert.assertEquals(states.size(), 3);
Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+ Assert.assertEquals(states.get(0).getId(), "s1");
Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+ Assert.assertEquals(states.get(1).getId(), "s2");
Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+ Assert.assertEquals(states.get(2).getId(), "s3");
}
@Test(dependsOnMethods = { "testGetAlias" })
@@ -152,8 +158,11 @@ public class FsStateStoreTest {
Assert.assertEquals(states.size(), 3);
Assert.assertEquals(states.get(0).getProp("k1"), "v1");
+ Assert.assertEquals(states.get(0).getId(), "s1");
Assert.assertEquals(states.get(1).getProp("k2"), "v2");
+ Assert.assertEquals(states.get(1).getId(), "s2");
Assert.assertEquals(states.get(2).getProp("k3"), "v3");
+ Assert.assertEquals(states.get(2).getId(), "s3");
}
@AfterClass
diff --git
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
index c09c42a..e383060 100644
---
a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
+++
b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/metastore/ZkStateStore.java
@@ -322,6 +322,7 @@ public class ZkStateStore<T extends State> implements
StateStore<T> {
key.readFields(dis);
state.readFields(dis);
+ state.setId(key.toString());
states.add(state);
if (stateId != null && key.toString().equals(stateId)) {
diff --git
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 2ee5f1d..71e5cbc 100644
---
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -121,6 +121,7 @@ public class ZkDatasetStateStoreTest {
zkDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
zkDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
+ Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
@@ -184,6 +185,7 @@ public class ZkDatasetStateStoreTest {
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 1000);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getCompletedTasks(), 3);
for (int i = 0; i < datasetState.getCompletedTasks(); i++) {
@@ -219,6 +221,7 @@ public class ZkDatasetStateStoreTest {
Assert.assertEquals(datasetStatesByUrns.size(), 2);
JobState.DatasetState datasetState =
datasetStatesByUrns.get(TEST_DATASET_URN);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -228,6 +231,7 @@ public class ZkDatasetStateStoreTest {
Assert.assertEquals(datasetState.getDuration(), 1000);
datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
index f7ce9d7..791f1b3 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CheckpointableWatermarkState.java
@@ -39,6 +39,10 @@ public class CheckpointableWatermarkState extends State {
super.setId(watermark.getSource());
}
+ public String getSource() {
+ return getId();
+ }
+
/**
* Needed for reflection based construction.
*/
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index e8ea281..956a875 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -141,6 +141,7 @@ public class MysqlDatasetStateStoreTest {
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
+ Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getProp("foo"), "bar");
@@ -163,6 +164,7 @@ public class MysqlDatasetStateStoreTest {
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
+ Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME_LOWER);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getProp("foo"), "bar");
@@ -200,7 +202,7 @@ public class MysqlDatasetStateStoreTest {
// persist a colliding lowercase dataset state to test that retrieval is case sensitive
datasetState.setDatasetUrn(TEST_DATASET_URN_LOWER);
- datasetState.setId(TEST_DATASET_URN_LOWER );
+ datasetState.setId(TEST_DATASET_URN_LOWER);
datasetState.setDuration(3000);
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER,
datasetState);
@@ -215,6 +217,7 @@ public class MysqlDatasetStateStoreTest {
JobState.DatasetState datasetState =
dbDatasetStateStore.getLatestDatasetState(TEST_JOB_NAME,
TEST_DATASET_URN);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -259,6 +262,7 @@ public class MysqlDatasetStateStoreTest {
Assert.assertEquals(datasetStatesByUrns.size(), 3);
JobState.DatasetState datasetState =
datasetStatesByUrns.get(TEST_DATASET_URN);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -268,6 +272,7 @@ public class MysqlDatasetStateStoreTest {
Assert.assertEquals(datasetState.getDuration(), 1000);
datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
@@ -277,6 +282,7 @@ public class MysqlDatasetStateStoreTest {
Assert.assertEquals(datasetState.getDuration(), 2000);
datasetState = datasetStatesByUrns.get(TEST_DATASET_URN_LOWER);
+ Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN_LOWER);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN_LOWER);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);