Repository: incubator-apex-core
Updated Branches:
refs/heads/release-3.2 031b5785b -> e4eadadab
APEXCORE-391 #resolve delay the creation of tmp directory until save is called
Conflicts:
common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/e4eadada
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/e4eadada
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/e4eadada
Branch: refs/heads/release-3.2
Commit: e4eadadab395ea7403033e5a401f769f7f34a166
Parents: 031b578
Author: David Yan <[email protected]>
Authored: Wed Mar 16 13:37:30 2016 -0700
Committer: Thomas Weise <[email protected]>
Committed: Wed Mar 16 16:49:56 2016 -0700
----------------------------------------------------------------------
.../common/util/AsyncFSStorageAgent.java | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/e4eadada/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 83bbdca..d0f00fa 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -38,7 +38,7 @@ import com.datatorrent.netlet.util.DTThrowable;
public class AsyncFSStorageAgent extends FSStorageAgent
{
private final transient Configuration conf;
- private final transient String localBasePath;
+ private transient volatile String localBasePath;
private boolean syncCheckpoint = false;
@@ -53,12 +53,6 @@ public class AsyncFSStorageAgent extends FSStorageAgent
public AsyncFSStorageAgent(String path, Configuration conf)
{
super(path, conf);
- try {
- this.localBasePath = Files.createTempDirectory("chkp").toString();
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
- logger.info("using {} as the basepath for checkpointing.", this.localBasePath);
this.conf = conf == null ? new Configuration() : conf;
}
@@ -74,7 +68,12 @@ public class AsyncFSStorageAgent extends FSStorageAgent
@Override
public void save(final Object object, final int operatorId, final long windowId) throws IOException
{
- if(syncCheckpoint){
+ // save() is only called by one thread in the worker container so the following is okay
+ if (this.localBasePath == null) {
+ this.localBasePath = Files.createTempDirectory("chkp").toString();
+ logger.info("using {} as the basepath for checkpointing.", this.localBasePath);
+ }
+ if (syncCheckpoint) {
super.save(object, operatorId, windowId);
return;
}
@@ -90,6 +89,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent
public void copyToHDFS(final int operatorId, final long windowId) throws IOException
{
+ if (this.localBasePath == null) {
+ throw new AssertionError("save() was not called before copyToHDFS");
+ }
String operatorIdStr = String.valueOf(operatorId);
File directory = new File(localBasePath, operatorIdStr);
String window = Long.toHexString(windowId);