This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 42a82ac  KAFKA-6360: Clear RocksDB Segments when store is closed
42a82ac is described below

commit 42a82ac4e9c83aea55cd65bf51d8b0c7f2aa0eb5
Author: Damian Guy <damian....@gmail.com>
AuthorDate: Thu Dec 14 09:51:56 2017 -0800

    KAFKA-6360: Clear RocksDB Segments when store is closed

    Now that we support re-initializing state stores, we need to clear the segments when the store is closed so that they can be re-opened.

    Author: Damian Guy <damian....@gmail.com>

    Reviewers: Bill Bejeck <bbej...@gmail.com>, Guozhang Wang <wangg...@gmail.com>, Ted Yu <yuzhih...@gmail.com>

    Closes #4324 from dguy/kafka-6360
---
 .../org/apache/kafka/streams/state/internals/Segments.java     |  1 +
 .../state/internals/RocksDBSegmentedBytesStoreTest.java        | 10 ++++++++++
 .../org/apache/kafka/streams/state/internals/SegmentsTest.java |  9 +++++++++
 3 files changed, 20 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 9c8653a..f0d4abc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -149,6 +149,7 @@ class Segments {
         for (Segment segment : segments.values()) {
             segment.close();
         }
+        segments.clear();
     }
 
     private Segment getSegment(long segmentId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index df91cfb..f5bb1af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -147,6 +147,16 @@ public class RocksDBSegmentedBytesStoreTest {
 
     }
 
+    @Test
+    public void shouldBeAbleToWriteToReInitializedStore() {
+        final String key = "a";
+        // need to create a segment so we can attempt to write to it again.
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+    }
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 9d367eb..71b1bcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -29,6 +29,9 @@ import org.junit.Test;
 import java.io.File;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -199,6 +202,12 @@ public class SegmentsTest {
         verifyCorrectSegments(2, 5);
     }
 
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegment(0, context);
+        segments.close();
+        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+    }
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.