Repository: samza Updated Branches: refs/heads/master d17134e0d -> 7836bf08c
Close iterators to time-series store on deletes Author: Jagadish <[email protected]> Reviewers: Jagadish<[email protected]> Closes #787 from vjagadish1989/website-reorg29 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7836bf08 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7836bf08 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7836bf08 Branch: refs/heads/master Commit: 7836bf08c54f48cdac50f57e89c4eff8ec1925ea Parents: d17134e Author: Jagadish <[email protected]> Authored: Tue Oct 30 18:48:40 2018 -0700 Committer: Jagadish <[email protected]> Committed: Tue Oct 30 18:48:40 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/operators/impl/WindowOperatorImpl.java | 4 ++-- .../samza/operators/impl/store/TimeSeriesStoreImpl.java | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7836bf08/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index c09c5f8..0241d9e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -379,15 +379,15 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje * @return a list of all elements returned by the iterator */ static <V> List<V> toList(ClosableIterator<V> iterator) { + Preconditions.checkNotNull(iterator); + List<V> values = new ArrayList<>(); try { while (iterator.hasNext()) { values.add(iterator.next()); } } finally { - if (iterator != null) { iterator.close(); - } } return Collections.unmodifiableList(values); } http://git-wip-us.apache.org/repos/asf/samza/blob/7836bf08/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java index 10a5967..b8cd82f 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/store/TimeSeriesStoreImpl.java @@ -154,10 +154,13 @@ public class TimeSeriesStoreImpl<K, V> implements TimeSeriesStore<K, V> { List<TimeSeriesKey<K>> keysToDelete = new LinkedList<>(); KeyValueIterator<TimeSeriesKey<K>, V> range = kvStore.range(fromKey, toKey); - while (range.hasNext()) { - keysToDelete.add(range.next().getKey()); + try { + while (range.hasNext()) { + keysToDelete.add(range.next().getKey()); + } + } finally { + range.close(); } - kvStore.deleteAll(keysToDelete); }
