This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new da0c20756ea KAFKA-20134: Implement TimestampedWindowStoreWithHeaders
(1/N) (#21465)
da0c20756ea is described below
commit da0c20756ead60aa912e6ebf1cbea2e75252af50
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 20 04:59:40 2026 +0000
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (1/N) (#21465)
This PR adds `TimestampedSegmentWithHeaders`,
`TimestampedSegmentsWithHeaders` and the corresponding unit tests for
the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
Reviewers: Alieh Saeedi <[email protected]>, Matthias J. Sax
<[email protected]>
---
...BTimestampedSegmentedBytesStoreWithHeaders.java | 36 ++++
.../internals/TimestampedSegmentWithHeaders.java | 90 ++++++++++
.../internals/TimestampedSegmentsWithHeaders.java | 70 ++++++++
...estampedSegmentedBytesStoreWithHeadersTest.java | 38 ++++
.../TimestampedSegmentWithHeadersTest.java | 140 +++++++++++++++
.../TimestampedSegmentsWithHeadersTest.java | 197 +++++++++++++++++++++
6 files changed, 571 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..9c4fd502274
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeaders.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
/**
 * A RocksDB-backed segmented bytes store with timestamp and headers support.
 * <p>
 * This store uses {@link TimestampedSegmentsWithHeaders} to manage segments,
 * where each segment is a {@link TimestampedSegmentWithHeaders} that extends
 * {@link RocksDBTimestampedStoreWithHeaders}. This provides automatic dual-CF
 * migration support from timestamp-only format to timestamp+headers format.
 */
public class RocksDBTimestampedSegmentedBytesStoreWithHeaders extends AbstractRocksDBSegmentedBytesStore<TimestampedSegmentWithHeaders> {

    /**
     * Creates the store. All segment bookkeeping is delegated to the parent
     * class, which is handed a freshly constructed
     * {@link TimestampedSegmentsWithHeaders} built from the same parameters.
     *
     * @param name            store name; also used as the segments' window (parent directory) name
     * @param metricsScope    scope passed through to the segments' RocksDB metrics recorder
     * @param retention       retention period in milliseconds
     * @param segmentInterval size of an individual segment in milliseconds
     * @param keySchema       schema mapping keys/timestamps to segments
     */
    RocksDBTimestampedSegmentedBytesStoreWithHeaders(final String name,
                                                     final String metricsScope,
                                                     final long retention,
                                                     final long segmentInterval,
                                                     final KeySchema keySchema) {
        super(name, retention, keySchema, new TimestampedSegmentsWithHeaders(name, metricsScope, retention, segmentInterval));
    }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
new file mode 100644
index 00000000000..aeba24d30e4
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeaders.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A segment that stores timestamped key-value pairs with headers.
+ * <p>
+ * This segment extends {@link RocksDBTimestampedStoreWithHeaders} to provide
+ * header-aware storage with dual-column-family migration support from
+ * timestamp-only format to timestamp+headers format.
+ */
+class TimestampedSegmentWithHeaders extends RocksDBTimestampedStoreWithHeaders
+ implements Comparable<TimestampedSegmentWithHeaders>, Segment {
+
+ public final long id;
+
+ TimestampedSegmentWithHeaders(final String segmentName,
+ final String windowName,
+ final long id,
+ final Position position,
+ final RocksDBMetricsRecorder
metricsRecorder) {
+ super(segmentName, windowName, metricsRecorder);
+ this.id = id;
+ this.position = position;
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ Utils.delete(dbDir);
+ }
+
+ @Override
+ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int compareTo(final TimestampedSegmentWithHeaders segment) {
+ return Long.compare(id, segment.id);
+ }
+
+ @Override
+ public void openDB(final Map<String, Object> configs, final File stateDir)
{
+ super.openDB(configs, stateDir);
+ // skip the registering step
+ }
+
+ @Override
+ public String toString() {
+ return "TimestampedSegmentWithHeaders(id=" + id + ", name=" + name() +
")";
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final TimestampedSegmentWithHeaders segment =
(TimestampedSegmentWithHeaders) obj;
+ return id == segment.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
new file mode 100644
index 00000000000..4e770cc24bd
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Manages the {@link TimestampedSegmentWithHeaders}s that are used by the
{@link RocksDBTimestampedSegmentedBytesStoreWithHeaders}.
+ */
+class TimestampedSegmentsWithHeaders extends
AbstractSegments<TimestampedSegmentWithHeaders> {
+
+ private final RocksDBMetricsRecorder metricsRecorder;
+
+ TimestampedSegmentsWithHeaders(final String name,
+ final String metricsScope,
+ final long retentionPeriod,
+ final long segmentInterval) {
+ super(name, retentionPeriod, segmentInterval);
+ metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+ }
+
+ @Override
+ public TimestampedSegmentWithHeaders getOrCreateSegment(final long
segmentId,
+ final
StateStoreContext context) {
+ if (segments.containsKey(segmentId)) {
+ return segments.get(segmentId);
+ } else {
+ final TimestampedSegmentWithHeaders newSegment =
+ new TimestampedSegmentWithHeaders(segmentName(segmentId),
name, segmentId, position, metricsRecorder);
+
+ if (segments.put(segmentId, newSegment) != null) {
+ throw new IllegalStateException("TimestampedSegmentWithHeaders
already exists. Possible concurrent access.");
+ }
+
+ newSegment.openDB(context.appConfigs(), context.stateDir());
+ return newSegment;
+ }
+ }
+
+ @Override
+ public TimestampedSegmentWithHeaders getOrCreateSegmentIfLive(final long
segmentId,
+ final
StateStoreContext context,
+ final long
streamTime) {
+ final TimestampedSegmentWithHeaders segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+ cleanupExpiredSegments(streamTime);
+ return segment;
+ }
+
+ @Override
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
+ metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
+ super.openExisting(context, streamTime);
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
new file mode 100644
index 00000000000..92463bb4703
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedSegmentedBytesStoreWithHeadersTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+public class RocksDBTimestampedSegmentedBytesStoreWithHeadersTest
+ extends
AbstractRocksDBSegmentedBytesStoreTest<TimestampedSegmentWithHeaders> {
+
+ private static final String METRICS_SCOPE = "metrics-scope";
+
+ RocksDBTimestampedSegmentedBytesStoreWithHeaders getBytesStore() {
+ return new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+ storeName,
+ METRICS_SCOPE,
+ retention,
+ segmentInterval,
+ schema
+ );
+ }
+
+ @Override
+ TimestampedSegmentsWithHeaders newSegments() {
+ return new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE,
retention, segmentInterval);
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
new file mode 100644
index 00000000000..007288409cf
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentWithHeadersTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TimestampedSegmentWithHeadersTest {

    // Recorder shared by all segments created in these tests; re-initialized
    // with a fresh StreamsMetricsImpl before each test method.
    private final RocksDBMetricsRecorder metricsRecorder =
        new RocksDBMetricsRecorder("metrics-scope", "store-name");

    @BeforeEach
    public void setUp() {
        metricsRecorder.init(
            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
            new TaskId(0, 0)
        );
    }

    @Test
    public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
        final TimestampedSegmentWithHeaders segment =
            new TimestampedSegmentWithHeaders("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
        final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
        final File directory = new File(directoryPath);

        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);

        // Opening the DB creates <stateDir>/window/segment and populates it
        // with RocksDB files.
        assertTrue(new File(directoryPath, "window").exists());
        assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
        assertTrue(new File(directoryPath + File.separator + "window", "segment").list().length > 0);
        segment.destroy();
        // destroy() removes only the segment directory; the parent "window"
        // directory must survive.
        assertFalse(new File(directoryPath + File.separator + "window", "segment").exists());
        assertTrue(new File(directoryPath, "window").exists());

        segment.close();
    }

    @Test
    public void shouldBeEqualIfIdIsEqual() {
        // Names differ but ids match -> equal; same names but different id -> not equal.
        final TimestampedSegmentWithHeaders segment =
            new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segmentSameId =
            new TimestampedSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segmentDifferentId =
            new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);

        assertEquals(segment, segment);
        assertEquals(segment, segmentSameId);
        assertNotEquals(segment, segmentDifferentId);
        assertNotEquals(segment, null);
        // Comparing against an unrelated type must also be unequal.
        assertNotEquals(segment, "anyName");

        segment.close();
        segmentSameId.close();
        segmentDifferentId.close();
    }

    @Test
    public void shouldHashOnSegmentIdOnly() {
        final TimestampedSegmentWithHeaders segment =
            new TimestampedSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segmentSameId =
            new TimestampedSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segmentDifferentId =
            new TimestampedSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);

        // A HashSet deduplicates by id only: the same-id segment is rejected,
        // the different-id segment is accepted.
        final Set<TimestampedSegmentWithHeaders> set = new HashSet<>();
        assertTrue(set.add(segment));
        assertFalse(set.add(segmentSameId));
        assertTrue(set.add(segmentDifferentId));

        segment.close();
        segmentSameId.close();
        segmentDifferentId.close();
    }

    @Test
    public void shouldCompareSegmentIdOnly() {
        // Names are deliberately ordered opposite to ids to prove that
        // compareTo ignores names entirely.
        final TimestampedSegmentWithHeaders segment1 =
            new TimestampedSegmentWithHeaders("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segment2 =
            new TimestampedSegmentWithHeaders("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
        final TimestampedSegmentWithHeaders segment3 =
            new TimestampedSegmentWithHeaders("c", "A", 0L, Position.emptyPosition(), metricsRecorder);

        assertEquals(0, segment1.compareTo(segment1));
        assertEquals(-1, segment1.compareTo(segment2));
        assertEquals(1, segment2.compareTo(segment1));
        assertEquals(1, segment1.compareTo(segment3));
        assertEquals(-1, segment3.compareTo(segment1));
        assertEquals(1, segment2.compareTo(segment3));
        assertEquals(-1, segment3.compareTo(segment2));

        segment1.close();
        segment2.close();
        segment3.close();
    }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
new file mode 100644
index 00000000000..1f580b3a601
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedSegmentsWithHeadersTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
public class TimestampedSegmentsWithHeadersTest {

    private static final long SEGMENT_INTERVAL = 100L;
    // Retention of four segment intervals: segments older than that relative
    // to stream time are eligible for cleanup.
    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
    private static final String METRICS_SCOPE = "test-state-id";
    private InternalMockProcessorContext context;
    private TimestampedSegmentsWithHeaders segments;
    private File stateDirectory;
    private final String storeName = "test";

    @BeforeEach
    public void createContext() {
        stateDirectory = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext<>(
            stateDirectory,
            Serdes.String(),
            Serdes.Long(),
            new MockRecordCollector(),
            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
        );
        segments = new TimestampedSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
        segments.openExisting(context, -1L);
    }

    @AfterEach
    public void close() {
        segments.close();
    }

    @Test
    public void shouldGetSegmentIdsFromTimestamp() {
        // Segment id is the timestamp divided by the segment interval.
        assertEquals(0, segments.segmentId(0));
        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
    }

    @Test
    public void shouldGetSegmentNameFromId() {
        // Segment names encode the segment's start timestamp
        // (id * SEGMENT_INTERVAL), not the raw id.
        assertEquals("test.0", segments.segmentName(0));
        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
    }

    @Test
    public void shouldCreateSegments() {
        final TimestampedSegmentWithHeaders segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
        final TimestampedSegmentWithHeaders segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
        final TimestampedSegmentWithHeaders segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);

        // Each live segment is backed by its own on-disk directory and opened.
        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
        assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
        assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
        assertTrue(segment1.isOpen());
        assertTrue(segment2.isOpen());
        assertTrue(segment3.isOpen());
    }

    @Test
    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
        // Advance stream time to segment 7 (t = 700), putting segment 0 well
        // outside the retention window of 400.
        final long streamTime = updateStreamTimeAndCreateSegment(7);
        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
        assertFalse(new File(context.stateDir(), "test/test.0").exists());
    }

    @Test
    public void shouldCleanupSegmentsThatHaveExpired() {
        final TimestampedSegmentWithHeaders segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
        final TimestampedSegmentWithHeaders segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
        // Creating segment 7 advances stream time; segments 0 and 1 fall out
        // of retention and must be closed and deleted from disk.
        final TimestampedSegmentWithHeaders segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);

        assertFalse(segment1.isOpen());
        assertFalse(segment2.isOpen());
        assertTrue(segment3.isOpen());
        assertFalse(new File(context.stateDir(), "test/test.0").exists());
        assertFalse(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).exists());
        assertTrue(new File(context.stateDir(), "test/test." + 7 * SEGMENT_INTERVAL).exists());
    }

    @Test
    public void shouldGetSegmentForTimestamp() {
        final TimestampedSegmentWithHeaders segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
        segments.getOrCreateSegmentIfLive(1, context, -1L);
        // Timestamp 0 maps back to segment 0.
        assertEquals(segment, segments.segmentForTimestamp(0L));
    }

    @Test
    public void shouldGetCorrectSegmentString() {
        final TimestampedSegmentWithHeaders segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
        assertEquals("TimestampedSegmentWithHeaders(id=0, name=test.0)", segment.toString());
    }

    @Test
    public void shouldCloseAllOpenSegments() {
        final TimestampedSegmentWithHeaders first = segments.getOrCreateSegmentIfLive(0, context, -1L);
        final TimestampedSegmentWithHeaders second = segments.getOrCreateSegmentIfLive(1, context, -1L);
        final TimestampedSegmentWithHeaders third = segments.getOrCreateSegmentIfLive(2, context, -1L);
        segments.close();

        assertFalse(first.isOpen());
        assertFalse(second.isOpen());
        assertFalse(third.isOpen());
    }

    @Test
    public void shouldOpenExistingSegments() {
        // Use a small retention (4) and interval (1) so five consecutive
        // segments all stay live.
        segments = new TimestampedSegmentsWithHeaders("test", METRICS_SCOPE, 4, 1);
        segments.openExisting(context, -1L);
        segments.getOrCreateSegmentIfLive(0, context, -1L);
        segments.getOrCreateSegmentIfLive(1, context, -1L);
        segments.getOrCreateSegmentIfLive(2, context, -1L);
        segments.getOrCreateSegmentIfLive(3, context, -1L);
        segments.getOrCreateSegmentIfLive(4, context, -1L);
        // close existing.
        segments.close();

        // A fresh instance over the same state directory must rediscover and
        // reopen all previously created segments.
        segments = new TimestampedSegmentsWithHeaders("test", METRICS_SCOPE, 4, 1);
        segments.openExisting(context, -1L);

        assertTrue(segments.segmentForTimestamp(0).isOpen());
        assertTrue(segments.segmentForTimestamp(1).isOpen());
        assertTrue(segments.segmentForTimestamp(2).isOpen());
        assertTrue(segments.segmentForTimestamp(3).isOpen());
        assertTrue(segments.segmentForTimestamp(4).isOpen());
    }

    @Test
    public void shouldGetSegmentsWithinTimeRange() {
        updateStreamTimeAndCreateSegment(0);
        updateStreamTimeAndCreateSegment(1);
        updateStreamTimeAndCreateSegment(2);
        updateStreamTimeAndCreateSegment(3);
        final long streamTime = updateStreamTimeAndCreateSegment(4);
        segments.getOrCreateSegmentIfLive(0, context, streamTime);
        segments.getOrCreateSegmentIfLive(1, context, streamTime);
        segments.getOrCreateSegmentIfLive(2, context, streamTime);
        segments.getOrCreateSegmentIfLive(3, context, streamTime);
        segments.getOrCreateSegmentIfLive(4, context, streamTime);

        // NOTE: local 'segments' list intentionally shadows the field here.
        // Range [0, 2 * SEGMENT_INTERVAL] covers segments 0, 1, and 2.
        final List<TimestampedSegmentWithHeaders> segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL, true);
        assertEquals(3, segments.size());
        assertEquals(0, segments.get(0).id);
        assertEquals(1, segments.get(1).id);
        assertEquals(2, segments.get(2).id);
    }

    @Test
    public void shouldClearSegmentsOnClose() {
        segments.getOrCreateSegmentIfLive(0, context, -1L);
        segments.close();
        assertNull(segments.segmentForTimestamp(0));
    }

    // Advances stream time to the start of the given segment and creates that
    // segment; returns the new stream time.
    private long updateStreamTimeAndCreateSegment(final int segment) {
        final long streamTime = SEGMENT_INTERVAL * segment;
        segments.getOrCreateSegmentIfLive(segment, context, streamTime);
        return streamTime;
    }
}