Repository: incubator-apex-core Updated Branches: refs/heads/master d421a5abd -> 03267b3de
APEXCORE-391 #resolve delay the creation of tmp directory until save is called Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/03267b3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/03267b3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/03267b3d Branch: refs/heads/master Commit: 03267b3de7cbbd53d1c8ccad21e45021f26e9c33 Parents: d421a5a Author: David Yan <[email protected]> Authored: Wed Mar 16 13:37:30 2016 -0700 Committer: David Yan <[email protected]> Committed: Wed Mar 16 13:37:30 2016 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/03267b3d/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index 788a68c..e28c0e0 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -48,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class AsyncFSStorageAgent extends FSStorageAgent { private final transient Configuration conf; - private final transient String localBasePath; + private transient volatile String localBasePath; private boolean syncCheckpoint = false; @@ -63,12 +63,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent public AsyncFSStorageAgent(String path, Configuration conf) { super(path, conf); - try { - this.localBasePath = Files.createTempDirectory("chkp").toString(); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - logger.info("using {} as the basepath for checkpointing.", this.localBasePath); this.conf = conf == null ? new Configuration() : conf; } @@ -84,6 +78,11 @@ public class AsyncFSStorageAgent extends FSStorageAgent @Override public void save(final Object object, final int operatorId, final long windowId) throws IOException { + // save() is only called by one thread in the worker container so the following is okay + if (this.localBasePath == null) { + this.localBasePath = Files.createTempDirectory("chkp").toString(); + logger.info("using {} as the basepath for checkpointing.", this.localBasePath); + } if (syncCheckpoint) { super.save(object, operatorId, windowId); return; @@ -100,6 +99,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent public void copyToHDFS(final int operatorId, final long windowId) throws IOException { + if (this.localBasePath == null) { + throw new AssertionError("save() was not called before copyToHDFS"); + } String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); String window = Long.toHexString(windowId);
