Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 2f97e7a40 -> a4b999de1
MLHR-1902 #comment renamed IdempotentStorageManager to WindowDataManager Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dae30a03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dae30a03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dae30a03 Branch: refs/heads/devel-3 Commit: dae30a03e53bebc404a3852c2c8d23c7619c5fd1 Parents: 2f97e7a Author: Chandni Singh <[email protected]> Authored: Wed Nov 18 13:26:21 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Wed Nov 18 14:17:17 2015 -0800 ---------------------------------------------------------------------- .../lib/io/IdempotentStorageManager.java | 3 +- .../datatorrent/lib/util/WindowDataManager.java | 404 +++++++++++++++++++ .../lib/io/IdempotentStorageManagerTest.java | 1 + .../lib/util/WindowDataManagerTest.java | 188 +++++++++ 4 files changed, 595 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java index 545398e..dae417d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java +++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java @@ -54,8 +54,9 @@ import com.datatorrent.common.util.FSStorageAgent; * application window boundaries. 
* * @since 2.0.0 + * @deprecated use {@link com.datatorrent.lib.util.WindowDataManager} */ - +@Deprecated public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext> { /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java new file mode 100644 index 0000000..26a2e32 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * A window data manager allows an operator to emit the same tuples in every replayed application window.
+ * It cannot make any guarantees about the tuples emitted in the application window which fails.
+ * <p>
+ * The order of tuples is guaranteed for ordered input sources.
+ * <p>
+ * <b>Important:</b> In order for a window data manager to function correctly it cannot allow
+ * checkpoints to occur within an application window and checkpoints must be aligned with
+ * application window boundaries.
+ *
+ * @since 2.0.0
+ */
+public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+{
+  /**
+   * Gets the largest window for which there is recovery data.
+   *
+   * @return the largest window id with saved state, or {@link Stateless#WINDOW_ID} when there is none.
+   */
+  long getLargestRecoveryWindow();
+
+  /**
+   * Loads the state saved by every operator instance for the given window.
+   * <p>
+   * When an operator can partition itself dynamically then there is no guarantee that an input state which was being
+   * handled by one instance previously will be handled by the same instance after partitioning.<br>
+   * For example, an {@link AbstractFileInputOperator} instance which reads a file X till offset l (not check-pointed)
+   * may no longer be the instance that handles file X after repartitioning, as the number of instances may have
+   * changed and file X is re-hashed to another instance.<br>
+   * The new instance wouldn't know from what point to read file X unless it reads the saved state of all the
+   * operators for the window being replayed and fixes its state.
+   *
+   * @param windowId window id.
+   * @return mapping of operator id to the corresponding state; depending on the implementation this may be
+   *         {@code null} or empty when no state exists for the window.
+   * @throws IOException if reading the saved state fails
+   */
+  Map<Integer, Object> load(long windowId) throws IOException;
+
+  /**
+   * Deletes the artifacts of the operator for windows {@code <= windowId}.
+   *
+   * @param operatorId operator id
+   * @param windowId   window id
+   * @throws IOException if deleting the saved state fails
+   */
+  void deleteUpTo(int operatorId, long windowId) throws IOException;
+
+  /**
+   * Informs the window data manager that the operator is partitioned so that it can set properties and
+   * distribute state.
+   *
+   * @param newManagers        all the new window data managers.
+   * @param removedOperatorIds set of operator ids which were removed after partitioning.
+   */
+  void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
+
+  /**
+   * A {@link WindowDataManager} that uses FS to persist state.
+   * <p>
+   * State is written through an {@link FSStorageAgent} rooted at {@code <application path>/<recoveryPath>}.
+   */
+  class FSWindowDataManager implements WindowDataManager
+  {
+    private static final String DEF_RECOVERY_PATH = "idempotentState";
+
+    protected transient FSStorageAgent storageAgent;
+
+    /**
+     * Recovery path relative to app path where state is saved.
+     */
+    @NotNull
+    private String recoveryPath;
+
+    /**
+     * Largest window for which there is recovery data across all physical operator instances.
+     */
+    protected transient long largestRecoveryWindow;
+
+    /**
+     * This is not null only for one physical instance.<br>
+     * It consists of operator ids which have been deleted but have some state that can be replayed.
+     * Only one of the instances would be handling (modifying) the files that belong to this state.
+     */
+    protected Set<Integer> deletedOperators;
+
+    /**
+     * Sorted mapping from window id to all the operators that have state to replay for that window.
+     */
+    protected final transient TreeMultimap<Long, Integer> replayState;
+
+    protected transient FileSystem fs;
+    protected transient Path appPath;
+
+    public FSWindowDataManager()
+    {
+      replayState = TreeMultimap.create();
+      largestRecoveryWindow = Stateless.WINDOW_ID;
+      recoveryPath = DEF_RECOVERY_PATH;
+    }
+
+    /**
+     * Creates the storage agent and file system for the recovery path and rebuilds the replay state by scanning
+     * the files already present there.
+     */
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      Configuration configuration = new Configuration();
+      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+
+      try {
+        storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+
+        fs = FileSystem.newInstance(appPath.toUri(), configuration);
+
+        if (fs.exists(appPath)) {
+          FileStatus[] fileStatuses = fs.listStatus(appPath);
+
+          // Each child of the recovery path is named by an operator id; each file inside it is named by a
+          // window id in hexadecimal. Temporary (in-progress) files are skipped.
+          for (FileStatus operatorDirStatus : fileStatuses) {
+            int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
+
+            for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
+              String fileName = status.getPath().getName();
+              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
+                continue;
+              }
+              long windowId = Long.parseLong(fileName, 16);
+              replayState.put(windowId, operatorId);
+              if (windowId > largestRecoveryWindow) {
+                largestRecoveryWindow = windowId;
+              }
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+      storageAgent.save(object, operatorId, windowId);
+    }
+
+    /**
+     * Loads the state of one operator for a window.
+     *
+     * @return the saved state, or {@code null} if the replay state (built in {@link #setup}) has no entry for the
+     *         operator in that window.
+     */
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null || !operators.contains(operatorId)) {
+        return null;
+      }
+      return storageAgent.load(operatorId, windowId);
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+      storageAgent.delete(operatorId, windowId);
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null) {
+        return null;
+      }
+      Map<Integer, Object> data = Maps.newHashMap();
+      for (int operatorId : operators) {
+        data.put(operatorId, load(operatorId, windowId));
+      }
+      return data;
+    }
+
+    /**
+     * @return the window ids saved for the operator, or {@code null} if its directory is missing or empty.
+     */
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      Path operatorPath = new Path(appPath, String.valueOf(operatorId));
+      if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
+        return null;
+      }
+      return storageAgent.getWindowIds(operatorId);
+    }
+
+    /**
+     * Deletes all the recovery files of window ids {@code <= windowId}.
+     * <p>
+     * If this instance tracks operators removed by partitioning, their replayable state up to {@code windowId} is
+     * deleted as well, and the directory of a removed operator is deleted once it becomes empty.
+     *
+     * @param operatorId operator id.
+     * @param windowId the largest window id for which the states will be deleted.
+     * @throws IOException if listing or deleting files fails
+     */
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+      //deleting the replay state
+      if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
+        // Iterate the asMap() view and remove entries through its iterator. Removing keys from the multimap directly
+        // (replayState.removeAll) while iterating replayState.keySet() makes the next iterator.next() throw
+        // ConcurrentModificationException.
+        Iterator<Map.Entry<Long, Collection<Integer>>> windowsIterator = replayState.asMap().entrySet().iterator();
+        while (windowsIterator.hasNext()) {
+          Map.Entry<Long, Collection<Integer>> windowEntry = windowsIterator.next();
+          long lwindow = windowEntry.getKey();
+          if (lwindow > windowId) {
+            // keys are sorted, so every remaining window is newer than windowId
+            break;
+          }
+          for (Integer loperator : windowEntry.getValue()) {
+            if (deletedOperators.contains(loperator)) {
+              storageAgent.delete(loperator, lwindow);
+
+              Path loperatorPath = new Path(appPath, Integer.toString(loperator));
+              if (fs.listStatus(loperatorPath).length == 0) {
+                //The operator was deleted and it has nothing to replay.
+                deletedOperators.remove(loperator);
+                fs.delete(loperatorPath, true);
+              }
+            } else if (loperator == operatorId) {
+              storageAgent.delete(loperator, lwindow);
+            }
+          }
+          windowsIterator.remove();
+        }
+      }
+
+      // Delete this operator's own windows; its directory does not exist if it never saved any state.
+      Path operatorPath = new Path(appPath, Integer.toString(operatorId));
+      if (fs.exists(operatorPath) && fs.listStatus(operatorPath).length > 0) {
+        long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
+        Arrays.sort(windowsAfterReplay);
+        for (long lwindow : windowsAfterReplay) {
+          if (lwindow <= windowId) {
+            storageAgent.delete(operatorId, lwindow);
+          }
+        }
+      }
+    }
+
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return largestRecoveryWindow;
+    }
+
+    /**
+     * Copies the recovery path and storage agent to every new manager and makes sure exactly one new manager is
+     * responsible for the state of removed operators.
+     */
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+      Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
+          "there has to be one idempotent storage manager");
+      FSWindowDataManager deletedOperatorsManager = null;
+
+      if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
+        if (this.deletedOperators == null) {
+          this.deletedOperators = Sets.newHashSet();
+        }
+        this.deletedOperators.addAll(removedOperatorIds);
+      }
+
+      for (WindowDataManager storageManager : newManagers) {
+
+        FSWindowDataManager lmanager = (FSWindowDataManager)storageManager;
+        lmanager.recoveryPath = this.recoveryPath;
+        lmanager.storageAgent = this.storageAgent;
+
+        if (lmanager.deletedOperators != null) {
+          deletedOperatorsManager = lmanager;
+        }
+        //only one physical instance can manage deleted operators so clearing this field for rest of the instances.
+        if (lmanager != deletedOperatorsManager) {
+          lmanager.deletedOperators = null;
+        }
+      }
+
+      if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
+        //Nothing to do
+        return;
+      }
+      if (this.deletedOperators != null) {
+
+        /*If some operators were removed then there needs to be a manager which can clean their state when it is not
+        needed.*/
+        if (deletedOperatorsManager == null) {
+          //None of the managers were handling deleted operators data.
+          deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next();
+          deletedOperatorsManager.deletedOperators = Sets.newHashSet();
+        }
+
+        deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public String getRecoveryPath()
+    {
+      return recoveryPath;
+    }
+
+    public void setRecoveryPath(String recoveryPath)
+    {
+      this.recoveryPath = recoveryPath;
+    }
+  }
+
+  /**
+   * This {@link WindowDataManager} will never do recovery. This is a convenience class so that operators
+   * can use the same logic for maintaining idempotency and avoiding idempotency.
+   */
+  class NoopWindowDataManager implements WindowDataManager
+  {
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return Stateless.WINDOW_ID;
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      return new long[0];
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git
a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java index f2461e6..347dabf 100644 --- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java @@ -46,6 +46,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; /** * Tests for {@link IdempotentStorageManager} */ +@Deprecated public class IdempotentStorageManagerTest { private static class TestMeta extends TestWatcher http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java new file mode 100644 index 0000000..fdca73e --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package com.datatorrent.lib.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link WindowDataManager}
+ */
+public class WindowDataManagerTest
+{
+  /**
+   * Creates a fresh {@link WindowDataManager.FSWindowDataManager} rooted under {@code target/} for every test and
+   * removes the test's directory afterwards.
+   */
+  private static class TestMeta extends TestWatcher
+  {
+
+    String applicationPath;
+    WindowDataManager.FSWindowDataManager storageManager;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      storageManager = new WindowDataManager.FSWindowDataManager();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      storageManager.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      storageManager.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testLargestRecoveryWindow()
+  {
+    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+  }
+
+  @Test
+  public void testSave() throws IOException
+  {
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    testMeta.storageManager.save(data, 1, 1);
+    // re-running setup rebuilds the replay state from the files just written
+    testMeta.storageManager.setup(testMeta.context);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+    Assert.assertEquals("dataOf1", data, decoded);
+  }
+
+  @Test
+  public void testLoad() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+    Assert.assertEquals("no of states", 2, decodedStates.size());
+    Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+    Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+  }
+
+  @Test
+  public void testRecovery() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 2);
+
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+  }
+
+  @Test
+  public void testDelete() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    Map<Integer, String> dataOf3 = Maps.newHashMap();
+    dataOf3.put(7, "seven");
+    dataOf3.put(8, "eight");
+    dataOf3.put(9, "nine");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.save(dataOf3, 3, 1);
+
+    // operators 2 and 3 are removed; the single remaining manager becomes responsible for their state
+    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+        Sets.newHashSet(2, 3));
+    testMeta.storageManager.setup(testMeta.context);
+    testMeta.storageManager.deleteUpTo(1, 1);
+
+    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+    try {
+      Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+      Assert.assertFalse("no data for 2", fs.exists(new Path(appPath, Integer.toString(2))));
+      Assert.assertFalse("no data for 3", fs.exists(new Path(appPath, Integer.toString(3))));
+    } finally {
+      fs.close();
+    }
+  }
+
+}
