Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 e512610ed -> 09f716e00
APEX-97, APEX-96 #resolve Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9b2abe09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9b2abe09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9b2abe09 Branch: refs/heads/devel-3 Commit: 9b2abe09228714bdffdde1991942c70d9a94b50a Parents: 711fd07 Author: Gaurav <[email protected]> Authored: Fri Sep 4 15:19:23 2015 -0700 Committer: Gaurav <[email protected]> Committed: Fri Sep 4 17:59:18 2015 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 8 ++++- .../java/com/datatorrent/stram/engine/Node.java | 31 ++++++++++---------- .../stram/plan/physical/PhysicalPlan.java | 5 +++- 3 files changed, 27 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index b89ae59..cc8da25 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -66,6 +66,10 @@ public class AsyncFSStorageAgent extends FSStorageAgent @Override public void save(final Object object, final int operatorId, final long windowId) throws IOException { + if(syncCheckpoint){ + super.save(object, operatorId, windowId); + return; + } String operatorIdStr = String.valueOf(operatorId); File directory = new File(localBasePath, operatorIdStr); if (!directory.exists()) { @@ -120,7 +124,9 @@ public class AsyncFSStorageAgent extends FSStorageAgent @Override public Object readResolve() throws ObjectStreamException { - return new AsyncFSStorageAgent(this.path, null); + AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null); + asyncFSStorageAgent.setSyncCheckpoint(syncCheckpoint); + return asyncFSStorageAgent; } public boolean isSyncCheckpoint() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index ea33970..7b1e762 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -466,21 +466,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera ba.save(operator, id, windowId); if (ba instanceof AsyncFSStorageAgent) { AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba; - if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { - CheckpointHandler checkpointHandler = new CheckpointHandler(); - checkpointHandler.agent = asyncFSStorageAgent; - checkpointHandler.operatorId = id; - checkpointHandler.windowId = windowId; - checkpointHandler.stats = checkpointStats; - FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler); - taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId)); - executorService.submit(futureTask); - checkpoint = null; - checkpointStats = null; - return; - } - else { - asyncFSStorageAgent.copyToHDFS(id, windowId); + if (!asyncFSStorageAgent.isSyncCheckpoint()) { + if(PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { + CheckpointHandler checkpointHandler = new CheckpointHandler(); + checkpointHandler.agent = asyncFSStorageAgent; + checkpointHandler.operatorId = id; + checkpointHandler.windowId = windowId; + checkpointHandler.stats = checkpointStats; + FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler); + taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId)); + executorService.submit(futureTask); + checkpoint = null; + checkpointStats = null; + return; + }else{ + asyncFSStorageAgent.copyToHDFS(id, windowId); + } } } checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9b2abe09/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 2176035..de2e8d5 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1053,7 +1053,10 @@ public class PhysicalPlan implements Serializable StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT); agent.save(oo, oper.id, windowId); if (agent instanceof AsyncFSStorageAgent) { - ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId); + AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent; + if(!asyncFSStorageAgent.isSyncCheckpoint()) { + asyncFSStorageAgent.copyToHDFS(oper.id, windowId); + } } } catch (IOException e) { // inconsistent state, no recovery option, requires shutdown
