Repository: incubator-apex-core
Updated Branches:
  refs/heads/release-3.3 7995cbe74 -> 8655ffabe


APEXCORE-445 - Race condition in AsyncFSStorageAgent.save()


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8655ffab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8655ffab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8655ffab

Branch: refs/heads/release-3.3
Commit: 8655ffabe84b4493606ab899c5b2a6eaefd8013d
Parents: 7995cbe
Author: Vlad Rozov <v.ro...@datatorrent.com>
Authored: Tue Apr 26 15:04:20 2016 -0700
Committer: Thomas Weise <tho...@datatorrent.com>
Committed: Tue Apr 26 19:23:01 2016 -0700

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java            | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8655ffab/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index e28c0e0..673eb9e 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -57,7 +57,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   {
     super();
     conf = null;
-    localBasePath = null;
   }
 
   public AsyncFSStorageAgent(String path, Configuration conf)
@@ -73,20 +72,25 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   public AsyncFSStorageAgent(String localBasePath, String path, Configuration 
conf)
   {
     this(path, conf);
+    this.localBasePath = localBasePath;
   }
 
   @Override
   public void save(final Object object, final int operatorId, final long 
windowId) throws IOException
   {
-    // save() is only called by one thread in the worker container so the 
following is okay
-    if (this.localBasePath == null) {
-      this.localBasePath = Files.createTempDirectory("chkp").toString();
-      logger.info("using {} as the basepath for checkpointing.", 
this.localBasePath);
-    }
     if (syncCheckpoint) {
       super.save(object, operatorId, windowId);
       return;
     }
+
+    if (localBasePath == null) {
+      synchronized (this) {
+        if (localBasePath == null) {
+          localBasePath = Files.createTempDirectory("chkp").toString();
+          logger.info("using {} as the basepath for checkpointing.", 
localBasePath);
+        }
+      }
+    }
     String operatorIdStr = String.valueOf(operatorId);
     File directory = new File(localBasePath, operatorIdStr);
     if (!directory.exists()) {

Reply via email to