Repository: incubator-apex-core
Updated Branches:
  refs/heads/master 5786866e5 -> 4a83c6db9


APEXCORE-445 - Race condition in AsyncFSStorageAgent.save()


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4a83c6db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4a83c6db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4a83c6db

Branch: refs/heads/master
Commit: 4a83c6db95e8b7a84f74929ef586fcd3497de08c
Parents: 5786866
Author: Vlad Rozov <v.ro...@datatorrent.com>
Authored: Tue Apr 26 15:04:20 2016 -0700
Committer: Thomas Weise <tho...@datatorrent.com>
Committed: Tue Apr 26 19:23:48 2016 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java            | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4a83c6db/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 01a23d8..24d850e 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -57,7 +57,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   {
     super();
     conf = null;
-    localBasePath = null;
   }
 
   public AsyncFSStorageAgent(String path, Configuration conf)
@@ -73,20 +72,25 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   public AsyncFSStorageAgent(String localBasePath, String path, Configuration 
conf)
   {
     this(path, conf);
+    this.localBasePath = localBasePath;
   }
 
   @Override
   public void save(final Object object, final int operatorId, final long 
windowId) throws IOException
   {
-    // save() is only called by one thread in the worker container so the 
following is okay
-    if (this.localBasePath == null) {
-      this.localBasePath = Files.createTempDirectory("chkp").toString();
-      logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
-    }
     if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
     }
+
+    if (localBasePath == null) {
+      synchronized (this) {
+        if (localBasePath == null) {
+          localBasePath = Files.createTempDirectory("chkp").toString();
+          logger.info("using {} as the basepath for checkpointing.", 
localBasePath);
+        }
+      }
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     if (!directory.exists()) {

Reply via email to