Repository: apex-core
Updated Branches:
  refs/heads/master 412a3bd81 -> 88bf33627

APEXCORE-575 Improve application restart time.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8825f5fa
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8825f5fa
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8825f5fa

Branch: refs/heads/master
Commit: 8825f5fa3e22beaf360f111f37ec0c4dba24ad1c
Parents: 9054fd2
Author: Tushar R. Gosavi <[email protected]>
Authored: Mon Feb 27 17:23:20 2017 +0530
Committer: Tushar R. Gosavi <[email protected]>
Committed: Tue Apr 4 01:03:21 2017 +0530

----------------------------------------------------------------------
 .../common/util/AsyncFSStorageAgent.java        |  14 +-
 .../datatorrent/common/util/FSStorageAgent.java |  11 +
 .../apex/common/util/AsyncStorageAgent.java     |  54 +++++
 .../apex/common/util/CascadeStorageAgent.java   | 202 +++++++++++++++++++
 .../common/util/CascadeStorageAgentTest.java    | 116 +++++++++++
 .../java/com/datatorrent/stram/StramClient.java |  12 +-
 .../stram/StreamingContainerManager.java        |  49 +++--
 .../java/com/datatorrent/stram/engine/Node.java |  17 +-
 .../stram/plan/physical/PhysicalPlan.java       |   9 +-
 .../datatorrent/stram/StramRecoveryTest.java    |  45 ++++-
 10 files changed, 493 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 24d850e..0c389a4 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -45,7 +46,7 @@ import com.google.common.base.Throwables;
  *
  * @since 3.1.0
  */
-public class AsyncFSStorageAgent extends FSStorageAgent
+public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageAgent
 {
   private final transient Configuration conf;
   private transient volatile String localBasePath;
@@ -146,6 +147,16 @@ public class AsyncFSStorageAgent extends FSStorageAgent
   }
 
   @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    // Checkpoint already present in HDFS during save, when syncCheckpoint is true.
+    if (isSyncCheckpoint()) {
+      return;
+    }
+    copyToHDFS(operatorId, windowId);
+  }
+
+  @Override
   public Object readResolve() throws ObjectStreamException
   {
     AsyncFSStorageAgent asyncFSStorageAgent = new AsyncFSStorageAgent(this.path, null);
@@ -153,6 +164,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent
     return asyncFSStorageAgent;
   }
 
+  @Override
   public boolean isSyncCheckpoint()
   {
     return syncCheckpoint;

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index fe90b86..b5a43fe 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -18,6 +18,7 @@
  */
 package com.datatorrent.common.util;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectStreamException;
@@ -153,6 +154,16 @@ public class FSStorageAgent implements StorageAgent, Serializable
   public long[] getWindowIds(int operatorId) throws IOException
   {
     Path lPath = new Path(path + Path.SEPARATOR + String.valueOf(operatorId));
+    try {
+      FileStatus status = fileContext.getFileStatus(lPath);
+      if (!status.isDirectory()) {
+        throw new IOException("Checkpoint location is not a directory");
+      }
+    } catch (FileNotFoundException ex) {
+      // During initialization this directory may not exist;
+      // return an empty array.
+      return new long[0];
+    }
     RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
 
     if (!fileStatusRemoteIterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
new file mode 100644
index 0000000..632a7f2
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * Storage agent which can take checkpoints asynchronously.
+ * An AsyncStorageAgent enables quick checkpoints by taking a local snapshot of an operator
+ * and unblocking the operator to process more data, while the storage engine pushes the
+ * local snapshot to a distributed or globally accessible location for recovery.
+ */
+@InterfaceStability.Evolving
+public interface AsyncStorageAgent extends StorageAgent
+{
+  /**
+   * Make the checkpoint for the given windowId final, i.e. after this method returns,
+   * the checkpoint is accessible for recovery.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  public void finalize(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Check if the StorageAgent is configured to take synchronous checkpoints.
+   *
+   * @return true if the StorageAgent is configured to take synchronous checkpoints,
+   *         false otherwise.
+   */
+  public boolean isSyncCheckpoint();
+
+}
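
For readers of the new interface: the calling sequence the engine is expected to follow (and which Node and PhysicalPlan adopt later in this commit) is save() first, then finalize() once the checkpoint must become visible for recovery. The snippet below is an illustrative sketch only, not code from this commit; the class and method names are made up for the example.

    import java.io.IOException;

    import org.apache.apex.common.util.AsyncStorageAgent;

    import com.datatorrent.api.StorageAgent;

    // Illustrative sketch only -- not part of this commit.
    class CheckpointSketch
    {
      static void checkpoint(StorageAgent agent, Object operator, int operatorId, long windowId) throws IOException
      {
        agent.save(operator, operatorId, windowId); // snapshot (local when the agent checkpoints asynchronously)
        if (agent instanceof AsyncStorageAgent && !((AsyncStorageAgent)agent).isSyncCheckpoint()) {
          // make the checkpoint durable for recovery; Node defers this call to a background task
          ((AsyncStorageAgent)agent).finalize(operatorId, windowId);
        }
      }
    }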

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
new file mode 100644
index 0000000..d6fec8e
--- /dev/null
+++ b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.common.util;
+
+import java.io.IOException;
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.StorageAgent;
+
+/**
+ * A StorageAgent which chains two StorageAgents. It uses the current storage agent to store
+ * new checkpoints and the parent agent to read old checkpoints. For an application with a
+ * large number of physical operators, the size and number of checkpoint files to copy can be
+ * large, impacting application restart time. This storage agent is used during application
+ * restart to avoid copying checkpoints from the old application directory, improving
+ * application restart time.
+ */
+public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
+{
+  private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
+  private final StorageAgent parent;
+  private final StorageAgent current;
+  private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+
+  public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
+  {
+    this.parent = parent;
+    this.current = current;
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
+  }
+
+  /**
+   * Does the checkpoint belong to the parent?
+   */
+  private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException
+  {
+    long[] wids = getParentWindowIds(operatorId);
+    if (wids.length != 0) {
+      return (wid <= wids[wids.length - 1]);
+    }
+    return false;
+  }
+
+  /**
+   * Return window ids of checkpoints available in the old storage agent. This function
+   * calls getWindowIds of the old storage agent only once, the first time, and returns
+   * cached data on subsequent calls for the same operator.
+   *
+   * @param operatorId
+   * @return
+   * @throws IOException
+   */
+  private long[] getParentWindowIds(int operatorId) throws IOException
+  {
+    long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId);
+    if (oldWindowIds == null) {
+      oldWindowIds = parent.getWindowIds(operatorId);
+      if (oldWindowIds == null) {
+        oldWindowIds = new long[0];
+      }
+      Arrays.sort(oldWindowIds);
+      oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds);
+      logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds));
+    }
+    return oldWindowIds;
+  }
+
+  /**
+   * Save the object in the current storage agent. This does not modify the old storage
+   * agent in any way.
+   *
+   * @param object - The operator whose state needs to be saved.
+   * @param operatorId - Identifier of the operator.
+   * @param windowId - Identifier for the specific state of the operator.
+   * @throws IOException
+   */
+  @Override
+  public void save(Object object, int operatorId, long windowId) throws IOException
+  {
+    current.save(object, operatorId, windowId);
+  }
+
+  /**
+   * Delete old checkpoints from the storage agent.
+   *
+   * The checkpoint is deleted from the current storage agent only when it does not belong
+   * to the parent; checkpoints owned by the parent are left untouched.
+   *
+   * @param operatorId
+   * @param windowId
+   * @throws IOException
+   */
+  @Override
+  public void delete(int operatorId, long windowId) throws IOException
+  {
+    if (!isCheckpointFromParent(operatorId, windowId)) {
+      current.delete(operatorId, windowId);
+    }
+  }
+
+  /**
+   * Load a checkpoint from the storage agents. A basic comparison of windowIds
+   * determines which storage agent holds the checkpoint.
+   *
+   * @param operatorId Id for which the object was previously saved
+   * @param windowId WindowId for which the object was previously saved
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Object load(int operatorId, long windowId) throws IOException
+  {
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length - 1]) {
+      return parent.load(operatorId, windowId);
+    }
+    return current.load(operatorId, windowId);
+  }
+
+  @Override
+  public long[] getWindowIds(int operatorId) throws IOException
+  {
+    long[] currentIds = current.getWindowIds(operatorId);
+    long[] oldWindowIds = getParentWindowIds(operatorId);
+    return merge(currentIds, oldWindowIds);
+  }
+
+  private static final long[] EMPTY_LONG_ARRAY = new long[0];
+
+  private long[] merge(long[] currentIds, long[] oldWindowIds)
+  {
+    if (currentIds == null && oldWindowIds == null) {
+      return EMPTY_LONG_ARRAY;
+    }
+    if (currentIds == null) {
+      return oldWindowIds;
+    }
+    if (oldWindowIds == null) {
+      return currentIds;
+    }
+    long[] mergedArray = new long[currentIds.length + oldWindowIds.length];
+    System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length);
+    System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length);
+    Arrays.sort(mergedArray);
+    return mergedArray;
+  }
+
+  @Override
+  public void finalize(int operatorId, long windowId) throws IOException
+  {
+    if (current instanceof AsyncStorageAgent) {
+      ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+    }
+  }
+
+  @Override
+  public boolean isSyncCheckpoint()
+  {
+    if (parent instanceof AsyncStorageAgent) {
+      return ((AsyncStorageAgent)parent).isSyncCheckpoint();
+    }
+    return true;
+  }
+
+  public Object readResolve() throws ObjectStreamException
+  {
+    return new CascadeStorageAgent(parent, current);
+  }
+
+  public StorageAgent getCurrentStorageAgent()
+  {
+    return current;
+  }
+
+  public StorageAgent getParentStorageAgent()
+  {
+    return parent;
+  }
+}
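
During an application restart the engine wires the agents so that reads fall back to the previous application's checkpoints while writes go to the new application directory (StreamingContainerManager.updateStorageAgent below does exactly this). A minimal illustrative sketch, assuming file-system agents and hypothetical paths, not code from this commit:

    import org.apache.apex.common.util.CascadeStorageAgent;

    import com.datatorrent.api.StorageAgent;
    import com.datatorrent.common.util.FSStorageAgent;

    // Illustrative sketch only -- not part of this commit; paths are hypothetical.
    class RestartWiringSketch
    {
      static StorageAgent onRestart(String oldCheckpointDir, String newCheckpointDir)
      {
        StorageAgent old = new FSStorageAgent(oldCheckpointDir, null);
        StorageAgent fresh = new FSStorageAgent(newCheckpointDir, null);
        // reads consult the old application's checkpoints, new checkpoints land in the new
        // directory, so nothing has to be copied between application directories at restart
        return new CascadeStorageAgent(old, fresh);
      }
    }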

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
new file mode 100644
index 0000000..40f24f0
--- /dev/null
+++ b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.common.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.DAG;
+
+public class CascadeStorageAgentTest
+{
+
+  static class TestMeta extends TestWatcher
+  {
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      try {
+        FileUtils.forceMkdir(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSingleIndirection() throws IOException
+  {
+    String oldAppPath = testMeta.applicationPath;
+    FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null);
+    storageAgent.save("1", 1, 1);
+    storageAgent.save("2", 1, 2);
+    storageAgent.save("3", 2, 1);
+
+    String newAppPath = oldAppPath + ".new";
+    CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    long[] operatorIds = cascade.getWindowIds(1);
+    Assert.assertArrayEquals("Returned window ids", operatorIds, new long[]{1L, 2L});
+
+    operatorIds = cascade.getWindowIds(2);
+    Assert.assertArrayEquals("Returned window ids", operatorIds, new long[]{1L});
+
+    /* save should happen to the new location */
+    cascade.save("4", 1, 4);
+    FileContext fileContext = FileContext.getFileContext();
+    Assert.assertFalse("operator 1 window 4 file does not exist in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 4)));
+    Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    // check for delete,
+    // delete for an old checkpoint should be ignored
+    cascade.save("5", 1, 5);
+    cascade.delete(1, 2L);
+    Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 2)));
+    cascade.delete(1, 4L);
+    Assert.assertFalse("operator 1 window 4 file does not exist in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4)));
+
+    /* chaining of storage agents */
+    String latestAppPath = oldAppPath + ".latest";
+    cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null));
+    CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null));
+    operatorIds = latest.getWindowIds(1);
+    Assert.assertArrayEquals("Window ids", operatorIds, new long[] {1,2,5});
+
+    latest.save("6", 1, 6);
+    Assert.assertFalse("operator 1 window 6 file does not exist in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertFalse("operator 1 window 6 file does not exist in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 6)));
+    Assert.assertTrue("operator 1 window 6 file exists in latest directory", fileContext.util().exists(new Path(latestAppPath + "/" + 1 + "/" + 6)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StramClient.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java
index dad42e3..b280aad 100644
--- a/engine/src/main/java/com/datatorrent/stram/StramClient.java
+++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java
@@ -257,6 +257,7 @@ public class StramClient
   public void copyInitialState(Path origAppDir) throws IOException
   {
     // locate previous snapshot
+    long copyStart = System.currentTimeMillis();
     String newAppDir = this.dag.assertAppPath();
 
     FSRecoveryHandler recoveryHandler = new FSRecoveryHandler(origAppDir.toString(), conf);
@@ -284,6 +285,7 @@ public class StramClient
     logOs.close();
     logIs.close();
 
+    List<String> excludeDirs = Arrays.asList(LogicalPlan.SUBDIR_CHECKPOINTS, LogicalPlan.SUBDIR_EVENTS, LogicalPlan.SUBDIR_STATS);
     // copy sub directories that are not present in target
     FileStatus[] lFiles = fs.listStatus(origAppDir);
 
@@ -298,19 +300,19 @@ public class StramClient
     String newAppDirPath = Path.getPathWithoutSchemeAndAuthority(new Path(newAppDir)).toString();
 
     for (FileStatus f : lFiles) {
-      if (f.isDirectory()) {
+      if (f.isDirectory() && !excludeDirs.contains(f.getPath().getName())) {
         String targetPath = f.getPath().toString().replace(origAppDirPath, newAppDirPath);
         if (!fs.exists(new Path(targetPath))) {
-          LOG.debug("Copying {} to {}", f.getPath(), targetPath);
+          LOG.debug("Copying {} size {} to {}", f.getPath(), f.getLen(), targetPath);
+          long start = System.currentTimeMillis();
           FileUtil.copy(fs, f.getPath(), fs, new Path(targetPath), false, conf);
-          //FSUtil.copy(fs, f, fs, new Path(targetPath), false, false, conf);
+          LOG.debug("Copying {} size {} to {} took {} ms", f.getPath(), f.getLen(), targetPath, System.currentTimeMillis() - start);
         } else {
           LOG.debug("Ignoring {} as it already exists under {}", f.getPath(), targetPath);
-          //FSUtil.setPermission(fs, new Path(targetPath), new FsPermission((short)0777));
        }
      }
    }
-
+    LOG.info("Copying initial state took {} ms", System.currentTimeMillis() - copyStart);
  }

  /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index c68df14..51e85f7 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,6 +65,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.commons.io.IOUtils;
@@ -3238,23 +3239,43 @@ public class StreamingContainerManager implements PlanContext
       this.finals = new FinalVars(finals, lp);
       StorageAgent sa = lp.getValue(OperatorContext.STORAGE_AGENT);
-      if (sa instanceof AsyncFSStorageAgent) {
-        // replace the default storage agent, if present
-        AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
-      } else if (sa instanceof FSStorageAgent) {
-        // replace the default storage agent, if present
-        FSStorageAgent fssa = (FSStorageAgent)sa;
-        if (fssa.path.contains(oldAppId)) {
-          fssa = new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
-          lp.setAttribute(OperatorContext.STORAGE_AGENT, fssa);
-        }
+      lp.setAttribute(OperatorContext.STORAGE_AGENT, updateStorageAgent(sa, oldAppId, appId, conf));
+    }
+  }
+
+  private static StorageAgent updateStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent || sa instanceof FSStorageAgent) {
+      FSStorageAgent newAgent = (FSStorageAgent)updateFSStorageAgent(sa, oldAppId, appId, conf);
+      if (newAgent != sa) {
+        return new CascadeStorageAgent(sa, newAgent);
       }
+    } else if (sa instanceof CascadeStorageAgent) {
+      CascadeStorageAgent csa = (CascadeStorageAgent)sa;
+      StorageAgent currentStorageAgent = csa.getCurrentStorageAgent();
+      return new CascadeStorageAgent(csa, updateFSStorageAgent(currentStorageAgent, oldAppId, appId, conf));
     }
+    return sa;
+  }
+
+  /**
+   * Return an updated file-system based storage agent. The storage agent is updated only
+   * when it uses the application directory to store checkpoints.
+   */
+  private static StorageAgent updateFSStorageAgent(StorageAgent sa, String oldAppId, String appId, Configuration conf)
+  {
+    if (sa instanceof AsyncFSStorageAgent) {
+      AsyncFSStorageAgent fssa = (AsyncFSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new AsyncFSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    } else if (sa instanceof FSStorageAgent) {
+      FSStorageAgent fssa = (FSStorageAgent)sa;
+      if (fssa.path.contains(oldAppId)) {
+        return new FSStorageAgent(fssa.path.replace(oldAppId, appId), conf);
+      }
+    }
+    return sa;
   }
 
   public interface RecoveryHandler
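
Net effect of updateStorageAgent() across successive restarts: each restart wraps the previous agent as the parent of a new CascadeStorageAgent, so the chain grows by one level per restart while writes always go to the newest application directory. Schematically (illustrative comments only; the app ids are hypothetical, matching the scenario exercised in StramRecoveryTest below):

    // after a restart of app1 as app2: cascade(parent = fs(app1), current = fs(app2))
    // after a further restart as app3: cascade(parent = cascade(fs(app1), fs(app2)), current = fs(app3))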

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index d779afe..c84a249 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -48,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Throwables;
@@ -70,7 +71,6 @@ import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.bufferserver.util.Codec;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.OperatorDeployInfo;
@@ -519,16 +519,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
       checkpointStats = new Stats.CheckpointStats();
       checkpointStats.checkpointStartTime = System.currentTimeMillis();
       ba.save(operator, id, windowId);
-      if (ba instanceof AsyncFSStorageAgent) {
-        AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)ba;
-        if (!asyncFSStorageAgent.isSyncCheckpoint()) {
+      if (ba instanceof AsyncStorageAgent) {
+        AsyncStorageAgent asyncStorageAgent = (AsyncStorageAgent)ba;
+        if (!asyncStorageAgent.isSyncCheckpoint()) {
           if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
             CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo();
             checkpointWindowInfo.windowId = windowId;
             checkpointWindowInfo.applicationWindowCount = applicationWindowCount;
             checkpointWindowInfo.checkpointWindowCount = checkpointWindowCount;
             CheckpointHandler checkpointHandler = new CheckpointHandler();
-            checkpointHandler.agent = asyncFSStorageAgent;
+            checkpointHandler.agent = asyncStorageAgent;
             checkpointHandler.operatorId = id;
             checkpointHandler.windowId = windowId;
             checkpointHandler.stats = checkpointStats;
@@ -539,7 +539,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
             checkpointStats = null;
             return;
           } else {
-            asyncFSStorageAgent.copyToHDFS(id, windowId);
+            asyncStorageAgent.finalize(id, windowId);
           }
         }
       }
@@ -680,8 +680,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
 
   private class CheckpointHandler implements Callable<Stats.CheckpointStats>
   {
-
-    public AsyncFSStorageAgent agent;
+    public AsyncStorageAgent agent;
     public int operatorId;
     public long windowId;
     public Stats.CheckpointStats stats;
@@ -689,7 +688,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
     @Override
     public Stats.CheckpointStats call() throws Exception
     {
-      agent.copyToHDFS(id, windowId);
+      agent.finalize(id, windowId);
      stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
      return stats;
    }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ce22bfd..7547654 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,7 +69,6 @@ import com.datatorrent.api.StatsListener.OperatorRequest;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.StreamCodec;
 import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.stram.Journal.Recoverable;
 import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.api.StramEvent;
@@ -1226,11 +1226,8 @@ public class PhysicalPlan implements Serializable
       long windowId = oper.isOperatorStateLess() ? Stateless.WINDOW_ID : checkpoint.windowId;
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
-      if (agent instanceof AsyncFSStorageAgent) {
-        AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent)agent;
-        if (!asyncFSStorageAgent.isSyncCheckpoint()) {
-          asyncFSStorageAgent.copyToHDFS(oper.id, windowId);
-        }
+      if (agent instanceof AsyncStorageAgent) {
+        ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown

http://git-wip-us.apache.org/repos/asf/apex-core/blob/8825f5fa/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 645598d..2f46049 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,6 +44,7 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
@@ -428,6 +429,7 @@ public class StramRecoveryTest
     o1p1.getContainer().setExternalId("cid1");
     scm.writeJournal(o1p1.getContainer().getSetContainerState());
 
+    /* simulate application restart from app1 */
     dag = new LogicalPlan();
     dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath2);
     dag.setAttribute(LogicalPlan.APPLICATION_ID, appId2);
@@ -447,9 +449,50 @@ public class StramRecoveryTest
     o1p1 = plan.getOperators(dag.getOperatorMeta("o1")).get(0);
     assertEquals("journal copied", "cid1", o1p1.getContainer().getExternalId());
 
-    ids = new FSStorageAgent(appPath2 + "/" + LogicalPlan.SUBDIR_CHECKPOINTS, new Configuration()).getWindowIds(o1p1.getId());
+    CascadeStorageAgent csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent storage agent is of same type", csa.getParentStorageAgent().getClass(), agent.getClass());
+    /* parent and current point to the expected locations */
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app2"));
+
+    ids = csa.getWindowIds(o1p1.getId());
     Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
+
+    /* simulate another application restart from app2 */
+    String appId3 = "app3";
+    String appPath3 = testMeta.getPath() + "/" + appId3;
+    dag = new LogicalPlan();
+    dag.setAttribute(LogicalPlan.APPLICATION_PATH, appPath3);
+    dag.setAttribute(LogicalPlan.APPLICATION_ID, appId3);
+    sc = new StramClient(new Configuration(), dag);
+    try {
+      sc.start();
+      sc.copyInitialState(new Path(appPath2)); // copy state from app2.
+    } finally {
+      sc.stop();
+    }
+    scm = StreamingContainerManager.getInstance(new FSRecoveryHandler(dag.assertAppPath(), new Configuration(false)), dag, false);
+    plan = scm.getPhysicalPlan();
+    dag = plan.getLogicalPlan();
+
+    csa = (CascadeStorageAgent)dag.getAttributes().get(OperatorContext.STORAGE_AGENT);
+    Assert.assertEquals("storage agent is replaced by cascade", csa.getClass(), CascadeStorageAgent.class);
+    Assert.assertEquals("current storage agent is of same type", csa.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent storage agent is cascade", csa.getParentStorageAgent().getClass(), CascadeStorageAgent.class);
+
+    CascadeStorageAgent parent = (CascadeStorageAgent)csa.getParentStorageAgent();
+    Assert.assertEquals("parent's current storage agent is of same type", parent.getCurrentStorageAgent().getClass(), agent.getClass());
+    Assert.assertEquals("parent's parent storage agent is of same type", parent.getParentStorageAgent().getClass(), agent.getClass());
+    /* verify paths */
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getParentStorageAgent()).path.contains("app1"));
+    Assert.assertEquals(true, ((FSStorageAgent)parent.getCurrentStorageAgent()).path.contains("app2"));
+    Assert.assertEquals(true, ((FSStorageAgent)csa.getCurrentStorageAgent()).path.contains("app3"));
+
+    ids = csa.getWindowIds(o1p1.getId());
+    Assert.assertArrayEquals("checkpoints copied", new long[] {o1p1.getRecoveryCheckpoint().getWindowId()}, ids);
   }
 
   @Test
