This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da0c20756ea KAFKA-20134: Implement TimestampedWindowStoreWithHeaders 
(1/N) (#21465)
da0c20756ea is described below

commit da0c20756ead60aa912e6ebf1cbea2e75252af50
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 20 04:59:40 2026 +0000

    KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (1/N) (#21465)
    
    This PR adds `TimestampedSegmentWithHeaders`,
    `TimestampedSegmentsWithHeaders` and the corresponding  unit tests for
    the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
     <[email protected]>
---
 ...BTimestampedSegmentedBytesStoreWithHeaders.java |  36 ++++
 .../internals/TimestampedSegmentWithHeaders.java   |  90 ++++++++++
 .../internals/TimestampedSegmentsWithHeaders.java  |  70 ++++++++
 ...estampedSegmentedBytesStoreWithHeadersTest.java |  38 ++++
 .../TimestampedSegmentWithHeadersTest.java         | 140 +++++++++++++++
 .../TimestampedSegmentsWithHeadersTest.java        | 197 +++++++++++++++++++++
 6 files changed, 571 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..9c4fd502274
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * A RocksDB-backed segmented bytes store with timestamp and headers support.
+ * <p>
+ * This store uses {@link TimestampedSegmentsWithHeaders} to manage segments,
+ * where each segment is a {@link TimestampedSegmentWithHeaders} that extends
+ * {@link RocksDBTimestampedStoreWithHeaders}. This provides automatic dual-CF
+ * migration support from timestamp-only format to timestamp+headers format.
+ */
+public class RocksDBTimestampedSegmentedBytesStoreWithHeaders extends 
AbstractRocksDBSegmentedBytesStore<TimestampedSegmentWithHeaders> {
+
+    RocksDBTimestampedSegmentedBytesStoreWithHeaders(final String name,
+                                                     final String metricsScope,
+                                                     final long retention,
+                                                     final long 
segmentInterval,
+                                                     final KeySchema 
keySchema) {
+        super(name, retention, keySchema, new 
TimestampedSegmentsWithHeaders(name, metricsScope, retention, segmentInterval));
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
new file mode 100644
index 00000000000..aeba24d30e4
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A segment that stores timestamped key-value pairs with headers.
+ * <p>
+ * This segment extends {@link RocksDBTimestampedStoreWithHeaders} to provide
+ * header-aware storage with dual-column-family migration support from
+ * timestamp-only format to timestamp+headers format.
+ */
+class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
+    implements Comparable<TimestampedSegmentWithHeaders>, Segment {
+
+    public final long id;
+
+    TimestampedSegmentWithHeaders(final String segmentName,
+                                  final String windowName,
+                                  final long id,
+                                  final Position position,
+                                  final RocksDBMetricsRecorder 
metricsRecorder) {
+        super(segmentName, windowName, metricsRecorder);
+        this.id = id;
+        this.position = position;
+    }
+
+    @Override
+    public void destroy() throws IOException {
+        Utils.delete(dbDir);
+    }
+
+    @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int compareTo(final TimestampedSegmentWithHeaders segment) {
+        return Long.compare(id, segment.id);
+    }
+
+    @Override
+    public void openDB(final Map<String, Object> configs, final File stateDir) 
{
+        super.openDB(configs, stateDir);
+        // skip the registering step
+    }
+
+    @Override
+    public String toString() {
+        return "TimestampedSegmentWithHeaders(id=" + id + ", name=" + name() + 
")";
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final TimestampedSegmentWithHeaders segment = 
(TimestampedSegmentWithHeaders) obj;
+        return id == segment.id;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
new file mode 100644
index 00000000000..4e770cc24bd
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Manages the {@link TimestampedSegmentWithHeaders}s that are used by the 
{@link RocksDBTimestampedSegmentedBytesStoreWithHeaders}.
+ */
+class TimestampedSegmentsWithHeaders extends 
AbstractSegments<TimestampedSegmentWithHeaders> {
+
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    TimestampedSegmentsWithHeaders(final String name,
+                                   final String metricsScope,
+                                   final long retentionPeriod,
+                                   final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+        metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+    }
+
+    @Override
+    public TimestampedSegmentWithHeaders getOrCreateSegment(final long 
segmentId,
+                                                            final 
StateStoreContext context) {
+        if (segments.containsKey(segmentId)) {
+            return segments.get(segmentId);
+        } else {
+            final TimestampedSegmentWithHeaders newSegment =
+                new TimestampedSegmentWithHeaders(segmentName(segmentId), 
name, segmentId, position, metricsRecorder);
+
+            if (segments.put(segmentId, newSegment) != null) {
+                throw new IllegalStateException("TimestampedSegmentWithHeaders 
already exists. Possible concurrent access.");
+            }
+
+            newSegment.openDB(context.appConfigs(), context.stateDir());
+            return newSegment;
+        }
+    }
+
+    @Override
+    public TimestampedSegmentWithHeaders getOrCreateSegmentIfLive(final long 
segmentId,
+                                                                   final 
StateStoreContext context,
+                                                                   final long 
streamTime) {
+        final TimestampedSegmentWithHeaders segment = 
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+        cleanupExpiredSegments(streamTime);
+        return segment;
+    }
+
+    @Override
+    public void openExisting(final StateStoreContext context, final long 
streamTime) {
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), 
context.taskId());
+        super.openExisting(context, streamTime);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
new file mode 100644
index 00000000000..92463bb4703
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimestampedSegmentedBytesStoreWithHeadersTest
+    extends 
AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegmentWithHeaders> {
+
+    private static final String METRICS_SCOPE = "metrics-scope";
+
+    RocksDBTimestampedSegmentedBytesStoreWithHeaders getBytesStore() {
+        return new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+            storeName,
+            METRICS_SCOPE,
+            retention,
+            segmentInterval,
+            schema
+        );
+    }
+
+    @Override
+    TimestampedSegmentsWithHeaders newSegments() {
+        return new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, 
retention, segmentInterval);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
new file mode 100644
index 00000000000..007288409cf
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class TimestampedSegmentWithHeadersTest {
+
+    private final RocksDBMetricsRecorder metricsRecorder =
+        new RocksDBMetricsRecorder("metrics-scope", "store-name");
+
+    @BeforeEach
+    public void setUp() {
+        metricsRecorder.init(
+            new StreamsMetricsImpl(new Metrics(), "test-client", new 
MockTime()),
+            new TaskId(0, 0)
+        );
+    }
+
+    @Test
+    public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
+        final TimestampedSegmentWithHeaders segment =
+            new TimestampedSegmentWithHeaders("segment", "window", 0L, 
Position.emptyPosition(), metricsRecorder);
+        final String directoryPath = 
TestUtils.tempDirectory().getAbsolutePath();
+        final File directory = new File(directoryPath);
+
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), 
directory);
+
+        assertTrue(new File(directoryPath, "window").exists());
+        assertTrue(new File(directoryPath + File.separator + "window", 
"segment").exists());
+        assertTrue(new File(directoryPath + File.separator + "window", 
"segment").list().length > 0);
+        segment.destroy();
+        assertFalse(new File(directoryPath + File.separator + "window", 
"segment").exists());
+        assertTrue(new File(directoryPath, "window").exists());
+
+        segment.close();
+    }
+
+    @Test
+    public void shouldBeEqualIfIdIsEqual() {
+        final TimestampedSegmentWithHeaders segment =
+            new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, 
Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segmentSameId =
+            new TimestampedSegmentWithHeaders("someOtherName", 
"someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segmentDifferentId =
+            new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, 
Position.emptyPosition(), metricsRecorder);
+
+        assertEquals(segment, segment);
+        assertEquals(segment, segmentSameId);
+        assertNotEquals(segment, segmentDifferentId);
+        assertNotEquals(segment, null);
+        assertNotEquals(segment, "anyName");
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    @Test
+    public void shouldHashOnSegmentIdOnly() {
+        final TimestampedSegmentWithHeaders segment =
+            new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, 
Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segmentSameId =
+            new TimestampedSegmentWithHeaders("someOtherName", 
"someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segmentDifferentId =
+            new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, 
Position.emptyPosition(), metricsRecorder);
+
+        final Set<TimestampedSegmentWithHeaders> set = new HashSet<>();
+        assertTrue(set.add(segment));
+        assertFalse(set.add(segmentSameId));
+        assertTrue(set.add(segmentDifferentId));
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    @Test
+    public void shouldCompareSegmentIdOnly() {
+        final TimestampedSegmentWithHeaders segment1 =
+            new TimestampedSegmentWithHeaders("a", "C", 50L, 
Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segment2 =
+            new TimestampedSegmentWithHeaders("b", "B", 100L, 
Position.emptyPosition(), metricsRecorder);
+        final TimestampedSegmentWithHeaders segment3 =
+            new TimestampedSegmentWithHeaders("c", "A", 0L, 
Position.emptyPosition(), metricsRecorder);
+
+        assertEquals(0, segment1.compareTo(segment1));
+        assertEquals(-1, segment1.compareTo(segment2));
+        assertEquals(1, segment2.compareTo(segment1));
+        assertEquals(1, segment1.compareTo(segment3));
+        assertEquals(-1, segment3.compareTo(segment1));
+        assertEquals(1, segment2.compareTo(segment3));
+        assertEquals(-1, segment3.compareTo(segment2));
+
+        segment1.close();
+        segment2.close();
+        segment3.close();
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
new file mode 100644
index 00000000000..1f580b3a601
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TimestampedSegmentsWithHeadersTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String METRICS_SCOPE = "test-state-id";
+    private InternalMockProcessorContext context;
+    private TimestampedSegmentsWithHeaders segments;
+    private File stateDirectory;
+    private final String storeName = "test";
+
+    @BeforeEach
+    public void createContext() {
+        stateDirectory = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            stateDirectory,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics()))
+        );
+        segments = new TimestampedSegmentsWithHeaders(storeName, 
METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments.openExisting(context, -1L);
+    }
+
+    @AfterEach
+    public void close() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldGetSegmentNameFromId() {
+        assertEquals("test.0", segments.segmentName(0));
+        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
+        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 2 * 
SEGMENT_INTERVAL).isDirectory());
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final TimestampedSegmentWithHeaders segment1 = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final TimestampedSegmentWithHeaders segment2 = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final TimestampedSegmentWithHeaders segment3 = 
segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        assertFalse(segment1.isOpen());
+        assertFalse(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+        assertFalse(new File(context.stateDir(), "test/test." + 
SEGMENT_INTERVAL).exists());
+        assertTrue(new File(context.stateDir(), "test/test." + 7 * 
SEGMENT_INTERVAL).exists());
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        assertEquals(segment, segments.segmentForTimestamp(0L));
+    }
+
+    @Test
+    public void shouldGetCorrectSegmentString() {
+        final TimestampedSegmentWithHeaders segment = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
+        assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)", 
segment.toString());
+    }
+
+    @Test
+    public void shouldCloseAllOpenSegments() {
+        final TimestampedSegmentWithHeaders first = 
segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final TimestampedSegmentWithHeaders second = 
segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final TimestampedSegmentWithHeaders third = 
segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.close();
+
+        assertFalse(first.isOpen());
+        assertFalse(second.isOpen());
+        assertFalse(third.isOpen());
+    }
+
+    @Test
+    public void shouldOpenExistingSegments() {
+        segments = new TimestampedSegmentsWithHeaders("test", METRICS_SCOPE, 
4, 1);
+        segments.openExisting(context, -1L);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.getOrCreateSegmentIfLive(3, context, -1L);
+        segments.getOrCreateSegmentIfLive(4, context, -1L);
+        // close existing.
+        segments.close();
+
+        segments = new TimestampedSegmentsWithHeaders("test", METRICS_SCOPE, 
4, 1);
+        segments.openExisting(context, -1L);
+
+        assertTrue(segments.segmentForTimestamp(0).isOpen());
+        assertTrue(segments.segmentForTimestamp(1).isOpen());
+        assertTrue(segments.segmentForTimestamp(2).isOpen());
+        assertTrue(segments.segmentForTimestamp(3).isOpen());
+        assertTrue(segments.segmentForTimestamp(4).isOpen());
+    }
+
+    @Test
+    public void shouldGetSegmentsWithinTimeRange() {
+        updateStreamTimeAndCreateSegment(0);
+        updateStreamTimeAndCreateSegment(1);
+        updateStreamTimeAndCreateSegment(2);
+        updateStreamTimeAndCreateSegment(3);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
+        segments.getOrCreateSegmentIfLive(0, context, streamTime);
+        segments.getOrCreateSegmentIfLive(1, context, streamTime);
+        segments.getOrCreateSegmentIfLive(2, context, streamTime);
+        segments.getOrCreateSegmentIfLive(3, context, streamTime);
+        segments.getOrCreateSegmentIfLive(4, context, streamTime);
+
+        final List<TimestampedSegmentWithHeaders> segments = 
this.segments.segments(0, 2 * SEGMENT_INTERVAL, true);
+        assertEquals(3, segments.size());
+        assertEquals(0, segments.get(0).id);
+        assertEquals(1, segments.get(1).id);
+        assertEquals(2, segments.get(2).id);
+    }
+
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.close();
+        assertNull(segments.segmentForTimestamp(0));
+    }
+
+    private long updateStreamTimeAndCreateSegment(final int segment) {
+        final long streamTime = SEGMENT_INTERVAL * segment;
+        segments.getOrCreateSegmentIfLive(segment, context, streamTime);
+        return streamTime;
+    }
+}

Reply via email to