Repository: incubator-apex-core Updated Branches: refs/heads/master 5786866e5 -> 4a83c6db9
APEXCORE-445 - Race condition in AsynFSStorageAgent.save() Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/4a83c6db Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/4a83c6db Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/4a83c6db Branch: refs/heads/master Commit: 4a83c6db95e8b7a84f74929ef586fcd3497de08c Parents: 5786866 Author: Vlad Rozov <v.ro...@datatorrent.com> Authored: Tue Apr 26 15:04:20 2016 -0700 Committer: Thomas Weise <tho...@datatorrent.com> Committed: Tue Apr 26 19:23:48 2016 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/4a83c6db/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index 01a23d8..24d850e 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -57,7 +57,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent { super(); conf = null; - localBasePath = null; } public AsyncFSStorageAgent(String path, Configuration conf) @@ -73,20 +72,25 @@ public class AsyncFSStorageAgent extends FSStorageAgent public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf) { this(path, conf); + this.localBasePath = localBasePath; } @Override public void save(final Object object, final int operatorId, final long windowId) throws IOException { - // save() is only called by one thread in the worker container so the following is okay - if (this.localBasePath == null) { - this.localBasePath = Files.createTempDirectory("chkp").toString(); - logger.info("using {} as the basepath for checkpointing.", this.localBasePath); - } if (syncCheckpoint) { super.save(object, operatorId, windowId); return; } + + if (localBasePath == null) { + synchronized (this) { + if (localBasePath == null) { + localBasePath = Files.createTempDirectory("chkp").toString(); + logger.info("using {} as the basepath for checkpointing.", localBasePath); + } + } + } String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); if (!directory.exists()) {