Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 7995cbe74 -> 8655ffabe
APEXCORE-445 - Race condition in AsynFSStorageAgent.save() Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/8655ffab Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/8655ffab Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/8655ffab Branch: refs/heads/release-3.3 Commit: 8655ffabe84b4493606ab899c5b2a6eaefd8013d Parents: 7995cbe Author: Vlad Rozov <v.ro...@datatorrent.com> Authored: Tue Apr 26 15:04:20 2016 -0700 Committer: Thomas Weise <tho...@datatorrent.com> Committed: Tue Apr 26 19:23:01 2016 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/8655ffab/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index e28c0e0..673eb9e 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -57,7 +57,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent { super(); conf = null; - localBasePath = null; } public AsyncFSStorageAgent(String path, Configuration conf) @@ -73,20 +72,25 @@ public class AsyncFSStorageAgent extends FSStorageAgent public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf) { this(path, conf); + this.localBasePath = localBasePath; } @Override public void save(final Object object, final int operatorId, final long windowId) throws IOException { - // save() is only called by one thread in the worker container so the following is okay - if (this.localBasePath == null) { - this.localBasePath = Files.createTempDirectory("chkp").toString(); - logger.info("using {} as the basepath for checkpointing.", this.localBasePath); - } if (syncCheckpoint) { super.save(object, operatorId, windowId); return; } + + if (localBasePath == null) { + synchronized (this) { + if (localBasePath == null) { + localBasePath = Files.createTempDirectory("chkp").toString(); + logger.info("using {} as the basepath for checkpointing.", localBasePath); + } + } + } String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); if (!directory.exists()) {