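writing checkpoints async (operator state is first saved to the local file system by AsyncFSStorageAgent, then copied to the configured Hadoop file system by a background task)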
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/29eb6c37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/29eb6c37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/29eb6c37 Branch: refs/heads/master Commit: 29eb6c377e92242c540d9a4d8be43a1fe05b7ac2 Parents: 66a75e0 Author: Gaurav <[email protected]> Authored: Thu Jul 30 11:15:24 2015 -0700 Committer: Gaurav <[email protected]> Committed: Tue Aug 4 16:28:17 2015 -0700 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 111 ++++++++++++++++ .../datatorrent/common/util/FSStorageAgent.java | 3 +- .../common/codec/JsonStreamCodecTest.java | 15 ++- .../common/util/AsyncFSStorageAgentTest.java | 133 +++++++++++++++++++ .../java/com/datatorrent/stram/StramClient.java | 5 +- .../datatorrent/stram/StramLocalCluster.java | 4 +- .../stram/StreamingAppMasterService.java | 2 +- .../stram/StreamingContainerManager.java | 10 +- .../java/com/datatorrent/stram/engine/Node.java | 64 ++++++++- .../stram/plan/physical/PhysicalPlan.java | 8 +- .../com/datatorrent/stram/CheckpointTest.java | 11 +- .../stram/LogicalPlanModificationTest.java | 22 ++- .../com/datatorrent/stram/PartitioningTest.java | 26 +++- .../stram/StramLocalClusterTest.java | 22 ++- .../datatorrent/stram/StramMiniClusterTest.java | 9 +- .../datatorrent/stram/StramRecoveryTest.java | 56 ++++++-- .../stram/StreamingContainerManagerTest.java | 45 ++++++- .../stram/debug/TupleRecorderTest.java | 3 + .../stram/engine/AutoMetricTest.java | 2 + .../stram/engine/InputOperatorTest.java | 5 +- .../stram/engine/ProcessingModeTests.java | 9 ++ .../datatorrent/stram/engine/SliderTest.java | 5 + .../com/datatorrent/stram/engine/StatsTest.java | 10 +- .../stram/engine/WindowGeneratorTest.java | 11 +- .../stram/webapp/StramWebServicesTest.java | 6 +- 25 files changed, 527 insertions(+), 
70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java new file mode 100644 index 0000000..d5de61c --- /dev/null +++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.common.util; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.ObjectStreamException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncFSStorageAgent extends FSStorageAgent +{ + private final transient FileSystem fs; + private final transient Configuration conf; + private final String localBasePath; + + private boolean syncCheckpoint = false; + + private AsyncFSStorageAgent() + { + super(); + fs = null; + conf = null; + localBasePath = null; + } + + public AsyncFSStorageAgent(String path, Configuration conf) + { + this(".", path, conf); + } + + public AsyncFSStorageAgent(String localBasePath, String path, Configuration conf) + { + super(path, conf); + if (localBasePath == null) { + this.localBasePath = "/tmp"; + } + else { + this.localBasePath = localBasePath; + } + logger.debug("Initialize storage agent with {}.", this.localBasePath); + this.conf = conf == null ? 
new Configuration() : conf; + try { + fs = FileSystem.newInstance(this.conf); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void save(final Object object, final int operatorId, final long windowId) throws IOException + { + String operatorIdStr = String.valueOf(operatorId); + File directory = new File(localBasePath, operatorIdStr); + if (!directory.exists()) { + directory.mkdirs(); + } + try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(windowId)))) { + store(stream, object); + } + } + + public void copyToHDFS(final int operatorId, final long windowId) throws IOException + { + String operatorIdStr = String.valueOf(operatorId); + File directory = new File(localBasePath, operatorIdStr); + String window = Long.toHexString(windowId); + Path lPath = new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + System.currentTimeMillis() + TMP_FILE); + FileUtil.copy(new File(directory, String.valueOf(windowId)), fs, lPath, true, conf); + fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), Options.Rename.OVERWRITE); + } + + @Override + public Object readResolve() throws ObjectStreamException + { + return new AsyncFSStorageAgent(this.localBasePath, this.path, null); + } + + public boolean isSyncCheckpoint() + { + return syncCheckpoint; + } + + public void setSyncCheckpoint(boolean syncCheckpoint) + { + this.syncCheckpoint = syncCheckpoint; + } + + private static final long serialVersionUID = 201507241610L; + private static final Logger logger = LoggerFactory.getLogger(AsyncFSStorageAgent.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java 
b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java index 31b537d..14275fa 100644 --- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java @@ -52,8 +52,7 @@ public class FSStorageAgent implements StorageAgent, Serializable kryo = new Kryo(); } - @SuppressWarnings("unused") - private FSStorageAgent() + protected FSStorageAgent() { path = null; fileContext = null; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java index e0a5f01..a9303bc 100644 --- a/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java +++ b/common/src/test/java/com/datatorrent/common/codec/JsonStreamCodecTest.java @@ -1,14 +1,17 @@ /** * Copyright (C) 2015 DataTorrent, Inc. * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. 
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package com.datatorrent.common.codec; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java new file mode 100644 index 0000000..e7f9f66 --- /dev/null +++ b/common/src/test/java/com/datatorrent/common/util/AsyncFSStorageAgentTest.java @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2015 DataTorrent, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datatorrent.common.util; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; + +public class AsyncFSStorageAgentTest +{ + private static class TestMeta extends TestWatcher + { + String applicationPath; + String basePath; + AsyncFSStorageAgent storageAgent; + + @Override + protected void starting(Description description) + { + super.starting(description); + basePath = "target/" + description.getClassName() + "/" + description.getMethodName(); + applicationPath = basePath + "/app"; + try { + FileUtils.forceMkdir(new File(basePath)); + } catch (IOException e) { + throw new RuntimeException(e); + } + storageAgent = new AsyncFSStorageAgent(basePath, applicationPath, null); + + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSave() throws IOException + { + Map<Integer, String> data = Maps.newHashMap(); + data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + testMeta.storageAgent.save(data, 1, 1); + testMeta.storageAgent.copyToHDFS(1, 1); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded = (Map<Integer, String>) testMeta.storageAgent.load(1, 1); + Assert.assertEquals("dataOf1", data, decoded); + } + + 
@Test + public void testLoad() throws IOException + { + Map<Integer, String> dataOf1 = Maps.newHashMap(); + dataOf1.put(1, "one"); + dataOf1.put(2, "two"); + dataOf1.put(3, "three"); + + Map<Integer, String> dataOf2 = Maps.newHashMap(); + dataOf2.put(4, "four"); + dataOf2.put(5, "five"); + dataOf2.put(6, "six"); + + testMeta.storageAgent.save(dataOf1, 1, 1); + testMeta.storageAgent.copyToHDFS(1, 1); + testMeta.storageAgent.save(dataOf2, 2, 1); + testMeta.storageAgent.copyToHDFS(2, 1); + @SuppressWarnings("unchecked") + Map<Integer, String> decoded1 = (Map<Integer, String>) testMeta.storageAgent.load(1, 1); + + @SuppressWarnings("unchecked") + Map<Integer, String> decoded2 = (Map<Integer, String>) testMeta.storageAgent.load(2, 1); + Assert.assertEquals("data of 1", dataOf1, decoded1); + Assert.assertEquals("data of 2", dataOf2, decoded2); + } + + @Test + public void testRecovery() throws IOException + { + testSave(); + testMeta.storageAgent = new AsyncFSStorageAgent(testMeta.basePath, testMeta.applicationPath, null); + testSave(); + } + + @Test + public void testDelete() throws IOException + { + testLoad(); + testMeta.storageAgent.delete(1, 1); + Path appPath = new Path(testMeta.applicationPath); + FileContext fileContext = FileContext.getFileContext(); + Assert.assertTrue("operator 2 window 1", fileContext.util().exists(new Path(appPath + "/" + 2 + "/" + 1))); + Assert.assertFalse("operator 1 window 1", fileContext.util().exists(new Path(appPath + "/" + 1 + "/" + 1))); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramClient.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 657f678..dfb4511 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ 
b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -52,7 +52,7 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.DTLoggerFactory; import com.datatorrent.api.Context.OperatorContext; - +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BasicContainerOptConfigurator; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.client.StramClientUtils; @@ -456,8 +456,9 @@ public class StramClient if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */ Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS); // use conf client side to pickup any proxy settings from dt-site.xml - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(checkpointPath.toString(), conf)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointPath.toString(), conf)); } + if(dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR) == null){ dag.setAttribute(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR,new BasicContainerOptConfigurator()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java index c7ac0cb..e28c097 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java +++ b/engine/src/main/java/com/datatorrent/stram/StramLocalCluster.java @@ -35,11 +35,13 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.net.NetUtils; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode.Controller; import com.datatorrent.api.Operator; import 
com.datatorrent.bufferserver.server.Server; import com.datatorrent.bufferserver.storage.DiskStorage; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; import com.datatorrent.stram.StreamingContainerManager.ContainerResource; @@ -298,7 +300,7 @@ public class StramLocalCluster implements Runnable, Controller dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, pathUri); } if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new Path(pathUri, LogicalPlan.SUBDIR_CHECKPOINTS).toString(), null)); } this.dnmgr = new StreamingContainerManager(dag); this.umbilical = new UmbilicalProtocolLocalImpl(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index dbb3d11..5246c9e 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 95f4648..0847f3c 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -76,6 +76,7 @@ import com.datatorrent.api.annotation.Stateless; import com.datatorrent.bufferserver.auth.AuthManager; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.common.experimental.AppData; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.common.util.NumberAggregate; import com.datatorrent.common.util.Pair; @@ -2949,7 +2950,14 @@ public class StreamingContainerManager implements PlanContext this.finals = new FinalVars(finals, lp); StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT); - if (sa instanceof FSStorageAgent) { + if(sa instanceof AsyncFSStorageAgent){ + // replace the default storage agent, if present + AsyncFSStorageAgent fssa = (AsyncFSStorageAgent) sa; + if (fssa.path.contains(oldAppId)) { + fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf); + lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa); + } + } else if (sa instanceof FSStorageAgent) { // replace the default storage agent, if present FSStorageAgent fssa = (FSStorageAgent) sa; if (fssa.path.contains(oldAppId)) { 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index 24679dc..ea33970 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -27,8 +27,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import org.apache.hadoop.util.ReflectionUtils; import org.slf4j.Logger; @@ -46,6 +45,9 @@ import com.datatorrent.api.Operator.Unifier; import com.datatorrent.api.StatsListener.OperatorRequest; import com.datatorrent.bufferserver.util.Codec; +import com.datatorrent.common.util.AsyncFSStorageAgent; +import com.datatorrent.common.util.Pair; +import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.api.OperatorDeployInfo; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats; @@ -99,12 +101,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera public final BlockingQueue<StatsListener.OperatorResponse> commandResponse; private final List<Field> metricFields; private final Map<String, Method> metricMethods; + private ExecutorService executorService; + private Queue<Pair<FutureTask<Stats.CheckpointStats>, Long>> taskQueue; protected Stats.CheckpointStats checkpointStats; public Node(OPERATOR operator, OperatorContext context) { this.operator = operator; this.context = context; + executorService = Executors.newSingleThreadExecutor(); + taskQueue = new 
LinkedList<Pair<FutureTask<Stats.CheckpointStats>, Long>>(); outputs = new HashMap<String, Sink<Object>>(); @@ -173,6 +179,9 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera pcpair.component.teardown(); } + if (executorService != null) { + executorService.shutdownNow(); + } operator.teardown(); } @@ -405,6 +414,21 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera checkpointStats = null; checkpoint = null; } + else { + Pair<FutureTask<Stats.CheckpointStats>, Long> pair = taskQueue.peek(); + if (pair != null && pair.getFirst().isDone()) { + taskQueue.poll(); + try { + stats.checkpointStats = pair.getFirst().get(); + stats.checkpoint = new Checkpoint(pair.getSecond(), applicationWindowCount, checkpointWindowCount); + if (operator instanceof Operator.CheckpointListener) { + ((Operator.CheckpointListener) operator).checkpointed(pair.getSecond()); + } + } catch (Exception ex) { + throw DTThrowable.wrapIfChecked(ex); + } + } + } context.report(stats, windowId); } @@ -440,6 +464,25 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera checkpointStats = new Stats.CheckpointStats(); checkpointStats.checkpointStartTime = System.currentTimeMillis(); ba.save(operator, id, windowId); + if (ba instanceof AsyncFSStorageAgent) { + AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba; + if (!asyncFSStorageAgent.isSyncCheckpoint() && PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { + CheckpointHandler checkpointHandler = new CheckpointHandler(); + checkpointHandler.agent = asyncFSStorageAgent; + checkpointHandler.operatorId = id; + checkpointHandler.windowId = windowId; + checkpointHandler.stats = checkpointStats; + FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler); + taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId)); + executorService.submit(futureTask); + checkpoint = null; + 
checkpointStats = null; + return; + } + else { + asyncFSStorageAgent.copyToHDFS(id, windowId); + } + } checkpointStats.checkpointTime = System.currentTimeMillis() - checkpointStats.checkpointStartTime; } catch (IOException ie) { @@ -570,5 +613,22 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera deactivateSinks(); } + private class CheckpointHandler implements Callable<Stats.CheckpointStats> + { + + public AsyncFSStorageAgent agent; + public int operatorId; + public long windowId; + public Stats.CheckpointStats stats; + + @Override + public Stats.CheckpointStats call() throws Exception + { + agent.copyToHDFS(id, windowId); + stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime; + return stats; + } + } + private static final Logger logger = LoggerFactory.getLogger(Node.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index 7c0432d..5b90c04 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -40,6 +40,8 @@ import com.datatorrent.api.Partitioner.Partition; import com.datatorrent.api.Partitioner.PartitionKeys; import com.datatorrent.api.StatsListener.OperatorRequest; import com.datatorrent.api.annotation.Stateless; + +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.Journal.Recoverable; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.api.StramEvent; @@ -941,7 +943,11 @@ public class PhysicalPlan implements Serializable try { LOG.debug("Writing activation checkpoint {} {} {}", checkpoint, oper, 
oo); long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId; - oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT).save(oo, oper.id, windowId); + StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT); + agent.save(oo, oper.id, windowId); + if (agent instanceof AsyncFSStorageAgent) { + ((AsyncFSStorageAgent) agent).copyToHDFS(oper.id, windowId); + } } catch (IOException e) { // inconsistent state, no recovery option, requires shutdown throw new IllegalStateException("Failed to write operator state after partition change " + oper, e); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index dd804ec..4072894 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -37,6 +37,7 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.MockContainer.MockOperatorStats; import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext; import com.datatorrent.stram.api.Checkpoint; @@ -111,6 +112,9 @@ public class CheckpointTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(testMeta.dir + "/locaPath", testMeta.dir, null); + storageAgent.setSyncCheckpoint(true); + dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent); dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1); 
dag.setAttribute(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50); dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -127,14 +131,13 @@ public class CheckpointTest sc.setHeartbeatMonitoringEnabled(false); sc.run(); - StorageAgent fssa = sc.getDAG().getValue(OperatorContext.STORAGE_AGENT); StreamingContainerManager dnm = sc.dnmgr; PhysicalPlan plan = dnm.getPhysicalPlan(); Assert.assertEquals("number required containers", 1, dnm.getPhysicalPlan().getContainers().size()); PTOperator o1p1 = plan.getOperators(dag.getMeta(o1)).get(0); Set<Long> checkpoints = Sets.newHashSet(); - for (long windowId : fssa.getWindowIds(o1p1.getId())) { + for (long windowId : storageAgent.getWindowIds(o1p1.getId())) { checkpoints.add(windowId); } Assert.assertEquals("number checkpoints " + checkpoints, 3, checkpoints.size()); @@ -142,7 +145,7 @@ public class CheckpointTest PTOperator o2p1 = plan.getOperators(dag.getMeta(o2)).get(0); checkpoints = Sets.newHashSet(); - for (long windowId : fssa.getWindowIds(o2p1.getId())) { + for (long windowId : storageAgent.getWindowIds(o2p1.getId())) { checkpoints.add(windowId); } Assert.assertEquals("number checkpoints " + checkpoints, 1, checkpoints.size()); @@ -152,7 +155,7 @@ public class CheckpointTest Assert.assertNotNull("checkpoint not null for statefull operator " + o1p1, o1p1.stats.checkpointStats); for (Checkpoint cp : o1p1.checkpoints) { - Object load = fssa.load(o1p1.getId(), cp.windowId); + Object load = storageAgent.load(o1p1.getId(), cp.windowId); Assert.assertEquals("Stored Operator and Saved State", load.getClass(), o1p1.getOperatorMeta().getOperator().getClass()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java 
b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java index db1d9ec..78a1bd8 100644 --- a/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java +++ b/engine/src/test/java/com/datatorrent/stram/LogicalPlanModificationTest.java @@ -27,9 +27,10 @@ import org.junit.Rule; import org.junit.Test; import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StorageAgent; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; -import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.engine.GenericTestOperator; import com.datatorrent.stram.engine.OperatorContext; import com.datatorrent.stram.engine.TestGeneratorInputOperator; @@ -291,15 +292,14 @@ public class LogicalPlanModificationTest } - @Test - public void testExecutionManager() throws Exception { + private void testExecutionManager(StorageAgent agent) throws Exception { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); StreamingContainerManager dnm = new StreamingContainerManager(dag); - Assert.assertEquals(""+dnm.containerStartRequests, dnm.containerStartRequests.size(), 0); + Assert.assertEquals("" + dnm.containerStartRequests, dnm.containerStartRequests.size(), 0); CreateOperatorRequest cor = new CreateOperatorRequest(); @@ -331,4 +331,16 @@ public class LogicalPlanModificationTest } + @Test + public void testExecutionManagerWithSyncStorageAgent() throws Exception + { + testExecutionManager(new FSStorageAgent(testMeta.dir, null)); + } + + @Test + public void testExecutionManagerWithAsyncStorageAgent() throws Exception + { + testExecutionManager(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + } + } 
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java index 9c169ee..15ad76e 100644 --- a/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java +++ b/engine/src/test/java/com/datatorrent/stram/PartitioningTest.java @@ -27,13 +27,13 @@ import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.collect.Sets; + import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.common.util.FSStorageAgent; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.engine.Node; @@ -150,6 +150,8 @@ public class PartitioningTest public void testDefaultPartitioning() throws Exception { LogicalPlan dag = new LogicalPlan(); + File checkpointDir = new File(TEST_OUTPUT_DIR, "testDefaultPartitioning"); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); Integer[][] testData = { {4, 5} @@ -249,6 +251,9 @@ public class PartitioningTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 5); + File checkpointDir = new File(TEST_OUTPUT_DIR, "testDynamicDefaultPartitioning"); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); + 
CollectorOperator.receivedTuples.clear(); TestInputOperator<Integer> input = dag.addOperator("input", new TestInputOperator<Integer>()); @@ -391,12 +396,12 @@ public class PartitioningTest * * @throws Exception */ - @Test - public void testInputOperatorPartitioning() throws Exception + + private void testInputOperatorPartitioning(LogicalPlan dag) throws Exception { File checkpointDir = new File(TEST_OUTPUT_DIR, "testInputOperatorPartitioning"); - LogicalPlan dag = new LogicalPlan(); dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, checkpointDir.getPath()); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null)); PartitionableInputOperator input = dag.addOperator("input", new PartitionableInputOperator()); dag.setAttribute(input, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{new PartitionLoadWatch()})); @@ -418,7 +423,10 @@ public class PartitioningTest Checkpoint checkpoint = new Checkpoint(10L, 0, 0); p.checkpoints.add(checkpoint); p.setRecoveryCheckpoint(checkpoint); - new FSStorageAgent(checkpointDir.getPath(), null).save(inputDeployed, p.getId(), 10L); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(checkpointDir.getPath() + "/localPath", checkpointDir.getPath(), null); + agent.save(inputDeployed, p.getId(), 10L); + agent.copyToHDFS(p.getId(), 10l); + } Assert.assertEquals("", Sets.newHashSet("partition_0", "partition_1", "partition_2"), partProperties); @@ -447,6 +455,12 @@ public class PartitioningTest } + @Test + public void testInputOperatorPartitioningWithAsyncStorageAgent() throws Exception + { + LogicalPlan dag = new LogicalPlan(); + testInputOperatorPartitioning(dag); + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java ---------------------------------------------------------------------- diff --git 
a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java index 8489c70..1881566 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramLocalClusterTest.java @@ -15,7 +15,6 @@ */ package com.datatorrent.stram; -import com.datatorrent.stram.api.Checkpoint; import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -23,22 +22,17 @@ import java.io.LineNumberReader; import java.util.Arrays; import java.util.Map; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context; + +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster.LocalStreamingContainer; import com.datatorrent.stram.StramLocalCluster.MockComponentFactory; -import com.datatorrent.stram.engine.GenericTestOperator; -import com.datatorrent.stram.engine.Node; -import com.datatorrent.stram.engine.OperatorContext; -import com.datatorrent.stram.engine.TestGeneratorInputOperator; -import com.datatorrent.stram.engine.TestOutputOperator; -import com.datatorrent.stram.engine.WindowGenerator; +import com.datatorrent.stram.api.Checkpoint; +import com.datatorrent.stram.engine.*; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.physical.PTOperator; import com.datatorrent.stram.support.ManualScheduledExecutorService; @@ -75,6 +69,7 @@ public class StramLocalClusterTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); TestGeneratorInputOperator genNode = dag.addOperator("genNode", 
TestGeneratorInputOperator.class); genNode.setMaxTuples(2); @@ -114,6 +109,9 @@ public class StramLocalClusterTest { LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + agent.setSyncCheckpoint(true); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, agent); TestGeneratorInputOperator node1 = dag.addOperator("o1", TestGeneratorInputOperator.class); // data will be added externally from test http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java index 4d0cd37..99478f5 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramMiniClusterTest.java @@ -48,6 +48,7 @@ import com.sun.jersey.api.client.WebResource; import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.client.StramClientUtils.YarnClientHelper; import com.datatorrent.stram.engine.GenericTestOperator; @@ -202,6 +203,9 @@ public class StramMiniClusterTest LogicalPlanConfiguration tb = new LogicalPlanConfiguration(conf); tb.addFromProperties(dagProps, null); LogicalPlan dag = createDAG(tb); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + agent.setSyncCheckpoint(true); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); Configuration yarnConf = new Configuration(yarnCluster.getConfig()); StramClient client = new StramClient(yarnConf, dag); try { 
@@ -357,7 +361,10 @@ public class StramMiniClusterTest { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, "file:" + System.getProperty("user.dir") + "/" + testMeta.dir); + dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); + AsyncFSStorageAgent agent = new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null); + agent.setSyncCheckpoint(true); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); FailingOperator badOperator = dag.addOperator("badOperator", FailingOperator.class); dag.getContextAttributes(badOperator).put(OperatorContext.RECOVERY_ATTEMPTS, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index 8515734..6172d8a 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -46,6 +46,7 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.StatsListener; import com.datatorrent.api.StorageAgent; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; @@ -70,8 +71,7 @@ public class StramRecoveryTest private static final Logger LOG = LoggerFactory.getLogger(StramRecoveryTest.class); @Rule public final TestMeta testMeta = new TestMeta(); - @Test - public void testPhysicalPlanSerialization() throws Exception + private void testPhysicalPlanSerialization(StorageAgent agent) throws Exception { LogicalPlan dag = new LogicalPlan(); @@ -86,7 +86,7 @@ public class StramRecoveryTest 
dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 2); TestPlanContext ctx = new TestPlanContext(); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); PhysicalPlan plan = new PhysicalPlan(dag, ctx); ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -121,6 +121,18 @@ public class StramRecoveryTest } + @Test + public void testPhysicalPlanSerializationWithSyncAgent() throws Exception + { + testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null)); + } + + @Test + public void testPhysicalPlanSerializationWithAsyncAgent() throws Exception + { + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + } + public static class StatsListeningOperator extends TestGeneratorInputOperator implements StatsListener { int processStatsCnt = 0; @@ -144,14 +156,13 @@ public class StramRecoveryTest * Test serialization of the container manager with mock execution layer. 
* @throws Exception */ - @Test - public void testContainerManager() throws Exception + private void testContainerManager(StorageAgent agent) throws Exception { FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_PATH, testMeta.dir); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(testMeta.dir, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); StatsListeningOperator o1 = dag.addOperator("o1", StatsListeningOperator.class); @@ -254,6 +265,18 @@ public class StramRecoveryTest } @Test + public void testContainerManagerWithSyncAgent() throws Exception + { + testPhysicalPlanSerialization(new FSStorageAgent(testMeta.dir, null)); + } + + @Test + public void testContainerManagerWithAsyncAgent() throws Exception + { + testPhysicalPlanSerialization(new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); + } + + @Test public void testWriteAheadLog() throws Exception { final MutableInt flushCount = new MutableInt(); @@ -358,19 +381,17 @@ public class StramRecoveryTest scm.setPhysicalOperatorProperty(o1p1.getId(), "maxTuples", "50"); } - @Test - public void testRestartApp() throws Exception + private void testRestartApp(StorageAgent agent, String appPath1) throws Exception { FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run String appId1 = "app1"; String appId2 = "app2"; - String appPath1 = testMeta.dir + "/" + appId1; String appPath2 = testMeta.dir + "/" + appId2; LogicalPlan dag = new LogicalPlan(); dag.setAttribute(LogicalPlan.APPLICATION_ID, appId1); dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath1); - dag.setAttribute(OperatorContext.STORAGE_AGENT, new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null)); + dag.setAttribute(OperatorContext.STORAGE_AGENT, agent); dag.addOperator("o1", StatsListeningOperator.class); FSRecoveryHandler 
recoveryHandler = new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)); @@ -419,6 +440,21 @@ public class StramRecoveryTest } @Test + public void testRestartAppWithSyncAgent() throws Exception + { + String appPath1 = testMeta.dir + "/app1"; + testRestartApp(new FSStorageAgent(appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); + } + + @Test + public void testRestartAppWithAsyncAgent() throws Exception + { + String appPath1 = testMeta.dir + "/app1"; + String checkpointPath = testMeta.dir + "/localPath"; + testRestartApp(new AsyncFSStorageAgent(checkpointPath, appPath1 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, null), appPath1); + } + + @Test public void testRpcFailover() throws Exception { String appPath = testMeta.dir; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index c1567b8..ba15a78 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -32,6 +32,7 @@ import org.junit.Test; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.datatorrent.api.Context; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Context.PortContext; import com.datatorrent.api.DAG.Locality; @@ -41,6 +42,7 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.FSStorageAgent; import 
com.datatorrent.stram.StreamingContainerAgent.ContainerStartRequest; import com.datatorrent.stram.StreamingContainerManager.ContainerResource; @@ -56,12 +58,7 @@ import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerSt import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat; import com.datatorrent.stram.appdata.AppDataPushAgent; import com.datatorrent.stram.codec.DefaultStatefulStreamCodec; -import com.datatorrent.stram.engine.DefaultUnifier; -import com.datatorrent.stram.engine.GenericTestOperator; -import com.datatorrent.stram.engine.TestAppDataQueryOperator; -import com.datatorrent.stram.engine.TestAppDataResultOperator; -import com.datatorrent.stram.engine.TestAppDataSourceOperator; -import com.datatorrent.stram.engine.TestGeneratorInputOperator; +import com.datatorrent.stram.engine.*; import com.datatorrent.stram.plan.TestPlanContext; import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta; @@ -471,6 +468,37 @@ public class StreamingContainerManagerTest { } @Test + public void testAsyncCheckpointWindowIds() throws Exception + { + File path = new File(testMeta.dir); + FileUtils.deleteDirectory(path.getAbsoluteFile()); + FileUtils.forceMkdir(new File(path.getAbsoluteFile(), "/localPath")); + + AsyncFSStorageAgent sa = new AsyncFSStorageAgent(path.getPath() + "/localPath", path.getPath(), null); + + long[] windowIds = new long[]{123L, 345L, 234L}; + for (long windowId : windowIds) { + sa.save(windowId, 1, windowId); + sa.copyToHDFS(1, windowId); + } + + Arrays.sort(windowIds); + long[] windowsIds = sa.getWindowIds(1); + Arrays.sort(windowsIds); + Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds); + + for (long windowId : windowIds) { + sa.delete(1, windowId); + } + try { + sa.getWindowIds(1); + Assert.fail("There should not be any most recently saved windowId!"); + } catch (IOException io) { + Assert.assertTrue("No State 
Saved", true); + } + } + + @Test public void testProcessHeartbeat() throws Exception { FileUtils.deleteDirectory(new File(testMeta.dir)); // clean any state from previous run @@ -712,6 +740,8 @@ public class StreamingContainerManagerTest { @Test public void testPhysicalPropertyUpdate() throws Exception{ LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testPhysicalPropertyUpdate").getAbsolutePath(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.addStream("o1.outport", o1.outport, o2.inport1); @@ -735,7 +765,6 @@ public class StreamingContainerManagerTest { Class<? extends TestAppDataSourceOperator> dsClass, Class<? extends TestAppDataResultOperator> rClass) { LogicalPlan dag = new LogicalPlan(); - TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); TestAppDataQueryOperator q = dag.addOperator("q", qClass); TestAppDataResultOperator r = dag.addOperator("r", rClass); @@ -755,6 +784,8 @@ public class StreamingContainerManagerTest { private void testAppDataSources(LogicalPlan dag, boolean appendQIDToTopic) throws Exception { + String workingDir = new File("target/testAppDataSources").getAbsolutePath(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); StramLocalCluster lc = new StramLocalCluster(dag); lc.runAsync(); StreamingContainerManager dnmgr = lc.dnmgr; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java 
b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java index 5f68c6a..718bf1b 100644 --- a/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/debug/TupleRecorderTest.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.engine.StreamingContainer; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.debug.TupleRecorder.PortInfo; @@ -210,6 +212,7 @@ public class TupleRecorderTest public void testRecordingFlow() throws Exception { LogicalPlan dag = new LogicalPlan(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir.getAbsolutePath() + "/localPath", testWorkDir.getAbsolutePath(), null)); dag.getAttributes().put(LogicalPlan.APPLICATION_PATH, "file://" + testWorkDir.getAbsolutePath()); dag.getAttributes().put(LogicalPlan.TUPLE_RECORDING_PART_FILE_SIZE, 1024); // 1KB per part http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java index a95956e..752adeb 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/AutoMetricTest.java @@ -41,6 +41,7 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramLocalCluster; import 
com.datatorrent.stram.engine.AutoMetricTest.TestOperator.TestStatsListener; import com.datatorrent.stram.plan.logical.LogicalPlan; @@ -183,6 +184,7 @@ public class AutoMetricTest public void testMetricPropagation() throws Exception { LogicalPlan dag = new LogicalPlan(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testMeta.dir + "/localPath", testMeta.dir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java index 83bd61f..142f45f 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/InputOperatorTest.java @@ -20,6 +20,7 @@ import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.support.StramTestSupport; import com.datatorrent.stram.support.StramTestSupport.WaitCondition; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; @@ -28,13 +29,13 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.CircularBuffer; +import java.io.File; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; - import org.junit.Test; /** @@ -124,6 +125,8 @@ public class InputOperatorTest public void testSomeMethod() throws Exception { 
LogicalPlan dag = new LogicalPlan(); + String testWorkDir = new File("target").getAbsolutePath(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(testWorkDir + "/localBasePath", testWorkDir, null)); EvenOddIntegerGeneratorInputOperator generator = dag.addOperator("NumberGenerator", EvenOddIntegerGeneratorInputOperator.class); final CollectorModule<Number> collector = dag.addOperator("NumberCollector", new CollectorModule<Number>()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java index bada257..0393394 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/ProcessingModeTests.java @@ -15,7 +15,10 @@ */ package com.datatorrent.stram.engine; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; + +import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; @@ -75,6 +78,8 @@ public class ProcessingModeTests dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); + String workingDir = new File("target/testLinearInputOperatorRecovery").getAbsolutePath(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class); rip.setMaximumTuples(maxTuples); rip.setSimulateFailure(true); @@ -97,6 +102,8 @@ public class ProcessingModeTests 
CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testLinearOperatorRecovery").getAbsolutePath(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); @@ -121,6 +128,8 @@ public class ProcessingModeTests CollectorOperator.duplicates.clear(); LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/testLinearInlineOperatorsRecovery").getAbsolutePath(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300); dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java index 754b150..26515d4 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/SliderTest.java @@ -15,6 +15,9 @@ */ package com.datatorrent.stram.engine; +import java.io.File; + +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; import org.junit.Assert; import org.junit.Test; @@ -133,6 +136,8 @@ public class SliderTest private void test(int applicationWindowCount, int slideByWindowCount) throws Exception { LogicalPlan dag = new LogicalPlan(); + String workingDir = new 
File("target/sliderTest").getAbsolutePath(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 100); Input input = dag.addOperator("Input", new Input()); Sum sum = dag.addOperator("Sum", new Sum()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java index 0019f56..0ededd4 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StatsTest.java @@ -15,6 +15,7 @@ */ package com.datatorrent.stram.engine; +import java.io.File; import java.io.Serializable; import java.util.*; @@ -23,12 +24,14 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.api.Stats.OperatorStats.PortStats; import com.datatorrent.api.StatsListener; +import com.datatorrent.common.util.AsyncFSStorageAgent; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.StramLocalCluster; import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.engine.StatsTest.TestCollector.TestCollectorStatsListener; @@ -170,7 +173,8 @@ public class StatsTest { int tupleCount = 10; LogicalPlan dag = new LogicalPlan(); - + String workingDir = new File("target").getAbsolutePath(); + dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); TestOperator testOper = 
dag.addOperator("TestOperator", TestOperator.class); TestInputStatsListener testInputStatsListener = new TestInputStatsListener(); dag.setAttribute(testOper, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{testInputStatsListener})); @@ -225,6 +229,8 @@ public class StatsTest private void baseTestForQueueSize(int maxTuples, TestCollectorStatsListener statsListener, DAG.Locality locality) throws Exception { LogicalPlan dag = new LogicalPlan(); + String workingDir = new File("target/baseTestForQueueSize").getAbsolutePath(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200); TestOperator testOper = dag.addOperator("TestOperator", TestOperator.class); testOper.setMaxTuples(maxTuples); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java index a6897e0..4f7b842 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/WindowGeneratorTest.java @@ -15,6 +15,7 @@ */ package com.datatorrent.stram.engine; +import java.io.File; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -24,12 +25,11 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; + +import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; 
-import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Sink; import com.datatorrent.bufferserver.packet.MessageType; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; @@ -305,7 +305,8 @@ public class WindowGeneratorTest { logger.info("Testing Out of Sequence Error"); LogicalPlan dag = new LogicalPlan(); - + String workingDir = new File("target/testOutofSequenceError").getAbsolutePath(); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator()); MyLogger ml = dag.addOperator("logger", new MyLogger()); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29eb6c37/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java index 49c7844..9b8f0b2 100644 --- a/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java +++ b/engine/src/test/java/com/datatorrent/stram/webapp/StramWebServicesTest.java @@ -56,6 +56,8 @@ import static org.junit.Assert.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import com.datatorrent.api.Context; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.stram.StramAppContext; import com.datatorrent.stram.StreamingContainerManager; import com.datatorrent.stram.plan.logical.LogicalPlan; @@ -125,7 +127,9 @@ public class StramWebServicesTest extends JerseyTest protected void configureServlets() { LogicalPlan dag = new LogicalPlan(); - dag.setAttribute(LogicalPlan.APPLICATION_PATH, new File("target", 
StramWebServicesTest.class.getName()).getAbsolutePath()); + String workingDir = new File("target", StramWebServicesTest.class.getName()).getAbsolutePath(); + dag.setAttribute(LogicalPlan.APPLICATION_PATH, workingDir); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir + "/localPath", workingDir, null)); final DummyStreamingContainerManager streamingContainerManager = new DummyStreamingContainerManager(dag); appContext = new TestAppContext();
