[GOBBLIN-392] Load all dataset states when getLatestDatasetStatesByUrns() is called
Closes #2268 from htran1/fix_dataset_state_fetch

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c35f76e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c35f76e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c35f76e4

Branch: refs/heads/0.12.0
Commit: c35f76e4e2a0f9f25580924d46cef1b732af7d63
Parents: 378ccaa
Author: Hung Tran <[email protected]>
Authored: Mon Jan 29 17:08:46 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Jan 29 17:08:46 2018 -0800

----------------------------------------------------------------------
 .../gobblin/runtime/ZkDatasetStateStore.java | 4 ++--
 .../gobblin/runtime/ZkDatasetStateStoreTest.java | 19 ++++++++++++++++++-
 .../gobblin/runtime/MysqlDatasetStateStore.java | 4 ++--
 .../runtime/MysqlDatasetStateStoreTest.java | 19 ++++++++++++++++++-
 4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
index dbde3fc..e9ecf35 100644
--- a/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
+++ b/gobblin-modules/gobblin-helix/src/main/java/org/apache/gobblin/runtime/ZkDatasetStateStore.java
@@ -65,8 +65,8 @@ public class ZkDatasetStateStore extends ZkStateStore<JobState.DatasetState>
       }});
     Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-    if (!previousDatasetStates.isEmpty()) {
-      JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+    for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
       datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
index 742aa98..1091cf7 100644
--- a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
+++ b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/ZkDatasetStateStoreTest.java
@@ -44,6 +44,7 @@ public class ZkDatasetStateStoreTest {
   private static final String TEST_JOB_ID = "TestJob1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
+  private static final String TEST_DATASET_URN2 = "TestDataset2";
   private TestingServer testingServer;
   private StateStore<JobState> zkJobStateStore;
@@ -142,6 +143,13 @@ public class ZkDatasetStateStoreTest {
     }
     zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+    // persist a second dataset state to test that retrieval of multiple dataset states works
+    datasetState.setDatasetUrn(TEST_DATASET_URN2);
+    datasetState.setId(TEST_DATASET_URN2);
+    datasetState.setDuration(2000);
+
+    zkDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -171,7 +179,7 @@ public class ZkDatasetStateStoreTest {
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         zkDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
-    Assert.assertEquals(datasetStatesByUrns.size(), 1);
+    Assert.assertEquals(datasetStatesByUrns.size(), 2);
     JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -181,6 +189,15 @@ public class ZkDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getStartTime(), this.startTime);
     Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
     Assert.assertEquals(datasetState.getDuration(), 1000);
+
+    datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+    Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+    Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+    Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+    Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+    Assert.assertEquals(datasetState.getDuration(), 2000);
   }
   @Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 400e52c..741ac07 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -69,8 +69,8 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat
         getAll(jobName, "%" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX, true);
     Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-    if (!previousDatasetStates.isEmpty()) {
-      JobState.DatasetState previousDatasetState = previousDatasetStates.get(0);
+
+    for (JobState.DatasetState previousDatasetState : previousDatasetStates) {
       datasetStatesByUrns.put(previousDatasetState.getDatasetUrn(), previousDatasetState);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c35f76e4/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
index 9c35610..86ba8ba 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/MysqlDatasetStateStoreTest.java
@@ -46,6 +46,7 @@ public class MysqlDatasetStateStoreTest {
   private static final String TEST_JOB_ID = "TestJob1";
   private static final String TEST_TASK_ID_PREFIX = "TestTask-";
   private static final String TEST_DATASET_URN = "TestDataset";
+  private static final String TEST_DATASET_URN2 = "TestDataset2";
   private StateStore<JobState> dbJobStateStore;
   private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
@@ -154,6 +155,13 @@ public class MysqlDatasetStateStoreTest {
     }
     dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
+
+    // persist a second dataset state to test that retrieval of multiple dataset states works
+    datasetState.setDatasetUrn(TEST_DATASET_URN2);
+    datasetState.setId(TEST_DATASET_URN2);
+    datasetState.setDuration(2000);
+
+    dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
   }
   @Test(dependsOnMethods = "testPersistDatasetState")
@@ -183,7 +191,7 @@ public class MysqlDatasetStateStoreTest {
   public void testGetPreviousDatasetStatesByUrns() throws IOException {
     Map<String, JobState.DatasetState> datasetStatesByUrns =
         dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
-    Assert.assertEquals(datasetStatesByUrns.size(), 1);
+    Assert.assertEquals(datasetStatesByUrns.size(), 2);
     JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
     Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
@@ -193,6 +201,15 @@ public class MysqlDatasetStateStoreTest {
     Assert.assertEquals(datasetState.getStartTime(), this.startTime);
     Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
     Assert.assertEquals(datasetState.getDuration(), 1000);
+
+    datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
+    Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
+    Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
+    Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
+    Assert.assertEquals(datasetState.getStartTime(), this.startTime);
+    Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
+    Assert.assertEquals(datasetState.getDuration(), 2000);
   }
   @Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
