This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 42a82ac  KAFKA-6360: Clear RocksDB Segments when store is closed
42a82ac is described below

commit 42a82ac4e9c83aea55cd65bf51d8b0c7f2aa0eb5
Author: Damian Guy <damian....@gmail.com>
AuthorDate: Thu Dec 14 09:51:56 2017 -0800

    KAFKA-6360: Clear RocksDB Segments when store is closed
    
    Now that we support re-initializing state stores, we need to clear the
    segments when the store is closed so that they can be re-opened.
    
    Author: Damian Guy <damian....@gmail.com>
    
    Reviewers: Bill Bejeck <bbej...@gmail.com>, Guozhang Wang <wangg...@gmail.com>, Ted Yu <yuzhih...@gmail.com>
    
    Closes #4324 from dguy/kafka-6360
---
 .../org/apache/kafka/streams/state/internals/Segments.java     |  1 +
 .../state/internals/RocksDBSegmentedBytesStoreTest.java        | 10 ++++++++++
 .../org/apache/kafka/streams/state/internals/SegmentsTest.java |  9 +++++++++
 3 files changed, 20 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 9c8653a..f0d4abc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -149,6 +149,7 @@ class Segments {
         for (Segment segment : segments.values()) {
             segment.close();
         }
+        segments.clear();
     }
 
     private Segment getSegment(long segmentId) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index df91cfb..f5bb1af 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -147,6 +147,16 @@ public class RocksDBSegmentedBytesStoreTest {
 
     }
 
+    @Test
+    public void shouldBeAbleToWriteToReInitializedStore() {
+        final String key = "a";
+        // need to create a segment so we can attempt to write to it again.
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+        bytesStore.close();
+        bytesStore.init(context, bytesStore);
+        bytesStore.put(serializeKey(new Windowed<>(key, new SessionWindow(0L, 0L))), serializeValue(50L));
+    }
+
     private Set<String> segmentDirs() {
         File windowDir = new File(stateDir, storeName);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
index 9d367eb..71b1bcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
@@ -29,6 +29,9 @@ import org.junit.Test;
 import java.io.File;
 import java.util.List;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -199,6 +202,12 @@ public class SegmentsTest {
         verifyCorrectSegments(2, 5);
     }
 
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegment(0, context);
+        segments.close();
+        assertThat(segments.getSegmentForTimestamp(0), is(nullValue()));
+    }
     private void verifyCorrectSegments(final long first, final int numSegments) {
         final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());

-- 
To stop receiving notification emails like this one, please contact
mj...@apache.org.

Reply via email to