This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 661e41c92f9 KAFKA-16302: Remove check for log message that is no longer present (fix builds) (#15422)
661e41c92f9 is described below

commit 661e41c92f9cee4ea97835de40ecf328330465f5
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Feb 22 17:10:11 2024 -0800

    KAFKA-16302: Remove check for log message that is no longer present (fix builds) (#15422)
    
    a3528a3 removed this log message but not the tests asserting it.
    
    Builds are currently red because these tests can't retry for some reason. We should address that as a follow-up.
    
    Reviewers: Greg Harris <[email protected]>, Matthias J. Sax <[email protected]>
---
 ...actDualSchemaRocksDBSegmentedBytesStoreTest.java | 21 +++++++--------------
 .../AbstractRocksDBSegmentedBytesStoreTest.java     | 21 +++++++--------------
 .../internals/AbstractSessionBytesStoreTest.java    | 19 +++++++------------
 .../internals/AbstractWindowBytesStoreTest.java     | 17 ++++++-----------
 4 files changed, 27 insertions(+), 51 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
index c81e57589a0..c7af25726a4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java
@@ -45,7 +45,6 @@ import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -83,7 +82,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1579,7 +1577,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
 
 
     @Test
-    public void shouldLogAndMeasureExpiredRecords() {
+    public void shouldMeasureExpiredRecords() {
         final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
         final AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -1590,18 +1588,13 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends
         context.setSystemTimeMs(time.milliseconds());
         bytesStore.init((StateStoreContext) context, bytesStore);
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
-            // write a record to advance stream time, with a high enough timestamp
-            // that the subsequent record in windows[0] will already be expired.
-            bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
+        // write a record to advance stream time, with a high enough timestamp
+        // that the subsequent record in windows[0] will already be expired.
+        bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
 
-            final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
-            final byte[] value = serializeValue(5);
-            bytesStore.put(key, value);
-
-            final List<String> messages = appender.getMessages();
-            assertThat(messages, hasItem("Skipping record for expired segment."));
-        }
+        final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+        final byte[] value = serializeValue(5);
+        bytesStore.put(key, value);
 
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index 852f6fd3ceb..a20bebd0081 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
@@ -81,7 +80,6 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
 import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
@@ -784,7 +782,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
 
 
     @Test
-    public void shouldLogAndMeasureExpiredRecords() {
+    public void shouldMeasureExpiredRecords() {
         final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
         final AbstractRocksDBSegmentedBytesStore<S> bytesStore = getBytesStore();
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -795,18 +793,13 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
         context.setSystemTimeMs(time.milliseconds());
         bytesStore.init((StateStoreContext) context, bytesStore);
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
-            // write a record to advance stream time, with a high enough timestamp
-            // that the subsequent record in windows[0] will already be expired.
-            bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
+        // write a record to advance stream time, with a high enough timestamp
+        // that the subsequent record in windows[0] will already be expired.
+        bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
 
-            final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
-            final byte[] value = serializeValue(5);
-            bytesStore.put(key, value);
-
-            final List<String> messages = appender.getMessages();
-            assertThat(messages, hasItem("Skipping record for expired segment."));
-        }
+        final Bytes key = serializeKey(new Windowed<>("a", windows[0]));
+        final byte[] value = serializeValue(5);
+        bytesStore.put(key, value);
 
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 56c77ce4fe1..2d75f751344 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -793,7 +793,7 @@ public abstract class AbstractSessionBytesStoreTest {
     }
 
     @Test
-    public void shouldLogAndMeasureExpiredRecords() {
+    public void shouldMeasureExpiredRecords() {
         final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
         final SessionStore<String, Long> sessionStore = buildSessionStore(RETENTION_PERIOD, Serdes.String(), Serdes.Long());
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
@@ -806,18 +806,13 @@ public abstract class AbstractSessionBytesStoreTest {
         context.setSystemTimeMs(time.milliseconds());
         sessionStore.init((StateStoreContext) context, sessionStore);
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
-            // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
-            // Note that rocksdb will only expire segments at a time (where segment interval = 60,000 for this retention period)
-            sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
+        // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
+        // Note that rocksdb will only expire segments at a time (where segment interval = 60,000 for this retention period)
+        sessionStore.put(new Windowed<>("initial record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
 
-            // Try inserting a record with timestamp 0 -- should be dropped
-            sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L);
-            sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
-
-            final List<String> messages = appender.getMessages();
-            assertThat(messages, hasItem("Skipping record for expired segment."));
-        }
+        // Try inserting a record with timestamp 0 -- should be dropped
+        sessionStore.put(new Windowed<>("late record", new SessionWindow(0, 0)), 0L);
+        sessionStore.put(new Windowed<>("another on-time record", new SessionWindow(0, 2 * SEGMENT_INTERVAL)), 0L);
 
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         final String threadId = Thread.currentThread().getName();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index d29c6bf88d4..688efab857d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -984,7 +984,7 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    public void shouldLogAndMeasureExpiredRecords() {
+    public void shouldMeasureExpiredRecords() {
         final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
         final WindowStore<Integer, String> windowStore =
             buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
@@ -998,17 +998,12 @@ public abstract class AbstractWindowBytesStoreTest {
         context.setTime(1L);
         windowStore.init((StateStoreContext) context, windowStore);
 
-        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
-            // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
-            windowStore.put(1, "initial record", 2 * RETENTION_PERIOD);
+        // Advance stream time by inserting record with large enough timestamp that records with timestamp 0 are expired
+        windowStore.put(1, "initial record", 2 * RETENTION_PERIOD);
 
-            // Try inserting a record with timestamp 0 -- should be dropped
-            windowStore.put(1, "late record", 0L);
-            windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1);
-
-            final List<String> messages = appender.getMessages();
-            assertThat(messages, hasItem("Skipping record for expired segment."));
-        }
+        // Try inserting a record with timestamp 0 -- should be dropped
+        windowStore.put(1, "late record", 0L);
+        windowStore.put(1, "another on-time record", RETENTION_PERIOD + 1);
 
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
 
