Repository: apex-core Updated Branches: refs/heads/master 25e4c4c51 -> 6cb3e3510
APEXCORE-709 Refactor code changes made through APEXCORE-575 Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/6cb3e351 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/6cb3e351 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/6cb3e351 Branch: refs/heads/master Commit: 6cb3e3510060e23f1519d2f91a629d8df38e4431 Parents: 25e4c4c Author: Tushar R. Gosavi <[email protected]> Authored: Mon Apr 17 13:33:19 2017 +0530 Committer: Tushar R. Gosavi <[email protected]> Committed: Fri Apr 21 11:13:48 2017 +0530 ---------------------------------------------------------------------- .../common/util/AsyncFSStorageAgent.java | 2 +- .../datatorrent/common/util/FSStorageAgent.java | 15 +- .../apex/common/util/AsyncStorageAgent.java | 2 +- .../apex/common/util/CascadeStorageAgent.java | 202 ------------------ .../common/util/CascadeStorageAgentTest.java | 116 ----------- .../stram/StreamingContainerManager.java | 2 +- .../java/com/datatorrent/stram/engine/Node.java | 4 +- .../stram/plan/physical/PhysicalPlan.java | 2 +- .../apex/engine/util/CascadeStorageAgent.java | 208 +++++++++++++++++++ .../datatorrent/stram/StramRecoveryTest.java | 2 +- .../stram/StreamingContainerManagerTest.java | 20 -- .../engine/util/CascadeStorageAgentTest.java | 116 +++++++++++ 12 files changed, 337 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java index 0c389a4..c8275ea 100644 --- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java +++ 
b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java @@ -147,7 +147,7 @@ public class AsyncFSStorageAgent extends FSStorageAgent implements AsyncStorageA } @Override - public void finalize(int operatorId, long windowId) throws IOException + public void flush(int operatorId, long windowId) throws IOException { // Checkpoint already present in HDFS during save, when syncCheckpoint is true. if (isSyncCheckpoint()) { http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java index b5a43fe..4b71761 100644 --- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; @@ -157,27 +158,23 @@ public class FSStorageAgent implements StorageAgent, Serializable try { FileStatus status = fileContext.getFileStatus(lPath); if (!status.isDirectory()) { - throw new IOException("Checkpoint location is not a directory "); + throw new RuntimeException("Checkpoint location is not a directory"); } } catch (FileNotFoundException ex) { - // During initialization this directory may not exists. - // return an empty array. - return new long[0]; + // During initialization checkpoint directory may not exists. 
+ fileContext.mkdir(lPath, FsPermission.getDirDefault(), true); } RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath); - if (!fileStatusRemoteIterator.hasNext()) { - throw new IOException("Storage Agent has not saved anything yet!"); - } List<Long> lwindows = new ArrayList<>(); - do { + while (fileStatusRemoteIterator.hasNext()) { FileStatus fileStatus = fileStatusRemoteIterator.next(); String name = fileStatus.getPath().getName(); if (name.equals(TMP_FILE)) { continue; } lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name, 16)); - } while (fileStatusRemoteIterator.hasNext()); + } long[] windowIds = new long[lwindows.size()]; for (int i = 0; i < windowIds.length; i++) { windowIds[i] = lwindows.get(i); http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java index 337ccdd..f797b92 100644 --- a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java +++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java @@ -41,7 +41,7 @@ public interface AsyncStorageAgent extends StorageAgent * @param windowId * @throws IOException */ - void finalize(int operatorId, long windowId) throws IOException; + void flush(int operatorId, long windowId) throws IOException; /** * Check if StorageAgent is configured to take synchronous checkpoints. 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java deleted file mode 100644 index d6fec8e..0000000 --- a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java +++ /dev/null @@ -1,202 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.common.util; - -import java.io.IOException; -import java.io.ObjectStreamException; -import java.io.Serializable; -import java.util.Arrays; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.StorageAgent; - -/** - * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store - * the checkpoint, and use the parent agent to read old checkpoints. 
For application having - * large number of physical operators, the size and number of files to be copied could be - * large impacting application restart time. This storage-agent is used during application - * restart to avoiding copying checkpoints from old application directory to improve application - * restart time. - */ -public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable -{ - private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class); - private final StorageAgent parent; - private final StorageAgent current; - private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap; - - public CascadeStorageAgent(StorageAgent parent, StorageAgent current) - { - this.parent = parent; - this.current = current; - oldOperatorToWindowIdsMap = Maps.newConcurrentMap(); - } - - /** - * does the checkpoint belong to parent - */ - private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException - { - long[] wids = getParentWindowIds(operatorId); - if (wids.length != 0) { - return (wid <= wids[wids.length - 1]); - } - return false; - } - - /** - * Return window-id of checkpoints available in old storage agent. This function - * will call getWindowIds of old storage agent only once for the fist time, and - * return cached data for next calls for same operator. 
- * - * @param operatorId - * @return - * @throws IOException - */ - private long[] getParentWindowIds(int operatorId) throws IOException - { - long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId); - if (oldWindowIds == null) { - oldWindowIds = parent.getWindowIds(operatorId); - if (oldWindowIds == null) { - oldWindowIds = new long[0]; - } - Arrays.sort(oldWindowIds); - oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds); - logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds)); - } - return oldWindowIds; - } - - /** - * Save object in current storage agent. This should not modify old storage agent - * in any way. - * - * @param object - The operator whose state needs to be saved. - * @param operatorId - Identifier of the operator. - * @param windowId - Identifier for the specific state of the operator. - * @throws IOException - */ - @Override - public void save(Object object, int operatorId, long windowId) throws IOException - { - current.save(object, operatorId, windowId); - } - - /** - * Delete old checkpoints from the storage agent. - * - * The checkpoints are deleted from current directory if it is present in current - * storage agent. and cached state for old storage agent is removed. - * - * @param operatorId - * @param windowId - * @throws IOException - */ - @Override - public void delete(int operatorId, long windowId) throws IOException - { - if (!isCheckpointFromParent(operatorId, windowId)) { - current.delete(operatorId, windowId); - } - } - - /** - * Load checkpoint from storage agents. Do a basic comparision of windowIds - * to check the storage agent which has the checkpoint. 
- * - * @param operatorId Id for which the object was previously saved - * @param windowId WindowId for which the object was previously saved - * @return - * @throws IOException - */ - @Override - public Object load(int operatorId, long windowId) throws IOException - { - long[] oldWindowIds = getParentWindowIds(operatorId); - if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length - 1]) { - return parent.load(operatorId, windowId); - } - return current.load(operatorId, windowId); - } - - @Override - public long[] getWindowIds(int operatorId) throws IOException - { - long[] currentIds = current.getWindowIds(operatorId); - long[] oldWindowIds = getParentWindowIds(operatorId); - return merge(currentIds, oldWindowIds); - } - - private static final long[] EMPTY_LONG_ARRAY = new long[0]; - private long[] merge(long[] currentIds, long[] oldWindowIds) - { - if (currentIds == null && oldWindowIds == null) { - return EMPTY_LONG_ARRAY; - } - if (currentIds == null) { - return oldWindowIds; - } - if (oldWindowIds == null) { - return currentIds; - } - long[] mergedArray = new long[currentIds.length + oldWindowIds.length]; - System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length); - System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length); - Arrays.sort(mergedArray); - return mergedArray; - } - - @Override - public void finalize(int operatorId, long windowId) throws IOException - { - if (current instanceof AsyncStorageAgent) { - ((AsyncStorageAgent)current).finalize(operatorId, windowId); - } - } - - @Override - public boolean isSyncCheckpoint() - { - if (parent instanceof AsyncStorageAgent) { - return ((AsyncStorageAgent)parent).isSyncCheckpoint(); - } - return true; - } - - public Object readResolve() throws ObjectStreamException - { - return new CascadeStorageAgent(parent, current); - } - - public StorageAgent getCurrentStorageAgent() - { - return current; - } - - public StorageAgent getParentStorageAgent() - { - 
return parent; - } -} http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java deleted file mode 100644 index 40f24f0..0000000 --- a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.common.util; - -import java.io.File; -import java.io.IOException; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.apex.common.util.CascadeStorageAgent; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; - -import com.datatorrent.api.Attribute; -import com.datatorrent.api.DAG; - -public class CascadeStorageAgentTest -{ - - static class TestMeta extends TestWatcher - { - String applicationPath; - - @Override - protected void starting(Description description) - { - super.starting(description); - applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); - try { - FileUtils.forceMkdir(new File("target/" + description.getClassName())); - } catch (IOException e) { - throw new RuntimeException(e); - } - Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); - attributes.put(DAG.APPLICATION_PATH, applicationPath); - } - - @Override - protected void finished(Description description) - { - try { - FileUtils.deleteDirectory(new File("target/" + description.getClassName())); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Test - public void testSingleIndirection() throws IOException - { - String oldAppPath = testMeta.applicationPath; - FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null); - storageAgent.save("1", 1, 1); - storageAgent.save("2", 1, 2); - storageAgent.save("3", 2, 1); - - String newAppPath = oldAppPath + ".new"; - CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null)); - long[] operatorIds = cascade.getWindowIds(1); - Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L, 2L}); - - 
operatorIds = cascade.getWindowIds(2); - Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L}); - - /* save should happen to new location */ - cascade.save("4", 1, 4); - FileContext fileContext = FileContext.getFileContext(); - Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 4))); - Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4))); - - // check for delete, - // delete for old checkpoint should be ignored - cascade.save("5", 1, 5); - cascade.delete(1, 2L); - Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 2))); - cascade.delete(1, 4L); - Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4))); - - /* chaining of storage agent */ - String latestAppPath = oldAppPath + ".latest"; - cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null)); - CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null)); - operatorIds = latest.getWindowIds(1); - Assert.assertArrayEquals("Window ids ", operatorIds, new long[] {1,2,5}); - - latest.save("6", 1, 6); - Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 6))); - Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 6))); - Assert.assertTrue("operator 1 window 6 file exists in new directory", fileContext.util().exists(new Path(latestAppPath + "/" + 1 + "/" + 6))); - } -} 
http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 92fce54..ca2bd88 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -65,9 +65,9 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.common.util.CascadeStorageAgent; import org.apache.apex.engine.plugin.ApexPluginDispatcher; import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher; +import org.apache.apex.engine.util.CascadeStorageAgent; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ToStringBuilder; http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index c84a249..88b002f 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -539,7 +539,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera checkpointStats = null; return; } else { - asyncStorageAgent.finalize(id, windowId); + asyncStorageAgent.flush(id, windowId); } } } @@ -688,7 +688,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera @Override public Stats.CheckpointStats call() throws Exception { - agent.finalize(id, windowId); + agent.flush(id, 
windowId); stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime; return stats; } http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java index f4e2100..ecc010c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java @@ -1227,7 +1227,7 @@ public class PhysicalPlan implements Serializable StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT); agent.save(oo, oper.id, windowId); if (agent instanceof AsyncStorageAgent) { - ((AsyncStorageAgent)agent).finalize(oper.id, windowId); + ((AsyncStorageAgent)agent).flush(oper.id, windowId); } } catch (IOException e) { // inconsistent state, no recovery option, requires shutdown http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java new file mode 100644 index 0000000..9903010 --- /dev/null +++ b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.util; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.Serializable; +import java.util.Arrays; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.common.util.AsyncStorageAgent; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.StorageAgent; + +/** + * A StorageAgent which chains two StorageAgent. It use the current storage-agent to store + * the checkpoint, and use the parent agent to read old checkpoints. For application having + * large number of physical operators, the size and number of files to be copied could be + * large impacting application restart time. This storage-agent is used during application + * restart to avoiding copying checkpoints from old application directory to improve application + * restart time. 
+ */ [email protected] +public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable +{ + private static final long serialVersionUID = 985557590735264920L; + private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class); + private final StorageAgent parent; + private final StorageAgent current; + private transient Map<Integer, long[]> oldOperatorToWindowIdsMap; + + public CascadeStorageAgent(StorageAgent parent, StorageAgent current) + { + this.parent = parent; + this.current = current; + oldOperatorToWindowIdsMap = Maps.newConcurrentMap(); + } + + /** + * does the checkpoint belong to parent + */ + private boolean isCheckpointFromParent(int operatorId, long wid) throws IOException + { + long[] wids = getParentWindowIds(operatorId); + if (wids.length != 0) { + return (wid <= wids[wids.length - 1]); + } + return false; + } + + /** + * Return window-id of checkpoints available in old storage agent. This function + * will call getWindowIds of old storage agent only once for the fist time, and + * return cached data for next calls for same operator. + * + * @param operatorId + * @return + * @throws IOException + */ + private long[] getParentWindowIds(int operatorId) throws IOException + { + long[] oldWindowIds = oldOperatorToWindowIdsMap.get(operatorId); + if (oldWindowIds == null) { + oldWindowIds = parent.getWindowIds(operatorId); + if (oldWindowIds == null) { + oldWindowIds = new long[0]; + } + Arrays.sort(oldWindowIds); + oldOperatorToWindowIdsMap.put(operatorId, oldWindowIds); + logger.debug("CascadeStorageAgent window ids from old storage agent op {} wids {}", operatorId, Arrays.toString(oldWindowIds)); + } + return oldWindowIds; + } + + /** + * Save object in current storage agent. This should not modify old storage agent + * in any way. + * + * @param object - The operator whose state needs to be saved. + * @param operatorId - Identifier of the operator. 
+ * @param windowId - Identifier for the specific state of the operator. + * @throws IOException + */ + @Override + public void save(Object object, int operatorId, long windowId) throws IOException + { + current.save(object, operatorId, windowId); + } + + /** + * Delete old checkpoints from the storage agent. + * + * The checkpoints are deleted from current directory if it is present in current + * storage agent. and cached state for old storage agent is removed. + * + * @param operatorId + * @param windowId + * @throws IOException + */ + @Override + public void delete(int operatorId, long windowId) throws IOException + { + if (!isCheckpointFromParent(operatorId, windowId)) { + current.delete(operatorId, windowId); + } + } + + /** + * Load checkpoint from storage agents. Do a basic comparision of windowIds + * to check the storage agent which has the checkpoint. + * + * @param operatorId Id for which the object was previously saved + * @param windowId WindowId for which the object was previously saved + * @return + * @throws IOException + */ + @Override + public Object load(int operatorId, long windowId) throws IOException + { + long[] oldWindowIds = getParentWindowIds(operatorId); + if (oldWindowIds.length >= 1 && windowId <= oldWindowIds[oldWindowIds.length - 1]) { + return parent.load(operatorId, windowId); + } + return current.load(operatorId, windowId); + } + + @Override + public long[] getWindowIds(int operatorId) throws IOException + { + long[] currentIds = current.getWindowIds(operatorId); + long[] oldWindowIds = getParentWindowIds(operatorId); + return merge(currentIds, oldWindowIds); + } + + private static final long[] EMPTY_LONG_ARRAY = new long[0]; + private long[] merge(long[] currentIds, long[] oldWindowIds) + { + if (currentIds == null && oldWindowIds == null) { + return EMPTY_LONG_ARRAY; + } + if (currentIds == null) { + return oldWindowIds; + } + if (oldWindowIds == null) { + return currentIds; + } + long[] mergedArray = new long[currentIds.length 
+ oldWindowIds.length]; + System.arraycopy(currentIds, 0, mergedArray, 0, currentIds.length); + System.arraycopy(oldWindowIds, 0, mergedArray, currentIds.length, oldWindowIds.length); + Arrays.sort(mergedArray); + return mergedArray; + } + + @Override + public void flush(int operatorId, long windowId) throws IOException + { + if (current instanceof AsyncStorageAgent) { + ((AsyncStorageAgent)current).flush(operatorId, windowId); + } + } + + @Override + public boolean isSyncCheckpoint() + { + if (parent instanceof AsyncStorageAgent) { + return ((AsyncStorageAgent)parent).isSyncCheckpoint(); + } + return true; + } + + private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException + { + input.defaultReadObject(); + oldOperatorToWindowIdsMap = Maps.newConcurrentMap(); + } + + public StorageAgent getCurrentStorageAgent() + { + return current; + } + + public StorageAgent getParentStorageAgent() + { + return parent; + } +} http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java index 2f46049..177e3fa 100644 --- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java @@ -44,7 +44,7 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.common.util.CascadeStorageAgent; +import org.apache.apex.engine.util.CascadeStorageAgent; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.lang.mutable.MutableInt; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java 
---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index cb2d760..53f18f9 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -468,16 +468,6 @@ public class StreamingContainerManagerTest long[] windowsIds = sa.getWindowIds(1); Arrays.sort(windowsIds); Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds); - - for (long windowId : windowIds) { - sa.delete(1, windowId); - } - try { - sa.getWindowIds(1); - Assert.fail("There should not be any most recently saved windowId!"); - } catch (IOException io) { - Assert.assertTrue("No State Saved", true); - } } @Test @@ -495,16 +485,6 @@ public class StreamingContainerManagerTest long[] windowsIds = sa.getWindowIds(1); Arrays.sort(windowsIds); Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds); - - for (long windowId : windowIds) { - sa.delete(1, windowId); - } - try { - sa.getWindowIds(1); - Assert.fail("There should not be any most recently saved windowId!"); - } catch (IOException io) { - Assert.assertTrue("No State Saved", true); - } } @Test http://git-wip-us.apache.org/repos/asf/apex-core/blob/6cb3e351/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java new file mode 100644 index 0000000..43b5636 --- /dev/null +++ b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.engine.util; + +import java.io.File; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Attribute; +import com.datatorrent.api.DAG; +import com.datatorrent.common.util.FSStorageAgent; + +public class CascadeStorageAgentTest +{ + + static class TestMeta extends TestWatcher + { + String applicationPath; + + @Override + protected void starting(Description description) + { + super.starting(description); + applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); + try { + FileUtils.forceMkdir(new File("target/" + description.getClassName())); + } catch (IOException e) { + throw new RuntimeException(e); + } + Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); + attributes.put(DAG.APPLICATION_PATH, applicationPath); + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File("target/" + description.getClassName())); + } 
catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Test + public void testSingleIndirection() throws IOException + { + String oldAppPath = testMeta.applicationPath; + FSStorageAgent storageAgent = new FSStorageAgent(oldAppPath, null); + storageAgent.save("1", 1, 1); + storageAgent.save("2", 1, 2); + storageAgent.save("3", 2, 1); + + String newAppPath = oldAppPath + ".new"; + CascadeStorageAgent cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null)); + long[] operatorIds = cascade.getWindowIds(1); + Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L, 2L}); + + operatorIds = cascade.getWindowIds(2); + Assert.assertArrayEquals("Returned window ids ", operatorIds, new long[]{1L}); + + /* save should happen to new location */ + cascade.save("4", 1, 4); + FileContext fileContext = FileContext.getFileContext(); + Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 4))); + Assert.assertTrue("operator 1 window 4 file exists in new directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4))); + + // check for delete, + // delete for old checkpoint should be ignored + cascade.save("5", 1, 5); + cascade.delete(1, 2L); + Assert.assertTrue("operator 1 window 2 file exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 2))); + cascade.delete(1, 4L); + Assert.assertFalse("operator 1 window 4 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 4))); + + /* chaining of storage agent */ + String latestAppPath = oldAppPath + ".latest"; + cascade = new CascadeStorageAgent(storageAgent, new FSStorageAgent(newAppPath, null)); + CascadeStorageAgent latest = new CascadeStorageAgent(cascade, new FSStorageAgent(latestAppPath, null)); + operatorIds = 
latest.getWindowIds(1); + Assert.assertArrayEquals("Window ids ", operatorIds, new long[] {1,2,5}); + + latest.save("6", 1, 6); + Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(oldAppPath + "/" + 1 + "/" + 6))); + Assert.assertFalse("operator 1 window 6 file does not exists in old directory", fileContext.util().exists(new Path(newAppPath + "/" + 1 + "/" + 6))); + Assert.assertTrue("operator 1 window 6 file exists in new directory", fileContext.util().exists(new Path(latestAppPath + "/" + 1 + "/" + 6))); + } +}
