Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 0a3e00ce0 -> 9c557fca1
Using iterator to remove entries instead of directly accessing data structure to avoid concurrent modification exceptions Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c39f5655 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c39f5655 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c39f5655 Branch: refs/heads/devel-3 Commit: c39f56555831249ce144b29c2835bc68952c1689 Parents: a029c5f Author: Pramod Immaneni <[email protected]> Authored: Fri Feb 5 13:19:17 2016 -0800 Committer: Pramod Immaneni <[email protected]> Committed: Fri Feb 12 11:35:01 2016 -0800 ---------------------------------------------------------------------- .../lib/io/IdempotentStorageManager.java | 25 ++++++++--------- .../datatorrent/lib/util/WindowDataManager.java | 11 +++++--- .../lib/io/IdempotentStorageManagerTest.java | 28 ++++++++++++++------ .../lib/util/WindowDataManagerTest.java | 18 ++++++++++--- 4 files changed, 55 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java index dae417d..4eac924 100644 --- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java +++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java @@ -23,25 +23,23 @@ import java.util.*; import javax.validation.constraints.NotNull; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultimap; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; + import com.datatorrent.api.Component; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.StorageAgent; import com.datatorrent.api.annotation.Stateless; - -import com.datatorrent.lib.io.fs.AbstractFileInputOperator; - import com.datatorrent.common.util.FSStorageAgent; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent agent @@ -61,6 +59,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex { /** * Gets the largest window for which there is recovery data. + * @return Returns the window id */ long getLargestRecoveryWindow(); @@ -233,13 +232,14 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex { //deleting the replay state if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { - Iterator<Long> windowsIterator = replayState.keySet().iterator(); - while (windowsIterator.hasNext()) { - long lwindow = windowsIterator.next(); + Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next(); + long lwindow = windowEntry.getKey(); if (lwindow > windowId) { break; } - for (Integer loperator : replayState.removeAll(lwindow)) { + for (Integer loperator : windowEntry.getValue()) { if (deletedOperators.contains(loperator)) { storageAgent.delete(loperator, lwindow); @@ -255,6 +255,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex storageAgent.delete(loperator, lwindow); } } + iterator.remove(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java index 26a2e32..7517cd4 100644 --- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java +++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java @@ -61,6 +61,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera { /** * Gets the largest window for which there is recovery data. + * @return Returns the window id */ long getLargestRecoveryWindow(); @@ -232,13 +233,14 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera { //deleting the replay state if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { - Iterator<Long> windowsIterator = replayState.keySet().iterator(); - while (windowsIterator.hasNext()) { - long lwindow = windowsIterator.next(); + Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next(); + long lwindow = windowEntry.getKey(); if (lwindow > windowId) { break; } - for (Integer loperator : replayState.removeAll(lwindow)) { + for (Integer loperator : windowEntry.getValue()) { if (deletedOperators.contains(loperator)) { storageAgent.delete(loperator, lwindow); @@ -253,6 +255,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera storageAgent.delete(loperator, lwindow); } } + iterator.remove(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java index 347dabf..4b29830 100644 --- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java @@ -20,18 +20,22 @@ package com.datatorrent.lib.io; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Map; +import java.util.TreeSet; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -40,7 +44,6 @@ import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.lib.helper.OperatorContextTestHelper; /** @@ -172,18 +175,27 @@ public class IdempotentStorageManagerTest dataOf2.put(8, "eight"); dataOf2.put(9, "nine"); - testMeta.storageManager.save(dataOf1, 1, 1); + for (int i = 1; i <= 9; ++i) { + testMeta.storageManager.save(dataOf1, 1, i); + } + testMeta.storageManager.save(dataOf2, 2, 1); testMeta.storageManager.save(dataOf3, 3, 1); testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager), Sets.newHashSet(2, 3)); testMeta.storageManager.setup(testMeta.context); - testMeta.storageManager.deleteUpTo(1, 1); + testMeta.storageManager.deleteUpTo(1, 6); Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.recoveryPath); FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration()); - Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length); + FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1))); + Assert.assertEquals("number of windows for 1", 3, fileStatuses.length); + TreeSet<String> windows = Sets.newTreeSet(); + for (FileStatus fileStatus : fileStatuses) { + windows.add(fileStatus.getPath().getName()); + } + Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows); Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2)))); Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3)))); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c39f5655/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java index fdca73e..845b992 100644 --- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java +++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java @@ -20,7 +20,9 @@ package com.datatorrent.lib.util; import java.io.File; import java.io.IOException; +import java.util.Arrays; import java.util.Map; +import java.util.TreeSet; import org.junit.Assert; import org.junit.Rule; @@ -30,6 +32,7 @@ import org.junit.runner.Description; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -169,18 +172,27 @@ public class WindowDataManagerTest dataOf2.put(8, "eight"); dataOf2.put(9, "nine"); - testMeta.storageManager.save(dataOf1, 1, 1); + for (int i = 1; i <= 9; ++i) { + testMeta.storageManager.save(dataOf1, 1, i); + } + testMeta.storageManager.save(dataOf2, 2, 1); testMeta.storageManager.save(dataOf3, 3, 1); testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager), Sets.newHashSet(2, 3)); testMeta.storageManager.setup(testMeta.context); - testMeta.storageManager.deleteUpTo(1, 1); + testMeta.storageManager.deleteUpTo(1, 6); Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath()); FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration()); - Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length); + FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1))); + Assert.assertEquals("number of windows for 1", 3, fileStatuses.length); + TreeSet<String> windows = Sets.newTreeSet(); + for (FileStatus fileStatus : fileStatuses) { + windows.add(fileStatus.getPath().getName()); + } + Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows); Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2)))); Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3)))); }
