Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.2 031b5785b -> e4eadadab


APEXCORE-391 #resolve delay the creation of tmp directory until save is called

Conflicts:
        
common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/e4eadada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/e4eadada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/e4eadada

Branch: refs/heads/release-3.2
Commit: e4eadadab395ea7403033e5a401f769f7f34a166
Parents: 031b578
Author: David Yan <[email protected]>
Authored: Wed Mar 16 13:37:30 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Wed Mar 16 16:49:56 2016 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java          | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e4eadada/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java 
b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 83bbdca..d0f00fa 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -38,7 +38,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class AsyncFSStorageAgent extends FSStorageAgent
 {
   private final transient Configuration conf;
-  private final transient String localBasePath;
+  private transient volatile String localBasePath;
 
   private boolean syncCheckpoint = false;
 
@@ -53,12 +53,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   public AsyncFSStorageAgent(String path, Configuration conf)
   {
     super(path, conf);
-    try {
-      this.localBasePath = Files.createTempDirectory("chkp").toString();
-    } catch (IOException ex) {
-      throw new RuntimeException(ex);
-    }
-    logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
     this.conf = conf == null ? new Configuration() : conf;
   }
 
@@ -74,7 +68,12 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   @Override
   public void save(final Object object, final int operatorId, final long 
windowId) throws IOException
   {
-    if(syncCheckpoint){
+    // save() is only called by one thread in the worker container so the 
following is okay
+    if (this.localBasePath == null) {
+      this.localBasePath = Files.createTempDirectory("chkp").toString();
+      logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
+    }
+    if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
     }
@@ -90,6 +89,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent
 
   public void copyToHDFS(final int operatorId, final long windowId) throws 
IOException
   {
+    if (this.localBasePath == null) {
+      throw new AssertionError("save() was not called before copyToHDFS");
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     String window = Long.toHexString(windowId);

Reply via email to