Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 ac95e0584 -> 1a59c3c41


APEXCORE-391 #resolve delay the creation of tmp directory until save is called


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1a59c3c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1a59c3c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1a59c3c4

Branch: refs/heads/release-3.3
Commit: 1a59c3c416b379bae6fe71b69f7c2bdb0e6871da
Parents: ac95e05
Author: David Yan <[email protected]>
Authored: Wed Mar 16 13:37:30 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Wed Mar 16 16:45:24 2016 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java            | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a59c3c4/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java 
b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 788a68c..e28c0e0 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -48,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class AsyncFSStorageAgent extends FSStorageAgent
 {
   private final transient Configuration conf;
-  private final transient String localBasePath;
+  private transient volatile String localBasePath;
 
   private boolean syncCheckpoint = false;
 
@@ -63,12 +63,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   public AsyncFSStorageAgent(String path, Configuration conf)
   {
     super(path, conf);
-    try {
-      this.localBasePath = Files.createTempDirectory("chkp").toString();
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-    logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
     this.conf = conf == null ? new Configuration() : conf;
   }
 
@@ -84,6 +78,11 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public void save(final Object object, final int operatorId, final long 
windowId) throws IOException
   {
+    // save() is only called by one thread in the worker container so the 
following is okay
+    if (this.localBasePath == null) {
+      this.localBasePath = Files.createTempDirectory("chkp").toString();
+      logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
+    }
     if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
@@ -100,6 +99,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent
 
   public void copyToHDFS(final int operatorId, final long windowId) throws 
IOException
   {
+    if (this.localBasePath == null) {
+      throw new AssertionError("save() was not called before copyToHDFS");
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     String window = Long.toHexString(windowId);

Reply via email to