This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef62dd3  KAFKA-3522: Generalize Segments (#6170)
ef62dd3 is described below

commit ef62dd3ef2f88d990bff21e3ff825c992e65fa10
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Fri Jan 25 23:56:39 2019 -0800

    KAFKA-3522: Generalize Segments (#6170)
    
    Reviewers: Bill Bejeck <b...@confluent.io>, John Roesler <j...@confluent.io>, Guozhang Wang <guozh...@confluent.io>
---
 .../{Segments.java => AbstractSegments.java}       |  76 ++++---
 .../state/internals/CachingWindowStore.java        |  23 +--
 .../{Segment.java => KeyValueSegment.java}         |  13 +-
 .../streams/state/internals/KeyValueSegments.java  |  45 +++++
 .../internals/RocksDBSegmentedBytesStore.java      |  63 +++---
 .../kafka/streams/state/internals/Segment.java     |  49 +----
 .../streams/state/internals/SegmentIterator.java   |  13 +-
 .../state/internals/SegmentedBytesStore.java       |   4 +-
 .../kafka/streams/state/internals/Segments.java    | 219 ++-------------------
 .../streams/state/internals/SessionKeySchema.java  |  40 ++--
 .../streams/state/internals/WindowKeySchema.java   |  59 +++---
 ...rTest.java => KeyValueSegmentIteratorTest.java} |  36 ++--
 ...SegmentsTest.java => KeyValueSegmentsTest.java} |  44 ++---
 .../internals/RocksDBSegmentedBytesStoreTest.java  |  18 +-
 .../state/internals/RocksDBWindowStoreTest.java    |   2 +-
 .../state/internals/WindowKeySchemaTest.java       |   8 +-
 16 files changed, 254 insertions(+), 458 deletions(-)
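
For orientation: the heart of this change is splitting the old concrete
Segments class into a generic interface, an abstract base class holding the
shared bookkeeping, and a RocksDB-backed concrete implementation. A condensed
sketch of the resulting hierarchy (illustrative only; the full definitions are
in the diff below):

    // Generic contract over any segment type S.
    interface Segments<S extends Segment> {
        long segmentId(long timestamp);
        S getOrCreateSegmentIfLive(long segmentId, InternalProcessorContext context);
        List<S> segments(long timeFrom, long timeTo);
        // ... plus openExisting(), allSegments(), flush(), close()
    }

    // Shared logic: TreeMap of live segments, retention-based cleanup, naming.
    abstract class AbstractSegments<S extends Segment> implements Segments<S> {
        final TreeMap<Long, S> segments = new TreeMap<>();
        // ...
    }

    // Concrete RocksDB-backed segments used by RocksDBSegmentedBytesStore.
    class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
        // creates and opens KeyValueSegment instances on demand
    }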

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
similarity index 77%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
copy to streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
index d4aedce..4d60b08 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
@@ -34,19 +34,16 @@ import java.util.NavigableMap;
 import java.util.SimpleTimeZone;
 import java.util.TreeMap;
 
-/**
- * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
- */
-class Segments {
-    private static final Logger log = LoggerFactory.getLogger(Segments.class);
+abstract class AbstractSegments<S extends Segment> implements Segments<S> {
+    private static final Logger log = LoggerFactory.getLogger(AbstractSegments.class);
 
-    private final TreeMap<Long, Segment> segments = new TreeMap<>();
-    private final String name;
+    final TreeMap<Long, S> segments = new TreeMap<>();
+    final String name;
     private final long retentionPeriod;
     private final long segmentInterval;
     private final SimpleDateFormat formatter;
 
-    Segments(final String name, final long retentionPeriod, final long segmentInterval) {
+    AbstractSegments(final String name, final long retentionPeriod, final long segmentInterval) {
         this.name = name;
         this.segmentInterval = segmentInterval;
         this.retentionPeriod = retentionPeriod;
@@ -55,11 +52,13 @@ class Segments {
         this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
     }
 
-    long segmentId(final long timestamp) {
+    @Override
+    public long segmentId(final long timestamp) {
         return timestamp / segmentInterval;
     }
 
-    String segmentName(final long segmentId) {
+    @Override
+    public String segmentName(final long segmentId) {
         // (1) previous format used - as a separator so if this changes in the future
         // then we should use something different.
         // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
@@ -67,15 +66,17 @@ class Segments {
         return name + "." + segmentId * segmentInterval;
     }
 
-    Segment getSegmentForTimestamp(final long timestamp) {
+    @Override
+    public S getSegmentForTimestamp(final long timestamp) {
         return segments.get(segmentId(timestamp));
     }
 
-    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
+    @Override
+    public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
         final long minLiveTimestamp = context.streamTime() - retentionPeriod;
         final long minLiveSegment = segmentId(minLiveTimestamp);
 
-        final Segment toReturn;
+        final S toReturn;
         if (segmentId >= minLiveSegment) {
             // The segment is live. get it, ensure it's open, and return it.
             toReturn = getOrCreateSegment(segmentId, context);
@@ -87,23 +88,8 @@ class Segments {
         return toReturn;
     }
 
-    private Segment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
-            final Segment shouldBeNull = segments.put(segmentId, newSegment);
-
-            if (shouldBeNull != null) {
-                throw new IllegalStateException("Segment already exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context);
-            return newSegment;
-        }
-    }
-
-    void openExisting(final InternalProcessorContext context) {
+    @Override
+    public void openExisting(final InternalProcessorContext context) {
         try {
             final File dir = new File(context.stateDir(), name);
             if (dir.exists()) {
@@ -135,13 +121,14 @@ class Segments {
         cleanupEarlierThan(minLiveSegment);
     }
 
-    List<Segment> segments(final long timeFrom, final long timeTo) {
-        final List<Segment> result = new ArrayList<>();
-        final NavigableMap<Long, Segment> segmentsInRange = segments.subMap(
+    @Override
+    public List<S> segments(final long timeFrom, final long timeTo) {
+        final List<S> result = new ArrayList<>();
+        final NavigableMap<Long, S> segmentsInRange = segments.subMap(
             segmentId(timeFrom), true,
             segmentId(timeTo), true
         );
-        for (final Segment segment : segmentsInRange.values()) {
+        for (final S segment : segmentsInRange.values()) {
             if (segment.isOpen()) {
                 result.add(segment);
             }
@@ -149,9 +136,10 @@ class Segments {
         return result;
     }
 
-    List<Segment> allSegments() {
-        final List<Segment> result = new ArrayList<>();
-        for (final Segment segment : segments.values()) {
+    @Override
+    public List<S> allSegments() {
+        final List<S> result = new ArrayList<>();
+        for (final S segment : segments.values()) {
             if (segment.isOpen()) {
                 result.add(segment);
             }
@@ -159,27 +147,29 @@ class Segments {
         return result;
     }
 
-    void flush() {
-        for (final Segment segment : segments.values()) {
+    @Override
+    public void flush() {
+        for (final S segment : segments.values()) {
             segment.flush();
         }
     }
 
+    @Override
     public void close() {
-        for (final Segment segment : segments.values()) {
+        for (final S segment : segments.values()) {
             segment.close();
         }
         segments.clear();
     }
 
     private void cleanupEarlierThan(final long minLiveSegment) {
-        final Iterator<Map.Entry<Long, Segment>> toRemove =
+        final Iterator<Map.Entry<Long, S>> toRemove =
             segments.headMap(minLiveSegment, false).entrySet().iterator();
 
         while (toRemove.hasNext()) {
-            final Map.Entry<Long, Segment> next = toRemove.next();
+            final Map.Entry<Long, S> next = toRemove.next();
             toRemove.remove();
-            final Segment segment = next.getValue();
+            final S segment = next.getValue();
             segment.close();
             try {
                 segment.destroy();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index afe9b34..6112544 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -30,8 +30,6 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
-import java.util.List;
-
 class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore implements WindowStore<Bytes, byte[]>, CachedStateStore<Windowed<K>, V> {
 
     private final WindowStore<Bytes, byte[]> underlying;
@@ -84,18 +82,15 @@ class CachingWindowStore<K, V> extends WrappedStateStore.AbstractStateStore impl
         name = context.taskId() + "-" + underlying.name();
         cache = this.context.getCache();
 
-        cache.addDirtyEntryFlushListener(name, new ThreadCache.DirtyEntryFlushListener() {
-            @Override
-            public void apply(final List<ThreadCache.DirtyEntry> entries) {
-                for (final ThreadCache.DirtyEntry entry : entries) {
-                    final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
-                    final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
-
-                    final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
-                    final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
-                    maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
-                    underlying.put(key, entry.newValue(), timestamp);
-                }
+        cache.addDirtyEntryFlushListener(name, entries -> {
+            for (final ThreadCache.DirtyEntry entry : entries) {
+                final byte[] binaryWindowKey = cacheFunction.key(entry.key()).get();
+                final long timestamp = WindowKeySchema.extractStoreTimestamp(binaryWindowKey);
+
+                final Windowed<K> windowedKey = WindowKeySchema.fromStoreKey(binaryWindowKey, windowSize, serdes.keyDeserializer(), serdes.topic());
+                final Bytes key = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(binaryWindowKey));
+                maybeForward(entry, key, windowedKey, (InternalProcessorContext) context);
+                underlying.put(key, entry.newValue(), timestamp);
             }
         });
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
similarity index 79%
copy from streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
copy to streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
index f95e395..697b67a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegment.java
@@ -22,20 +22,21 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import java.io.IOException;
 import java.util.Objects;
 
-class Segment extends RocksDBStore implements Comparable<Segment> {
+class KeyValueSegment extends RocksDBStore implements Comparable<KeyValueSegment>, Segment {
     public final long id;
 
-    Segment(final String segmentName, final String windowName, final long id) {
+    KeyValueSegment(final String segmentName, final String windowName, final long id) {
         super(segmentName, windowName);
         this.id = id;
     }
 
-    void destroy() throws IOException {
+    @Override
+    public void destroy() throws IOException {
         Utils.delete(dbDir);
     }
 
     @Override
-    public int compareTo(final Segment segment) {
+    public int compareTo(final KeyValueSegment segment) {
         return Long.compare(id, segment.id);
     }
 
@@ -48,7 +49,7 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
 
     @Override
     public String toString() {
-        return "Segment(id=" + id + ", name=" + name() + ")";
+        return "KeyValueSegment(id=" + id + ", name=" + name() + ")";
     }
 
     @Override
@@ -56,7 +57,7 @@ class Segment extends RocksDBStore implements Comparable<Segment> {
         if (obj == null || getClass() != obj.getClass()) {
             return false;
         }
-        final Segment segment = (Segment) obj;
+        final KeyValueSegment segment = (KeyValueSegment) obj;
         return id == segment.id;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
new file mode 100644
index 0000000..0664551
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueSegments.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+
+/**
+ * Manages the {@link KeyValueSegment}s that are used by the {@link RocksDBSegmentedBytesStore}
+ */
+class KeyValueSegments extends AbstractSegments<KeyValueSegment> {
+
+    KeyValueSegments(final String name, final long retentionPeriod, final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+    }
+
+    @Override
+    public KeyValueSegment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
+        if (segments.containsKey(segmentId)) {
+            return segments.get(segmentId);
+        } else {
+            final KeyValueSegment newSegment = new KeyValueSegment(segmentName(segmentId), name, segmentId);
+
+            if (segments.put(segmentId, newSegment) != null) {
+                throw new IllegalStateException("KeyValueSegment already exists. Possible concurrent access.");
+            }
+
+            newSegment.openDB(context);
+            return newSegment;
+        }
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
index 17079b9..79e6b95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java
@@ -41,15 +41,15 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCount;
 
-class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
+public class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(RocksDBSegmentedBytesStore.class);
     private final String name;
-    private final Segments segments;
+    private final KeyValueSegments segments;
     private final String metricScope;
     private final KeySchema keySchema;
     private InternalProcessorContext context;
     private volatile boolean open;
-    private Set<Segment> bulkLoadSegments;
+    private Set<KeyValueSegment> bulkLoadSegments;
     private Sensor expiredRecordSensor;
 
     RocksDBSegmentedBytesStore(final String name,
@@ -60,55 +60,54 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
         this.name = name;
         this.metricScope = metricScope;
         this.keySchema = keySchema;
-        this.segments = new Segments(name, retention, segmentInterval);
+        this.segments = new KeyValueSegments(name, retention, segmentInterval);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes key, final long from, final long to) {
-        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+        final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRangeFixedSize(key, from);
         final Bytes binaryTo = keySchema.upperRangeFixedSize(key, to);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                                   keySchema.hasNextCondition(key, key, from, to),
-                                   binaryFrom, binaryTo);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(key, key, from, to),
+                                     binaryFrom, binaryTo);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo, final long from, final long to) {
-        final List<Segment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
+        final List<KeyValueSegment> searchSpace = keySchema.segmentsToSearch(segments, from, to);
 
         final Bytes binaryFrom = keySchema.lowerRange(keyFrom, from);
         final Bytes binaryTo = keySchema.upperRange(keyTo, to);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                                   keySchema.hasNextCondition(keyFrom, keyTo, from, to),
-                                   binaryFrom, binaryTo);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(keyFrom, keyTo, from, to),
+                                     binaryFrom, binaryTo);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> all() {
+        final List<KeyValueSegment> searchSpace = segments.allSegments();
 
-        final List<Segment> searchSpace = segments.allSegments();
-
-        return new SegmentIterator(searchSpace.iterator(),
-                                   keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
-                                   null, null);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+                                     null, null);
     }
 
     @Override
     public KeyValueIterator<Bytes, byte[]> fetchAll(final long timeFrom, final long timeTo) {
-        final List<Segment> searchSpace = segments.segments(timeFrom, timeTo);
+        final List<KeyValueSegment> searchSpace = segments.segments(timeFrom, timeTo);
 
-        return new SegmentIterator(searchSpace.iterator(),
-                                   keySchema.hasNextCondition(null, null, timeFrom, timeTo),
-                                   null, null);
+        return new SegmentIterator<>(searchSpace.iterator(),
+                                     keySchema.hasNextCondition(null, null, timeFrom, timeTo),
+                                     null, null);
     }
 
     @Override
     public void remove(final Bytes key) {
-        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
         if (segment == null) {
             return;
         }
@@ -119,7 +118,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     public void put(final Bytes key, final byte[] value) {
         final long timestamp = keySchema.segmentTimestamp(key);
         final long segmentId = segments.segmentId(timestamp);
-        final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
         if (segment == null) {
             expiredRecordSensor.record();
             LOG.debug("Skipping record for expired segment.");
@@ -130,7 +129,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
 
     @Override
     public byte[] get(final Bytes key) {
-        final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
+        final KeyValueSegment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key));
         if (segment == null) {
             return null;
         }
@@ -195,16 +194,16 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     // Visible for testing
-    List<Segment> getSegments() {
+    List<KeyValueSegment> getSegments() {
         return segments.allSegments();
     }
 
     // Visible for testing
     void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
         try {
-            final Map<Segment, WriteBatch> writeBatchMap = getWriteBatches(records);
-            for (final Map.Entry<Segment, WriteBatch> entry : writeBatchMap.entrySet()) {
-                final Segment segment = entry.getKey();
+            final Map<KeyValueSegment, WriteBatch> writeBatchMap = getWriteBatches(records);
+            for (final Map.Entry<KeyValueSegment, WriteBatch> entry : writeBatchMap.entrySet()) {
+                final KeyValueSegment segment = entry.getKey();
                 final WriteBatch batch = entry.getValue();
                 segment.write(batch);
             }
@@ -214,11 +213,11 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     // Visible for testing
-    Map<Segment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
-        final Map<Segment, WriteBatch> writeBatchMap = new HashMap<>();
+    Map<KeyValueSegment, WriteBatch> getWriteBatches(final Collection<KeyValue<byte[], byte[]>> records) {
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
         for (final KeyValue<byte[], byte[]> record : records) {
             final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key)));
-            final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
+            final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segmentId, context);
             if (segment != null) {
                 // This handles the case that state store is moved to a new client and does not
                 // have the local RocksDB instance for the segment. In this case, toggleDBForBulkLoading
@@ -247,7 +246,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore {
     }
 
     private void toggleForBulkLoading(final boolean prepareForBulkload) {
-        for (final Segment segment: segments.allSegments()) {
+        for (final KeyValueSegment segment: segments.allSegments()) {
             segment.toggleDbForBulkLoading(prepareForBulkload);
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
index f95e395..8687ffc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java
@@ -16,52 +16,17 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.io.IOException;
-import java.util.Objects;
 
-class Segment extends RocksDBStore implements Comparable<Segment> {
-    public final long id;
+public interface Segment extends StateStore {
 
-    Segment(final String segmentName, final String windowName, final long id) {
-        super(segmentName, windowName);
-        this.id = id;
-    }
+    void destroy() throws IOException;
 
-    void destroy() throws IOException {
-        Utils.delete(dbDir);
-    }
+    KeyValueIterator<Bytes, byte[]> all();
 
-    @Override
-    public int compareTo(final Segment segment) {
-        return Long.compare(id, segment.id);
-    }
-
-    @Override
-    public void openDB(final ProcessorContext context) {
-        super.openDB(context);
-        // skip the registering step
-        internalProcessorContext = context;
-    }
-
-    @Override
-    public String toString() {
-        return "Segment(id=" + id + ", name=" + name() + ")";
-    }
-
-    @Override
-    public boolean equals(final Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Segment segment = (Segment) obj;
-        return id == segment.id;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id);
-    }
+    KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to);
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
index 5b07812..0d90bd8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java
@@ -20,25 +20,24 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
- * Iterate over multiple Segments
+ * Iterate over multiple KeyValueSegments
  */
-class SegmentIterator implements KeyValueIterator<Bytes, byte[]> {
+class SegmentIterator<S extends Segment> implements KeyValueIterator<Bytes, byte[]> {
 
     private final Bytes from;
     private final Bytes to;
-    protected final Iterator<Segment> segments;
+    protected final Iterator<S> segments;
     protected final HasNextCondition hasNextCondition;
 
-    protected KeyValueStore<Bytes, byte[]> currentSegment;
-    protected KeyValueIterator<Bytes, byte[]> currentIterator;
+    private S currentSegment;
+    KeyValueIterator<Bytes, byte[]> currentIterator;
 
-    SegmentIterator(final Iterator<Segment> segments,
+    SegmentIterator(final Iterator<S> segments,
                     final HasNextCondition hasNextCondition,
                     final Bytes from,
                     final Bytes to) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
index ce528ed..068dc5f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedBytesStore.java
@@ -154,7 +154,7 @@ public interface SegmentedBytesStore extends StateStore {
 
         /**
          * Create an implementation of {@link HasNextCondition} that knows when
-         * to stop iterating over the Segments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
+         * to stop iterating over the KeyValueSegments. Used during {@link SegmentedBytesStore#fetch(Bytes, Bytes, long, long)} operations
          * @param binaryKeyFrom the first key in the range
          * @param binaryKeyTo   the last key in the range
          * @param from          starting time range
@@ -171,6 +171,6 @@ public interface SegmentedBytesStore extends StateStore {
          * @param to
          * @return  List of segments to search
          */
-        List<Segment> segmentsToSearch(Segments segments, long from, long to);
+        <S extends Segment> List<S> segmentsToSearch(Segments<S> segments, long from, long to);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index d4aedce..71d0c46 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -16,224 +16,29 @@
  */
 package org.apache.kafka.streams.state.internals;
 
-import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.SimpleTimeZone;
-import java.util.TreeMap;
 
-/**
- * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore}
- */
-class Segments {
-    private static final Logger log = LoggerFactory.getLogger(Segments.class);
-
-    private final TreeMap<Long, Segment> segments = new TreeMap<>();
-    private final String name;
-    private final long retentionPeriod;
-    private final long segmentInterval;
-    private final SimpleDateFormat formatter;
-
-    Segments(final String name, final long retentionPeriod, final long segmentInterval) {
-        this.name = name;
-        this.segmentInterval = segmentInterval;
-        this.retentionPeriod = retentionPeriod;
-        // Create a date formatter. Formatted timestamps are used as segment name suffixes
-        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
-        this.formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
-    }
-
-    long segmentId(final long timestamp) {
-        return timestamp / segmentInterval;
-    }
-
-    String segmentName(final long segmentId) {
-        // (1) previous format used - as a separator so if this changes in the future
-        // then we should use something different.
-        // (2) previous format used : as a separator (which did break KafkaStreams on Windows OS)
-        // so if this changes in the future then we should use something different.
-        return name + "." + segmentId * segmentInterval;
-    }
-
-    Segment getSegmentForTimestamp(final long timestamp) {
-        return segments.get(segmentId(timestamp));
-    }
-
-    Segment getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-        final long minLiveTimestamp = context.streamTime() - retentionPeriod;
-        final long minLiveSegment = segmentId(minLiveTimestamp);
-
-        final Segment toReturn;
-        if (segmentId >= minLiveSegment) {
-            // The segment is live. get it, ensure it's open, and return it.
-            toReturn = getOrCreateSegment(segmentId, context);
-        } else {
-            toReturn = null;
-        }
-
-        cleanupEarlierThan(minLiveSegment);
-        return toReturn;
-    }
-
-    private Segment getOrCreateSegment(final long segmentId, final InternalProcessorContext context) {
-        if (segments.containsKey(segmentId)) {
-            return segments.get(segmentId);
-        } else {
-            final Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
-            final Segment shouldBeNull = segments.put(segmentId, newSegment);
-
-            if (shouldBeNull != null) {
-                throw new IllegalStateException("Segment already exists. Possible concurrent access.");
-            }
-
-            newSegment.openDB(context);
-            return newSegment;
-        }
-    }
-
-    void openExisting(final InternalProcessorContext context) {
-        try {
-            final File dir = new File(context.stateDir(), name);
-            if (dir.exists()) {
-                final String[] list = dir.list();
-                if (list != null) {
-                    final long[] segmentIds = new long[list.length];
-                    for (int i = 0; i < list.length; i++) {
-                        segmentIds[i] = segmentIdFromSegmentName(list[i], dir);
-                    }
-
-                    // open segments in the id order
-                    Arrays.sort(segmentIds);
-                    for (final long segmentId : segmentIds) {
-                        if (segmentId >= 0) {
-                            getOrCreateSegment(segmentId, context);
-                        }
-                    }
-                }
-            } else {
-                if (!dir.mkdir()) {
-                    throw new ProcessorStateException(String.format("dir %s doesn't exist and cannot be created for segments %s", dir, name));
-                }
-            }
-        } catch (final Exception ex) {
-            // ignore
-        }
-
-        final long minLiveSegment = segmentId(context.streamTime() - retentionPeriod);
-        cleanupEarlierThan(minLiveSegment);
-    }
-
-    List<Segment> segments(final long timeFrom, final long timeTo) {
-        final List<Segment> result = new ArrayList<>();
-        final NavigableMap<Long, Segment> segmentsInRange = segments.subMap(
-            segmentId(timeFrom), true,
-            segmentId(timeTo), true
-        );
-        for (final Segment segment : segmentsInRange.values()) {
-            if (segment.isOpen()) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
-    List<Segment> allSegments() {
-        final List<Segment> result = new ArrayList<>();
-        for (final Segment segment : segments.values()) {
-            if (segment.isOpen()) {
-                result.add(segment);
-            }
-        }
-        return result;
-    }
-
-    void flush() {
-        for (final Segment segment : segments.values()) {
-            segment.flush();
-        }
-    }
+interface Segments<S extends Segment> {
 
-    public void close() {
-        for (final Segment segment : segments.values()) {
-            segment.close();
-        }
-        segments.clear();
-    }
+    long segmentId(final long timestamp);
 
-    private void cleanupEarlierThan(final long minLiveSegment) {
-        final Iterator<Map.Entry<Long, Segment>> toRemove =
-            segments.headMap(minLiveSegment, false).entrySet().iterator();
+    String segmentName(final long segmentId);
 
-        while (toRemove.hasNext()) {
-            final Map.Entry<Long, Segment> next = toRemove.next();
-            toRemove.remove();
-            final Segment segment = next.getValue();
-            segment.close();
-            try {
-                segment.destroy();
-            } catch (final IOException e) {
-                log.error("Error destroying {}", segment, e);
-            }
-        }
-    }
+    S getSegmentForTimestamp(final long timestamp);
 
-    private long segmentIdFromSegmentName(final String segmentName,
-                                          final File parent) {
-        final int segmentSeparatorIndex = name.length();
-        final char segmentSeparator = segmentName.charAt(segmentSeparatorIndex);
-        final String segmentIdString = segmentName.substring(segmentSeparatorIndex + 1);
-        final long segmentId;
+    S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context);
 
-        // old style segment name with date
-        if (segmentSeparator == '-') {
-            try {
-                segmentId = formatter.parse(segmentIdString).getTime() / segmentInterval;
-            } catch (final ParseException e) {
-                log.warn("Unable to parse segmentName {} to a date. This segment will be skipped", segmentName);
-                return -1L;
-            }
-            renameSegmentFile(parent, segmentName, segmentId);
-        } else {
-            // for both new formats (with : or .) parse segment ID identically
-            try {
-                segmentId = Long.parseLong(segmentIdString) / segmentInterval;
-            } catch (final NumberFormatException e) {
-                throw new ProcessorStateException("Unable to parse segment id as long from segmentName: " + segmentName);
-            }
+    S getOrCreateSegment(final long segmentId, final InternalProcessorContext context);
 
-            // intermediate segment name with : breaks KafkaStreams on Windows OS -> rename segment file to new name with .
-            if (segmentSeparator == ':') {
-                renameSegmentFile(parent, segmentName, segmentId);
-            }
-        }
+    void openExisting(final InternalProcessorContext context);
 
-        return segmentId;
+    List<S> segments(final long timeFrom, final long timeTo);
 
-    }
+    List<S> allSegments();
 
-    private void renameSegmentFile(final File parent,
-                                   final String segmentName,
-                                   final long segmentId) {
-        final File newName = new File(parent, segmentName(segmentId));
-        final File oldName = new File(parent, segmentName);
-        if (!oldName.renameTo(newName)) {
-            throw new ProcessorStateException("Unable to rename old style segment from: "
-                + oldName
-                + " to new name: "
-                + newName);
-        }
-    }
+    void flush();
 
-}
+    void close();
+}
\ No newline at end of file
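
With the interface in place, callers program against Segments<S> while each
store picks its concrete segment type. A minimal usage sketch, assuming the
variables retentionPeriod, segmentInterval, timestamp, key, value, and context
exist in scope (illustrative only; not part of the commit):

    // KeyValueSegments is the RocksDB-backed implementation from this commit.
    final KeyValueSegments segments = new KeyValueSegments("my-store", retentionPeriod, segmentInterval);
    final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(segments.segmentId(timestamp), context);
    if (segment == null) {
        // segment is older than the retention period: the write is dropped
    } else {
        segment.put(key, value);  // write through to the segment's RocksDB instance
    }
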
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
index 8ba78cc..6e99860 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.SessionWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -69,29 +68,26 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
 
     @Override
     public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                while (iterator.hasNext()) {
-                    final Bytes bytes = iterator.peekNextKey();
-                    final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
-                    if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
-                        && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
-                        && windowedKey.window().end() >= from
-                        && windowedKey.window().start() <= to) {
-                        return true;
-                    }
-                    iterator.next();
+        return iterator -> {
+            while (iterator.hasNext()) {
+                final Bytes bytes = iterator.peekNextKey();
+                final Windowed<Bytes> windowedKey = SessionKeySchema.from(bytes);
+                if ((binaryKeyFrom == null || windowedKey.key().compareTo(binaryKeyFrom) >= 0)
+                    && (binaryKeyTo == null || windowedKey.key().compareTo(binaryKeyTo) <= 0)
+                    && windowedKey.window().end() >= from
+                    && windowedKey.window().start() <= to) {
+                    return true;
                 }
-                return false;
+                iterator.next();
             }
+            return false;
         };
     }
 
     @Override
-    public List<Segment> segmentsToSearch(final Segments segments,
-                                          final long from,
-                                          final long to) {
+    public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
+                                                        final long from,
+                                                        final long to) {
         return segments.segments(from, Long.MAX_VALUE);
     }
 
@@ -101,21 +97,21 @@ public class SessionKeySchema implements SegmentedBytesStore.KeySchema {
         return deserializer.deserialize(topic, extractKeyBytes(binaryKey));
     }
 
-    public static byte[] extractKeyBytes(final byte[] binaryKey) {
+    static byte[] extractKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - 2 * TIMESTAMP_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return bytes;
     }
 
-    public static long extractEndTimestamp(final byte[] binaryKey) {
+    static long extractEndTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
     }
 
-    public static long extractStartTimestamp(final byte[] binaryKey) {
+    static long extractStartTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
     }
 
-    public static Window extractWindow(final byte[] binaryKey) {
+    static Window extractWindow(final byte[] binaryKey) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
         final long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
index 0b3ba9e..f960b01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 
 import java.nio.ByteBuffer;
@@ -66,29 +65,31 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
     }
 
     @Override
-    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom, final Bytes binaryKeyTo, final long from, final long to) {
-        return new HasNextCondition() {
-            @Override
-            public boolean hasNext(final KeyValueIterator<Bytes, ?> iterator) {
-                while (iterator.hasNext()) {
-                    final Bytes bytes = iterator.peekNextKey();
-                    final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
-                    final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
-                    if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
-                        && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
-                        && time >= from
-                        && time <= to) {
-                        return true;
-                    }
-                    iterator.next();
+    public HasNextCondition hasNextCondition(final Bytes binaryKeyFrom,
+                                             final Bytes binaryKeyTo,
+                                             final long from,
+                                             final long to) {
+        return iterator -> {
+            while (iterator.hasNext()) {
+                final Bytes bytes = iterator.peekNextKey();
+                final Bytes keyBytes = Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(bytes.get()));
+                final long time = WindowKeySchema.extractStoreTimestamp(bytes.get());
+                if ((binaryKeyFrom == null || keyBytes.compareTo(binaryKeyFrom) >= 0)
+                    && (binaryKeyTo == null || keyBytes.compareTo(binaryKeyTo) <= 0)
+                    && time >= from
+                    && time <= to) {
+                    return true;
                 }
-                return false;
+                iterator.next();
             }
+            return false;
         };
     }
 
     @Override
-    public List<Segment> segmentsToSearch(final Segments segments, final long from, final long to) {
+    public <S extends Segment> List<S> segmentsToSearch(final Segments<S> segments,
+                                                        final long from,
+                                                        final long to) {
         return segments.segments(from, to);
     }
 
@@ -96,8 +97,8 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
      * Safely construct a time window of the given size,
      * taking care of bounding endMs to Long.MAX_VALUE if necessary
      */
-    public static TimeWindow timeWindowForSize(final long startMs,
-                                               final long windowSize) {
+    static TimeWindow timeWindowForSize(final long startMs,
+                                        final long windowSize) {
         final long endMs = startMs + windowSize;
         return new TimeWindow(startMs, endMs < 0 ? Long.MAX_VALUE : endMs);
     }
@@ -175,24 +176,24 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return Bytes.wrap(buf.array());
     }
 
-    public static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
+    static byte[] extractStoreKeyBytes(final byte[] binaryKey) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return bytes;
     }
 
-    public static <K> K extractStoreKey(final byte[] binaryKey,
-                                        final StateSerdes<K, ?> serdes) {
+    static <K> K extractStoreKey(final byte[] binaryKey,
+                                 final StateSerdes<K, ?> serdes) {
         final byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
         System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
         return serdes.keyFrom(bytes);
     }
 
-    public static long extractStoreTimestamp(final byte[] binaryKey) {
+    static long extractStoreTimestamp(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
     }
 
-    public static int extractStoreSequence(final byte[] binaryKey) {
+    static int extractStoreSequence(final byte[] binaryKey) {
         return ByteBuffer.wrap(binaryKey).getInt(binaryKey.length - SEQNUM_SIZE);
     }
 
@@ -205,15 +206,15 @@ public class WindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema {
         return new Windowed<>(key, window);
     }
 
-    public static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
-                                               final long windowSize) {
+    static Windowed<Bytes> fromStoreBytesKey(final byte[] binaryKey,
+                                             final long windowSize) {
         final Bytes key = Bytes.wrap(extractStoreKeyBytes(binaryKey));
         final Window window = extractStoreWindow(binaryKey, windowSize);
         return new Windowed<>(key, window);
     }
 
-    public static Window extractStoreWindow(final byte[] binaryKey,
-                                            final long windowSize) {
+    static Window extractStoreWindow(final byte[] binaryKey,
+                                     final long windowSize) {
         final ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
         final long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
         return timeWindowForSize(start, windowSize);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
similarity index 86%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
index db5f083..68bd815 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentIteratorTest.java
@@ -38,25 +38,25 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-public class SegmentIteratorTest {
+public class KeyValueSegmentIteratorTest {
 
-    private final Segment segmentOne = new Segment("one", "one", 0);
-    private final Segment segmentTwo = new Segment("two", "window", 1);
+    private final KeyValueSegment segmentOne = new KeyValueSegment("one", "one", 0);
+    private final KeyValueSegment segmentTwo = new KeyValueSegment("two", "window", 1);
     private final HasNextCondition hasNextCondition = Iterator::hasNext;
 
-    private SegmentIterator iterator = null;
+    private SegmentIterator<KeyValueSegment> iterator = null;
 
     @Before
     public void before() {
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            Serdes.String(),
-            Serdes.String(),
-            new NoOpRecordCollector(),
-            new ThreadCache(
-                new LogContext("testCache "),
-                0,
-                new MockStreamsMetrics(new Metrics())));
+                TestUtils.tempDirectory(),
+                Serdes.String(),
+                Serdes.String(),
+                new NoOpRecordCollector(),
+                new ThreadCache(
+                    new LogContext("testCache "),
+                    0,
+                    new MockStreamsMetrics(new Metrics())));
         segmentOne.openDB(context);
         segmentTwo.openDB(context);
         segmentOne.put(Bytes.wrap("a".getBytes()), "1".getBytes());
@@ -77,7 +77,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldIterateOverAllSegments() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
             hasNextCondition,
             Bytes.wrap("a".getBytes()),
@@ -104,7 +104,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Collections.singletonList(segmentOne).iterator(),
             hasNextCondition,
             Bytes.wrap("a".getBytes()),
@@ -117,7 +117,7 @@ public class SegmentIteratorTest {
 
     @Test
     public void shouldOnlyIterateOverSegmentsInRange() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
             hasNextCondition,
             Bytes.wrap("a".getBytes()),
@@ -136,7 +136,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnPeekNextKeyIfNoNext() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
             hasNextCondition,
             Bytes.wrap("f".getBytes()),
@@ -147,7 +147,7 @@ public class SegmentIteratorTest {
 
     @Test(expected = NoSuchElementException.class)
     public void shouldThrowNoSuchElementOnNextIfNoNext() {
-        iterator = new SegmentIterator(
+        iterator = new SegmentIterator<>(
             Arrays.asList(segmentOne, segmentTwo).iterator(),
             hasNextCondition,
             Bytes.wrap("f".getBytes()),
@@ -159,4 +159,4 @@ public class SegmentIteratorTest {
     private KeyValue<String, String> toStringKeyValue(final KeyValue<Bytes, byte[]> binaryKv) {
         return KeyValue.pair(new String(binaryKv.key.get()), new String(binaryKv.value));
     }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
similarity index 86%
rename from streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
rename to streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
index efed24f..7c85134 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/KeyValueSegmentsTest.java
@@ -41,13 +41,13 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-public class SegmentsTest {
+public class KeyValueSegmentsTest {
 
     private static final int NUM_SEGMENTS = 5;
     private static final long SEGMENT_INTERVAL = 100L;
     private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
     private InternalMockProcessorContext context;
-    private Segments segments;
+    private KeyValueSegments segments;
     private File stateDirectory;
     private final String storeName = "test";
 
@@ -61,7 +61,7 @@ public class SegmentsTest {
             new NoOpRecordCollector(),
             new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
         );
-        segments = new Segments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments = new KeyValueSegments(storeName, RETENTION_PERIOD, SEGMENT_INTERVAL);
     }
 
     @After
@@ -79,7 +79,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldBaseSegmentIntervalOnRetentionAndNumSegments() {
-        final Segments segments = new Segments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
+        final KeyValueSegments segments = new KeyValueSegments("test", 8 * SEGMENT_INTERVAL, 2 * SEGMENT_INTERVAL);
         assertEquals(0, segments.segmentId(0));
         assertEquals(0, segments.segmentId(SEGMENT_INTERVAL));
         assertEquals(1, segments.segmentId(2 * SEGMENT_INTERVAL));
@@ -94,9 +94,9 @@ public class SegmentsTest {
 
     @Test
     public void shouldCreateSegments() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(2, context);
         assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
         assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
@@ -114,10 +114,10 @@ public class SegmentsTest {
 
     @Test
     public void shouldCleanupSegmentsThatHaveExpired() {
-        final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment segment1 = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment2 = segments.getOrCreateSegmentIfLive(1, context);
         context.setStreamTime(SEGMENT_INTERVAL * 7);
-        final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context);
+        final KeyValueSegment segment3 = segments.getOrCreateSegmentIfLive(7, context);
         assertFalse(segment1.isOpen());
         assertFalse(segment2.isOpen());
         assertTrue(segment3.isOpen());
@@ -128,22 +128,22 @@ public class SegmentsTest {
 
     @Test
     public void shouldGetSegmentForTimestamp() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
         segments.getOrCreateSegmentIfLive(1, context);
         assertEquals(segment, segments.getSegmentForTimestamp(0L));
     }
 
     @Test
     public void shouldGetCorrectSegmentString() {
-        final Segment segment = segments.getOrCreateSegmentIfLive(0, context);
-        assertEquals("Segment(id=0, name=test.0)", segment.toString());
+        final KeyValueSegment segment = segments.getOrCreateSegmentIfLive(0, context);
+        assertEquals("KeyValueSegment(id=0, name=test.0)", segment.toString());
     }
 
     @Test
     public void shouldCloseAllOpenSegments() {
-        final Segment first = segments.getOrCreateSegmentIfLive(0, context);
-        final Segment second = segments.getOrCreateSegmentIfLive(1, context);
-        final Segment third = segments.getOrCreateSegmentIfLive(2, context);
+        final KeyValueSegment first = segments.getOrCreateSegmentIfLive(0, context);
+        final KeyValueSegment second = segments.getOrCreateSegmentIfLive(1, context);
+        final KeyValueSegment third = segments.getOrCreateSegmentIfLive(2, context);
         segments.close();
 
         assertFalse(first.isOpen());
@@ -153,7 +153,7 @@ public class SegmentsTest {
 
     @Test
     public void shouldOpenExistingSegments() {
-        segments = new Segments("test", 4, 1);
+        segments = new KeyValueSegments("test", 4, 1);
         segments.getOrCreateSegmentIfLive(0, context);
         segments.getOrCreateSegmentIfLive(1, context);
         segments.getOrCreateSegmentIfLive(2, context);
@@ -162,7 +162,7 @@ public class SegmentsTest {
         // close existing.
         segments.close();
 
-        segments = new Segments("test", 4, 1);
+        segments = new KeyValueSegments("test", 4, 1);
         segments.openExisting(context);
 
         assertTrue(segments.getSegmentForTimestamp(0).isOpen());
@@ -185,7 +185,7 @@ public class SegmentsTest {
         segments.getOrCreateSegmentIfLive(3, context);
         segments.getOrCreateSegmentIfLive(4, context);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
+        final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -200,7 +200,7 @@ public class SegmentsTest {
         updateStreamTimeAndCreateSegment(1);
         updateStreamTimeAndCreateSegment(3);
 
-        final List<Segment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
+        final List<KeyValueSegment> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL);
         assertEquals(3, segments.size());
         assertEquals(0, segments.get(0).id);
         assertEquals(1, segments.get(1).id);
@@ -252,7 +252,7 @@ public class SegmentsTest {
     public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exception {
         final long segmentInterval = 60_000L; // the old segment file's naming system maxes out at 1 minute granularity.
 
-        segments = new Segments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
+        segments = new KeyValueSegments(storeName, NUM_SEGMENTS * segmentInterval, segmentInterval);
 
         final String storeDirectoryPath = stateDirectory.getAbsolutePath() + File.separator + storeName;
         final File storeDirectory = new File(storeDirectoryPath);
@@ -306,7 +306,7 @@ public class SegmentsTest {
     }
 
     private void verifyCorrectSegments(final long first, final int numSegments) {
-        final List<Segment> result = this.segments.segments(0, Long.MAX_VALUE);
+        final List<KeyValueSegment> result = this.segments.segments(0, Long.MAX_VALUE);
         assertEquals(numSegments, result.size());
         for (int i = 0; i < numSegments; i++) {
             assertEquals(i + first, result.get(i).id);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
index 0b9d66d..8097d74 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java
@@ -178,7 +178,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldRollSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50));
@@ -206,7 +206,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldGetAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -235,7 +235,7 @@ public class RocksDBSegmentedBytesStoreTest {
     @Test
     public void shouldFetchAllSegments() {
         // just to validate directories
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -263,7 +263,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -304,7 +304,7 @@ public class RocksDBSegmentedBytesStoreTest {
 
     @Test
     public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
-        final Segments segments = new Segments(storeName, retention, segmentInterval);
+        final KeyValueSegments segments = new KeyValueSegments(storeName, retention, segmentInterval);
         final String key = "a";
 
         bytesStore.put(serializeKey(new Windowed<>(key, windows[0])), serializeValue(50L));
@@ -355,7 +355,7 @@ public class RocksDBSegmentedBytesStoreTest {
         final Collection<KeyValue<byte[], byte[]>> records = new ArrayList<>();
         records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[0])).get(), serializeValue(50L)));
         records.add(new KeyValue<>(serializeKey(new Windowed<>(key, windows[3])).get(), serializeValue(100L)));
-        final Map<Segment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
+        final Map<KeyValueSegment, WriteBatch> writeBatchMap = bytesStore.getWriteBatches(records);
         assertEquals(2, writeBatchMap.size());
         for (final WriteBatch batch : writeBatchMap.values()) {
             assertEquals(1, batch.count());
@@ -376,7 +376,7 @@ public class RocksDBSegmentedBytesStoreTest {
         assertEquals(2, bytesStore.getSegments().size());
 
         // Bulk loading is enabled during recovery.
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
@@ -400,12 +400,12 @@ public class RocksDBSegmentedBytesStoreTest {
 
         restoreListener.onRestoreStart(null, bytesStore.name(), 0L, 0L);
 
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(1 << 30));
         }
 
         restoreListener.onRestoreEnd(null, bytesStore.name(), 0L);
-        for (final Segment segment : bytesStore.getSegments()) {
+        for (final KeyValueSegment segment : bytesStore.getSegments()) {
             Assert.assertThat(segment.getOptions().level0FileNumCompactionTrigger(), equalTo(4));
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index c155a83..2666f5f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -74,7 +74,7 @@ public class RocksDBWindowStoreTest {
     private final long segmentInterval = 60_000L;
     private final long retentionPeriod = segmentInterval * (numSegments - 1);
     private final String windowName = "window";
-    private final Segments segments = new Segments(windowName, retentionPeriod, segmentInterval);
+    private final KeyValueSegments segments = new KeyValueSegments(windowName, retentionPeriod, segmentInterval);
     private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", 
Serdes.Integer(), Serdes.String());
 
     private final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
index 8112080..8824a94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WindowKeySchemaTest.java
@@ -30,16 +30,16 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.KeyValueIteratorStub;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
 public class WindowKeySchemaTest {
 
     final private String key = "key";
