Repository: kafka Updated Branches: refs/heads/trunk 4fd60c612 -> b5e6b8671
HOTFIX: open window segments in order, add segment id check in getSegment * During window store initialization, we have to open segments in the segment id order and update ```currentSegmentId```, otherwise cleanup won't work. * ```getSegment()``` should not create a segment and clean up old segments if the segment id is greater than ```currentSegmentId```. Segment maintenance should be driven not by query but only by data insertion. Author: Yasuhiro Matsuda <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #891 from ymatsuda/hotfix2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b5e6b867 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b5e6b867 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b5e6b867 Branch: refs/heads/trunk Commit: b5e6b8671a5b6d97d5026261ae8d62b54f068e53 Parents: 4fd60c6 Author: Yasuhiro Matsuda <[email protected]> Authored: Tue Feb 9 13:22:46 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Feb 9 13:22:46 2016 -0800 ---------------------------------------------------------------------- .../state/internals/RocksDBWindowStore.java | 21 ++- .../state/internals/RocksDBWindowStoreTest.java | 166 +++++++++++++++++++ 2 files changed, 182 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java index 825c70d..2758e6e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java @@ -33,6 +33,7 @@ import org.apache.kafka.streams.state.WindowStoreUtils; import java.io.File; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.NoSuchElementException; @@ -181,10 +182,20 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { File dir = new File(context.stateDir(), name); if (dir.exists()) { - for (String segmentName : dir.list()) { - long segmentId = segmentIdFromSegmentName(segmentName); - if (segmentId >= 0) - getSegment(segmentId); + String[] list = dir.list(); + if (list != null) { + long[] segmentIds = new long[list.length]; + for (int i = 0; i < list.length; i++) + segmentIds[i] = segmentIdFromSegmentName(list[i]); + + // open segments in the id order + Arrays.sort(segmentIds); + for (long segmentId : segmentIds) { + if (segmentId >= 0) { + currentSegmentId = segmentId; + getSegment(segmentId); + } + } } } else { dir.mkdir(); @@ -314,7 +325,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> { } private Segment getSegment(long segmentId) { - if (segmentId > currentSegmentId - segments.length) { + if (segmentId <= currentSegmentId && segmentId > currentSegmentId - segments.length) { int index = (int) (segmentId % segments.length); if (segments[index] != null && segments[index].id != segmentId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/b5e6b867/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 3eda1be..fd55944 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -638,6 +638,172 @@ public class RocksDBWindowStoreTest { } } + @Test + public void testSegmentMaintenance() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + // do nothing + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + RocksDBWindowStore<Integer, String> inner = + (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); + + try { + + context.setTime(0L); + store.put(0, "v"); + assertEquals( + Utils.mkSet(inner.segmentName(0L)), + segmentDirs(baseDir) + ); + + context.setTime(59999L); + store.put(0, "v"); + context.setTime(59999L); + store.put(0, "v"); + assertEquals( + Utils.mkSet(inner.segmentName(0L)), + segmentDirs(baseDir) + ); + + context.setTime(60000L); + store.put(0, "v"); + assertEquals( + Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)), + segmentDirs(baseDir) + ); + + WindowStoreIterator iter; + int fetchedCount; + + iter = store.fetch(0, 0L, 240000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(4, fetchedCount); + + assertEquals( + Utils.mkSet(inner.segmentName(0L), inner.segmentName(1L)), + segmentDirs(baseDir) + ); + + context.setTime(180000L); + store.put(0, "v"); + + iter = store.fetch(0, 0L, 240000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(2, fetchedCount); + + assertEquals( + Utils.mkSet(inner.segmentName(1L), inner.segmentName(2L), inner.segmentName(3L)), + segmentDirs(baseDir) + ); + + context.setTime(300000L); + store.put(0, "v"); + + iter = store.fetch(0, 240000L, 1000000L); + fetchedCount = 0; + while (iter.hasNext()) { + iter.next(); + fetchedCount++; + } + assertEquals(1, fetchedCount); + + assertEquals( + Utils.mkSet(inner.segmentName(3L), inner.segmentName(4L), inner.segmentName(5L)), + segmentDirs(baseDir) + ); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testInitialLoading() throws IOException { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); + RecordCollector recordCollector = new RecordCollector(producer) { + @Override + public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { + // do nothing + } + }; + + MockProcessorContext context = new MockProcessorContext( + null, baseDir, + byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer, + recordCollector); + + File storeDir = new File(baseDir, windowName); + + WindowStore<Integer, String> store = createWindowStore(context, serdes); + RocksDBWindowStore<Integer, String> inner = + (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); + + try { + new File(storeDir, inner.segmentName(0L)).mkdir(); + new File(storeDir, inner.segmentName(1L)).mkdir(); + new File(storeDir, inner.segmentName(2L)).mkdir(); + new File(storeDir, inner.segmentName(3L)).mkdir(); + new File(storeDir, inner.segmentName(4L)).mkdir(); + new File(storeDir, inner.segmentName(5L)).mkdir(); + new File(storeDir, inner.segmentName(6L)).mkdir(); + } finally { + store.close(); + } + + store = createWindowStore(context, serdes); + inner = (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner(); + + try { + assertEquals( + Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), + segmentDirs(baseDir) + ); + + WindowStoreIterator iter = store.fetch(0, 0L, 1000000L); + while (iter.hasNext()) { + iter.next(); + } + + assertEquals( + Utils.mkSet(inner.segmentName(4L), inner.segmentName(5L), inner.segmentName(6L)), + segmentDirs(baseDir) + ); + + } finally { + store.close(); + } + + } finally { + Utils.delete(baseDir); + } + } + private <E> List<E> toList(WindowStoreIterator<E> iterator) { ArrayList<E> list = new ArrayList<>(); while (iterator.hasNext()) {
