Repository: incubator-apex-malhar Updated Branches: refs/heads/master 1d3e20c69 -> 5d3b209fc
APEXMALHAR-2065 added getWindowIds method to Window Data Manager Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/2a6e1a6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/2a6e1a6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/2a6e1a6b Branch: refs/heads/master Commit: 2a6e1a6b50413f6e2c8c2ced55f4c10b562375f1 Parents: 4ef0700 Author: Timothy Farkas <t...@datatorrent.com> Authored: Sun Apr 24 18:39:24 2016 -0700 Committer: Chandni Singh <csi...@apache.org> Committed: Tue May 17 11:41:47 2016 -0700 ---------------------------------------------------------------------- .../apex/malhar/lib/wal/WindowDataManager.java | 33 ++++++++++++++++++++ .../malhar/lib/wal/FSWindowDataManagerTest.java | 19 +++++++++++ 2 files changed, 52 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a6e1a6b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java index fd7948a..296238b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.SortedSet; import javax.validation.constraints.NotNull; @@ -99,6 +100,16 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds); /** + * Returns an array of windowIds for which data was 
stored by at least one partition. The array + * of windowIds is sorted. + * + * @return An array of windowIds for which data was stored by at least one partition. The array + * of windowIds is sorted. + * @throws IOException + */ + long[] getWindowIds() throws IOException; + + /** * An {@link WindowDataManager} that uses FS to persist state. */ class FSWindowDataManager implements WindowDataManager @@ -227,6 +238,22 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera return storageAgent.getWindowIds(operatorId); } + @Override + public long[] getWindowIds() throws IOException + { + SortedSet<Long> windowIds = replayState.keySet(); + long[] windowIdsArray = new long[windowIds.size()]; + + int index = 0; + + for (Long windowId: windowIds) { + windowIdsArray[index] = windowId; + index++; + } + + return windowIdsArray; + } + /** * This deletes all the recovery files of window ids <= windowId. * @@ -436,5 +463,11 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera { return new long[0]; } + + @Override + public long[] getWindowIds() throws IOException + { + return new long[0]; + } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/2a6e1a6b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java index dff061a..7f3adc9 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java @@ -158,6 +158,25 @@ public class FSWindowDataManagerTest } @Test + public void testGetWindowIds() throws IOException + { + testMeta.storageManager.setup(testMeta.context); + Map<Integer, String> data = Maps.newHashMap(); + 
data.put(1, "one"); + data.put(2, "two"); + data.put(3, "three"); + + testMeta.storageManager.save(data, 1, 1); + testMeta.storageManager.save(data, 2, 2); + + testMeta.storageManager.setup(testMeta.context); + + Assert.assertArrayEquals(new long[] {1, 2}, testMeta.storageManager.getWindowIds()); + + testMeta.storageManager.teardown(); + } + + @Test public void testDelete() throws IOException { testMeta.storageManager.setup(testMeta.context);