Repository: incubator-gobblin Updated Branches: refs/heads/master 578b4f950 -> 29c76c9e7
[GOBBLIN-637] Create dag checkpoint dir on initialization to avoid NPE.[] Closes #2507 from sv2000/dagStateStoreNpe Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/29c76c9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/29c76c9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/29c76c9e Branch: refs/heads/master Commit: 29c76c9e7fc87271219d3fe5be8f3f9790742614 Parents: 578b4f9 Author: suvasude <[email protected]> Authored: Mon Nov 26 09:55:11 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Nov 26 09:55:11 2018 -0800 ---------------------------------------------------------------------- .../modules/orchestration/FSDagStateStore.java | 19 ++++++++++--------- .../orchestration/FSDagStateStoreTest.java | 8 ++++---- 2 files changed, 14 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29c76c9e/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java index 0720210..e647fed 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java @@ -57,8 +57,15 @@ public class FSDagStateStore implements DagStateStore { private final String dagCheckpointDir; private final Gson gson; - public FSDagStateStore(Config config) { + public FSDagStateStore(Config config) throws IOException { this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR); + File checkpointDir = new File(this.dagCheckpointDir); + if (!checkpointDir.exists()) { + if (!checkpointDir.mkdirs()) { + throw new IOException("Could not create dag state store dir - " + this.dagCheckpointDir); + } + } + JsonSerializer<List<JobExecutionPlan>> serializer = new JobExecutionPlanListSerializer(); JsonDeserializer<List<JobExecutionPlan>> deserializer = new JobExecutionPlanListDeserializer(); this.gson = new GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer) @@ -75,13 +82,6 @@ public class FSDagStateStore implements DagStateStore { String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION; String serializedDag = serializeDag(dag); - File checkpointDir = new File(this.dagCheckpointDir); - if (!checkpointDir.exists()) { - if (!checkpointDir.mkdirs()) { - throw new IOException("Could not create dir - " + this.dagCheckpointDir); - } - } - File tmpCheckpointFile = new File(this.dagCheckpointDir, fileName + ".tmp"); File checkpointFile = new File(this.dagCheckpointDir, fileName); @@ -110,8 +110,9 @@ public class FSDagStateStore implements DagStateStore { public List<Dag<JobExecutionPlan>> getDags() throws IOException { List<Dag<JobExecutionPlan>> runningDags = Lists.newArrayList(); File dagCheckpointFolder = new File(this.dagCheckpointDir); + for (File file : dagCheckpointFolder.listFiles((dir, name) -> name.endsWith(DAG_FILE_EXTENSION))) { - runningDags.add(getDag(file)); + runningDags.add(getDag(file)); } return runningDags; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29c76c9e/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java index 05d800d..073d091 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java @@ -52,11 +52,11 @@ public class FSDagStateStoreTest { @BeforeClass public void setUp() throws IOException { + this.checkpointDir = new File(dagStateStoreDir); + FileUtils.deleteDirectory(this.checkpointDir); Config config = ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef( this.dagStateStoreDir)); this._dagStateStore = new FSDagStateStore(config); - this.checkpointDir = new File(dagStateStoreDir); - FileUtils.deleteDirectory(this.checkpointDir); } /** @@ -128,8 +128,8 @@ public class FSDagStateStoreTest { @Test (dependsOnMethods = "testCleanUp") public void testGetDags() throws IOException, URISyntaxException { - //Delete dag checkpoint dir - FileUtils.deleteDirectory(this.checkpointDir); + //Set up a new FSDagStateStore instance. + setUp(); List<Long> flowExecutionIds = Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1); for (int i = 0; i < 2; i++) { String flowGroupId = Integer.toString(i);
