This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 661e41c92f9 KAFKA-16302: Remove check for log message that is no
longer present (fix builds) (#15422)
661e41c92f9 is described below
commit 661e41c92f9cee4ea97835de40ecf328330465f5
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Feb 22 17:10:11 2024 -0800
KAFKA-16302: Remove check for log message that is no longer present (fix
builds) (#15422)
Commit a3528a3 removed this log message but not the test assertions that check for it.
Builds are currently red because, for some reason, these tests can't be retried.
We should address that as a follow-up.
Reviewers: Greg Harris <[email protected]>, Matthias J. Sax
<[email protected]>
---
...actDualSchemaRocksDBSegmentedBytesStoreTest.java | 21 +++++++--------------
.../AbstractRocksDBSegmentedBytesStoreTest.java | 21 +++++++--------------
.../internals/AbstractSessionBytesStoreTest.java | 19 +++++++------------
.../internals/AbstractWindowBytesStoreTest.java | 17 ++++++-----------
4 files changed, 27 insertions(+), 51 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index c81e57589a0..c7af25726a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -45,7 +45,6 @@ import
org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.Task.TaskType;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
@@ -83,7 +82,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -1579,7 +1577,7 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
@Test
- public void shouldLogAndMeasureExpiredRecords() {
+ public void shouldMeasureExpiredRecords() {
final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
final AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore =
getBytesStore();
final InternalMockProcessorContext context = new
InternalMockProcessorContext(
@@ -1590,18 +1588,13 @@ public abstract class
AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
context.setSystemTimeMs(time.milliseconds());
bytesStore.init((StateStoreContext) context, bytesStore);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
- // write a record to advance stream time, with a high enough
timestamp
- // that the subsequent record in windows[0] will already be
expired.
- bytesStore.put(serializeKey(new Windowed<>("dummy",
nextSegmentWindow)), serializeValue(0));
+ // write a record to advance stream time, with a high enough timestamp
+ // that the subsequent record in windows[0] will already be expired.
+ bytesStore.put(serializeKey(new Windowed<>("dummy",
nextSegmentWindow)), serializeValue(0));
- final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
- final byte[] value = serializeValue(5);
- bytesStore.put(key, value);
-
- final List<String> messages = appender.getMessages();
- assertThat(messages, hasItem("Skipping record for expired
segment."));
- }
+ final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+ final byte[] value = serializeValue(5);
+ bytesStore.put(key, value);
final Map<MetricName, ? extends Metric> metrics =
context.metrics().metrics();
final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 852f6fd3ceb..a20bebd0081 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
@@ -81,7 +80,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
@@ -784,7 +782,7 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
@Test
- public void shouldLogAndMeasureExpiredRecords() {
+ public void shouldMeasureExpiredRecords() {
final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
final AbstractRocksDBSegmentedBytesStore<S> bytesStore =
getBytesStore();
final InternalMockProcessorContext context = new
InternalMockProcessorContext(
@@ -795,18 +793,13 @@ public abstract class
AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
context.setSystemTimeMs(time.milliseconds());
bytesStore.init((StateStoreContext) context, bytesStore);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
- // write a record to advance stream time, with a high enough
timestamp
- // that the subsequent record in windows[0] will already be
expired.
- bytesStore.put(serializeKey(new Windowed<>("dummy",
nextSegmentWindow)), serializeValue(0));
+ // write a record to advance stream time, with a high enough timestamp
+ // that the subsequent record in windows[0] will already be expired.
+ bytesStore.put(serializeKey(new Windowed<>("dummy",
nextSegmentWindow)), serializeValue(0));
- final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
- final byte[] value = serializeValue(5);
- bytesStore.put(key, value);
-
- final List<String> messages = appender.getMessages();
- assertThat(messages, hasItem("Skipping record for expired
segment."));
- }
+ final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+ final byte[] value = serializeValue(5);
+ bytesStore.put(key, value);
final Map<MetricName, ? extends Metric> metrics =
context.metrics().metrics();
final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 56c77ce4fe1..2d75f751344 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -793,7 +793,7 @@ public abstract class AbstractSessionBytesStoreTest {
}
@Test
- public void shouldLogAndMeasureExpiredRecords() {
+ public void shouldMeasureExpiredRecords() {
final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
final SessionStore<String, Long> sessionStore =
buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
final InternalMockProcessorContext context = new
InternalMockProcessorContext(
@@ -806,18 +806,13 @@ public abstract class AbstractSessionBytesStoreTest {
context.setSystemTimeMs(time.milliseconds());
sessionStore.init((StateStoreContext) context, sessionStore);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
- // Advance stream time by inserting record with large enough
timestamp that records with timestamp 0 are expired
- // Note that rocksdb will only expire segments at a time (where
segment interval = 60,000 for this retention period)
- sessionStore.put(new Windowed<>("initial record", new
SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
+ // Advance stream time by inserting record with large enough timestamp
that records with timestamp 0 are expired
+ // Note that rocksdb will only expire segments at a time (where
segment interval = 60,000 for this retention period)
+ sessionStore.put(new Windowed<>("initial record", new SessionWindow(0,
2 * SEGMENT_INTERVAL)), 0L);
- // Try inserting a record with timestamp 0 -- should be dropped
- sessionStore.put(new Windowed<>("late record", new
SessionWindow(0, 0)), 0L);
- sessionStore.put(new Windowed<>("another on-time record", new
SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
-
- final List<String> messages = appender.getMessages();
- assertThat(messages, hasItem("Skipping record for expired
segment."));
- }
+ // Try inserting a record with timestamp 0 -- should be dropped
+ sessionStore.put(new Windowed<>("late record", new SessionWindow(0,
0)), 0L);
+ sessionStore.put(new Windowed<>("another on-time record", new
SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
final Map<MetricName, ? extends Metric> metrics =
context.metrics().metrics();
final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index d29c6bf88d4..688efab857d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -984,7 +984,7 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- public void shouldLogAndMeasureExpiredRecords() {
+ public void shouldMeasureExpiredRecords() {
final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
final WindowStore<Integer, String> windowStore =
buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false,
Serdes.Integer(), Serdes.String());
@@ -998,17 +998,12 @@ public abstract class AbstractWindowBytesStoreTest {
context.setTime(1L);
windowStore.init((StateStoreContext) context, windowStore);
- try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
- // Advance stream time by inserting record with large enough
timestamp that records with timestamp 0 are expired
- windowStore.put(1, "initial record", 2 * RETENTION_PERIOD);
+ // Advance stream time by inserting record with large enough timestamp
that records with timestamp 0 are expired
+ windowStore.put(1, "initial record", 2 * RETENTION_PERIOD);
- // Try inserting a record with timestamp 0 -- should be dropped
- windowStore.put(1, "late record", 0L);
- windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1);
-
- final List<String> messages = appender.getMessages();
- assertThat(messages, hasItem("Skipping record for expired
segment."));
- }
+ // Try inserting a record with timestamp 0 -- should be dropped
+ windowStore.put(1, "late record", 0L);
+ windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1);
final Map<MetricName, ? extends Metric> metrics =
context.metrics().metrics();