Repository: incubator-apex-core Updated Branches: refs/heads/release-3.3 ac95e0584 -> 1a59c3c41
APEXCORE-391 #resolve delay the creation of tmp directory until save is called Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/1a59c3c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/1a59c3c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/1a59c3c4 Branch: refs/heads/release-3.3 Commit: 1a59c3c416b379bae6fe71b69f7c2bdb0e6871da Parents: ac95e05 Author: David Yan <[email protected]> Authored: Wed Mar 16 13:37:30 2016 -0700 Committer: Thomas Weise <[email protected]> Committed: Wed Mar 16 16:45:24 2016 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/1a59c3c4/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index 788a68c..e28c0e0 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -48,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class AsyncFSStorageAgent extends FSStorageAgent { private final transient Configuration conf; - private final transient String localBasePath; + private transient volatile String localBasePath; private boolean syncCheckpoint = false; @@ -63,12 +63,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent public AsyncFSStorageAgent(String path, Configuration conf) { super(path, conf); - try { - this.localBasePath = Files.createTempDirectory("chkp").toString(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - logger.info("using {} as the basepath for checkpointing.", this.localBasePath); this.conf = conf == null ? new Configuration() : conf; } @@ -84,6 +78,11 @@ public class AsyncFSStorageAgent extends FSStorageAgent @Override public void save(final Object object, final int operatorId, final long windowId) throws IOException { + // save() is only called by one thread in the worker container so the following is okay + if (this.localBasePath == null) { + this.localBasePath = Files.createTempDirectory("chkp").toString(); + logger.info("using {} as the basepath for checkpointing.", this.localBasePath); + } if (syncCheckpoint) { super.save(object, operatorId, windowId); return; @@ -100,6 +99,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent public void copyToHDFS(final int operatorId, final long windowId) throws IOException { + if (this.localBasePath == null) { + throw new AssertionError("save() was not called before copyToHDFS"); + } String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); String window = Long.toHexString(windowId);
