Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 578b4f950 -> 29c76c9e7


[GOBBLIN-637] Create dag checkpoint dir on initialization to avoid NPE.[]

Closes #2507 from sv2000/dagStateStoreNpe


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/29c76c9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/29c76c9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/29c76c9e

Branch: refs/heads/master
Commit: 29c76c9e7fc87271219d3fe5be8f3f9790742614
Parents: 578b4f9
Author: suvasude <[email protected]>
Authored: Mon Nov 26 09:55:11 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Nov 26 09:55:11 2018 -0800

----------------------------------------------------------------------
 .../modules/orchestration/FSDagStateStore.java   | 19 ++++++++++---------
 .../orchestration/FSDagStateStoreTest.java       |  8 ++++----
 2 files changed, 14 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29c76c9e/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 0720210..e647fed 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -57,8 +57,15 @@ public class FSDagStateStore implements DagStateStore {
   private final String dagCheckpointDir;
   private final Gson gson;
 
-  public FSDagStateStore(Config config) {
+  public FSDagStateStore(Config config) throws IOException {
     this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
+    File checkpointDir = new File(this.dagCheckpointDir);
+    if (!checkpointDir.exists()) {
+      if (!checkpointDir.mkdirs()) {
+        throw new IOException("Could not create dag state store dir - " + 
this.dagCheckpointDir);
+      }
+    }
+
     JsonSerializer<List<JobExecutionPlan>> serializer = new 
JobExecutionPlanListSerializer();
     JsonDeserializer<List<JobExecutionPlan>> deserializer = new 
JobExecutionPlanListDeserializer();
     this.gson = new 
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
@@ -75,13 +82,6 @@ public class FSDagStateStore implements DagStateStore {
     String fileName = DagManagerUtils.generateDagId(dag) + DAG_FILE_EXTENSION;
     String serializedDag = serializeDag(dag);
 
-    File checkpointDir = new File(this.dagCheckpointDir);
-    if (!checkpointDir.exists()) {
-      if (!checkpointDir.mkdirs()) {
-        throw new IOException("Could not create dir - " + 
this.dagCheckpointDir);
-      }
-    }
-
     File tmpCheckpointFile = new File(this.dagCheckpointDir, fileName + 
".tmp");
     File checkpointFile = new File(this.dagCheckpointDir, fileName);
 
@@ -110,8 +110,9 @@ public class FSDagStateStore implements DagStateStore {
   public List<Dag<JobExecutionPlan>> getDags() throws IOException {
     List<Dag<JobExecutionPlan>> runningDags = Lists.newArrayList();
     File dagCheckpointFolder = new File(this.dagCheckpointDir);
+
     for (File file : dagCheckpointFolder.listFiles((dir, name) -> 
name.endsWith(DAG_FILE_EXTENSION))) {
-        runningDags.add(getDag(file));
+      runningDags.add(getDag(file));
     }
     return runningDags;
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/29c76c9e/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
index 05d800d..073d091 100644
--- 
a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
+++ 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStoreTest.java
@@ -52,11 +52,11 @@ public class FSDagStateStoreTest {
 
   @BeforeClass
   public void setUp() throws IOException {
+    this.checkpointDir = new File(dagStateStoreDir);
+    FileUtils.deleteDirectory(this.checkpointDir);
     Config config = 
ConfigFactory.empty().withValue(DagManager.DAG_STATESTORE_DIR, 
ConfigValueFactory.fromAnyRef(
         this.dagStateStoreDir));
     this._dagStateStore = new FSDagStateStore(config);
-    this.checkpointDir = new File(dagStateStoreDir);
-    FileUtils.deleteDirectory(this.checkpointDir);
   }
 
   /**
@@ -128,8 +128,8 @@ public class FSDagStateStoreTest {
 
   @Test (dependsOnMethods = "testCleanUp")
   public void testGetDags() throws IOException, URISyntaxException {
-    //Delete dag checkpoint dir
-    FileUtils.deleteDirectory(this.checkpointDir);
+    //Set up a new FSDagStateStore instance.
+    setUp();
     List<Long> flowExecutionIds = 
Lists.newArrayList(System.currentTimeMillis(), System.currentTimeMillis() + 1);
     for (int i = 0; i < 2; i++) {
       String flowGroupId = Integer.toString(i);

Reply via email to