This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e0ee73e98ba KAFKA-16332 Remove Deprecated builder methods for
Time/Session/Join/SlidingWindows (#17126)
e0ee73e98ba is described below
commit e0ee73e98ba21b4740f4ca7066c460c7649db655
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Sep 12 05:12:56 2024 +0800
KAFKA-16332 Remove Deprecated builder methods for
Time/Session/Join/SlidingWindows (#17126)
Removed deprecated methods:
- TimeWindows#of
- TimeWindows#grace
- SessionWindows#with
- SessionWindows#grace
- SlidingWindows#withTimeDifferenceAndGrace
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/kstream/SessionWindows.java | 52 +-------------------
.../kafka/streams/kstream/SlidingWindows.java | 22 ---------
.../kafka/streams/kstream/TimeWindowedKStream.java | 2 +-
.../apache/kafka/streams/kstream/TimeWindows.java | 56 ++--------------------
.../org/apache/kafka/streams/TopologyTest.java | 37 +++++++++-----
.../integration/AbstractResetIntegrationTest.java | 3 +-
.../integration/InternalTopicIntegrationTest.java | 6 +--
.../KStreamAggregationDedupIntegrationTest.java | 6 +--
.../KStreamAggregationIntegrationTest.java | 25 ++++------
.../integration/MetricsIntegrationTest.java | 5 +-
.../integration/QueryableStateIntegrationTest.java | 5 +-
.../integration/RocksDBMetricsIntegrationTest.java | 2 +-
.../kafka/streams/kstream/SessionWindowsTest.java | 19 --------
.../kafka/streams/kstream/SlidingWindowsTest.java | 2 -
.../kafka/streams/kstream/TimeWindowsTest.java | 20 --------
.../SessionWindowedCogroupedKStreamImplTest.java | 10 ++--
.../SlidingWindowedCogroupedKStreamImplTest.java | 7 ++-
.../kstream/internals/SuppressScenarioTest.java | 10 ++--
.../kstream/internals/SuppressTopologyTest.java | 5 +-
.../streams/kstream/internals/TimeWindowTest.java | 3 +-
.../TimeWindowedCogroupedKStreamImplTest.java | 9 ++--
.../kstream/internals/graph/StreamsGraphTest.java | 10 ++--
.../internals/StreamsPartitionAssignorTest.java | 4 +-
23 files changed, 76 insertions(+), 244 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 2feea6bc5bb..0bf5dcfebd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -24,7 +24,6 @@ import java.util.Objects;
import static java.time.Duration.ofMillis;
import static
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
/**
@@ -76,13 +75,9 @@ public final class SessionWindows {
private final long graceMs;
- // flag to check if the grace is already set via ofInactivityGapAndGrace
or ofInactivityGapWithNoGrace
- private final boolean hasSetGrace;
-
- private SessionWindows(final long gapMs, final long graceMs, final boolean
hasSetGrace) {
+ private SessionWindows(final long gapMs, final long graceMs) {
this.gapMs = gapMs;
this.graceMs = graceMs;
- this.hasSetGrace = hasSetGrace;
if (gapMs <= 0) {
throw new IllegalArgumentException("Gap time cannot be zero or
negative.");
@@ -136,50 +131,7 @@ public final class SessionWindows {
final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
- return new SessionWindows(inactivityGapMs, afterWindowEndMs, true);
- }
-
- /**
- * Create a new window specification with the specified inactivity gap.
- *
- * @param inactivityGap the gap of inactivity between sessions
- * @return a new window specification without specifying a grace period
(default to 24 hours minus {@code inactivityGap})
- * @throws IllegalArgumentException if {@code inactivityGap} is zero or
negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0. Use {@link
#ofInactivityGapWithNoGrace(Duration)} instead
- */
- @Deprecated
- public static SessionWindows with(final Duration inactivityGap) {
- final String msgPrefix =
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
- final long inactivityGapMs =
validateMillisecondDuration(inactivityGap, msgPrefix);
-
- return new SessionWindows(inactivityGapMs,
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0), false);
- }
-
- /**
- * Reject out-of-order events that arrive more than {@code afterWindowEnd}
- * after the end of its window.
- * <p>
- * Note that new events may change the boundaries of session windows, so
aggressive
- * close times can lead to surprising results in which an out-of-order
event is rejected and then
- * a subsequent event moves the window boundary forward.
- *
- * @param afterWindowEnd The grace period to admit out-of-order events to
a window.
- * @return this updated builder
- * @throws IllegalArgumentException if the {@code afterWindowEnd} is
negative or can't be represented as {@code long milliseconds}
- * @throws IllegalStateException if {@link #grace(Duration)} is called
after {@link #ofInactivityGapAndGrace(Duration, Duration)} or {@link
#ofInactivityGapWithNoGrace(Duration)}
- * @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration,
Duration)} instead
- */
- @Deprecated
- public SessionWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
- if (this.hasSetGrace) {
- throw new IllegalStateException(
- "Cannot call grace() after setting grace value via
ofInactivityGapAndGrace or ofInactivityGapWithNoGrace.");
- }
-
- final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-
- return new SessionWindows(gapMs, afterWindowEndMs, false);
+ return new SessionWindows(inactivityGapMs, afterWindowEndMs);
}
public long gracePeriodMs() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 159fea71937..4161dd009e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -127,28 +127,6 @@ public final class SlidingWindows {
return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
}
- /**
- * Return a window definition with the window size based on the given
maximum time difference (inclusive) between
- * records in the same window and given window grace period. Reject
out-of-order events that arrive after {@code grace}.
- * A window is closed when {@code stream-time > window-end + grace-period}.
- *
- * @param timeDifference the max time difference (inclusive) between two
records in a window
- * @param grace the grace period to admit out-of-order events to a window
- * @return a new window definition
- * @throws IllegalArgumentException if the specified window size is < 0
or grace < 0, or either can't be represented as {@code long milliseconds}
- * @deprecated since 3.0. Use {@link
#ofTimeDifferenceWithNoGrace(Duration)} or {@link
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
- */
- @Deprecated
- public static SlidingWindows withTimeDifferenceAndGrace(final Duration
timeDifference, final Duration grace) throws IllegalArgumentException {
- final String msgPrefixSize =
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
- final long timeDifferenceMs =
validateMillisecondDuration(timeDifference, msgPrefixSize);
-
- final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace,
"grace");
- final long graceMs = validateMillisecondDuration(grace,
msgPrefixGrace);
-
- return new SlidingWindows(timeDifferenceMs, graceMs);
- }
-
public long timeDifferenceMs() {
return timeDifferenceMs;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 0a33c21bcc0..90ee1e911cd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -41,7 +41,7 @@ import java.time.Duration;
* materialized view) that can be queried using the name provided in the
{@link Materialized} instance.
* Furthermore, updates to the store are sent downstream into a windowed
{@link KTable} changelog stream, where
* "windowed" implies that the {@link KTable} key is a combined key of the
original record key and a window ID.
- * New events are added to {@link TimeWindows} until their grace period ends
(see {@link TimeWindows#grace(Duration)}).
+ * New events are added to {@link TimeWindows} until their grace period ends
(see {@link TimeWindows#ofSizeAndGrace(Duration, Duration)}).
* <p>
* A {@code TimeWindowedKStream} must be obtained from a {@link
KGroupedStream} via
* {@link KGroupedStream#windowedBy(Windows)}.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index adae2ae11b3..3653eeea8d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -69,14 +69,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
private final long graceMs;
- // flag to check if the grace is already set via ofSizeAndGrace or
ofSizeWithNoGrace
- private final boolean hasSetGrace;
-
- private TimeWindows(final long sizeMs, final long advanceMs, final long
graceMs, final boolean hasSetGrace) {
+ private TimeWindows(final long sizeMs, final long advanceMs, final long
graceMs) {
this.sizeMs = sizeMs;
this.advanceMs = advanceMs;
this.graceMs = graceMs;
- this.hasSetGrace = hasSetGrace;
if (sizeMs <= 0) {
throw new IllegalArgumentException("Window size (sizeMs) must be
larger than zero.");
@@ -136,28 +132,7 @@ public final class TimeWindows extends Windows<TimeWindow>
{
final String afterWindowEndMsgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
- return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs, true);
- }
-
- /**
- * Return a window definition with the given window size, and with the
advance interval being equal to the window
- * size.
- * The time interval represented by the N-th window is: {@code [N * size,
N * size + size)}.
- * <p>
- * This provides the semantics of tumbling windows, which are fixed-sized,
gap-less, non-overlapping windows.
- * Tumbling windows are a special case of hopping windows with {@code
advance == size}.
- *
- * @param size The size of the window
- * @return a new window definition without specifying the grace period
(default to 24 hours minus window {@code size})
- * @throws IllegalArgumentException if the specified window size is zero
or negative or can't be represented as {@code long milliseconds}
- * @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} }
instead
- */
- @Deprecated
- public static TimeWindows of(final Duration size) throws
IllegalArgumentException {
- final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
- final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-
- return new TimeWindows(sizeMs, sizeMs,
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - sizeMs, 0), false);
+ return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
}
/**
@@ -174,7 +149,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
public TimeWindows advanceBy(final Duration advance) {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance,
"advance");
final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
- return new TimeWindows(sizeMs, advanceMs, graceMs, false);
+ return new TimeWindows(sizeMs, advanceMs, graceMs);
}
@Override
@@ -194,31 +169,6 @@ public final class TimeWindows extends Windows<TimeWindow>
{
return sizeMs;
}
- /**
- * Reject out-of-order events that arrive more than {@code
millisAfterWindowEnd}
- * after the end of its window.
- * <p>
- * Delay is defined as (stream_time - record_timestamp).
- *
- * @param afterWindowEnd The grace period to admit out-of-order events to
a window.
- * @return this updated builder
- * @throws IllegalArgumentException if {@code afterWindowEnd} is negative
or can't be represented as {@code long milliseconds}
- * @throws IllegalStateException if {@link #grace(Duration)} is called
after {@link #ofSizeAndGrace(Duration, Duration)} or {@link
#ofSizeWithNoGrace(Duration)}
- * @deprecated since 3.0. Use {@link #ofSizeAndGrace(Duration, Duration)}
instead
- */
- @Deprecated
- public TimeWindows grace(final Duration afterWindowEnd) throws
IllegalArgumentException {
- if (this.hasSetGrace) {
- throw new IllegalStateException(
- "Cannot call grace() after setting grace value via
ofSizeAndGrace or ofSizeWithNoGrace.");
- }
-
- final String msgPrefix =
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
- final long afterWindowEndMs =
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-
- return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, false);
- }
-
@Override
public long gracePeriodMs() {
return graceMs;
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 2f41bb724e6..7d886ffa233 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -80,7 +80,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@SuppressWarnings("deprecation")
@Timeout(600)
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -726,6 +725,7 @@ public class TopologyTest {
assertThat(topology.describe().hashCode(),
equalTo(expectedDescription.hashCode()));
}
+ @SuppressWarnings("deprecation")
@Test
public void streamStreamJoinTopologyWithDefaultStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -768,6 +768,7 @@ public class TopologyTest {
describe.toString());
}
+ @SuppressWarnings("deprecation")
@Test
public void streamStreamJoinTopologyWithCustomStoresNames() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -811,6 +812,7 @@ public class TopologyTest {
describe.toString());
}
+ @SuppressWarnings("deprecation")
@Test
public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -1267,6 +1269,7 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
kGroupedStreamZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
// override the default store into in-memory
@@ -1297,7 +1300,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count();
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1321,7 +1324,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count(Materialized.<Object, Long, WindowStore<Bytes,
byte[]>>as("count-store").withStoreType(Materialized.StoreType.IN_MEMORY));
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1345,7 +1348,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count(Materialized.<Object, Long, WindowStore<Bytes,
byte[]>>with(null, Serdes.Long())
.withStoreType(Materialized.StoreType.ROCKS_DB));
final Topology topology = builder.build();
@@ -1370,7 +1373,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count(Materialized.as(Materialized.StoreType.IN_MEMORY));
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1389,13 +1392,14 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
timeWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
// override the default store into in-memory
final StreamsBuilder builder = new
StreamsBuilder(overrideDefaultStore(StreamsConfig.IN_MEMORY));
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count();
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1419,7 +1423,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count();
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1443,7 +1447,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(1)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
.count(Materialized.<Object, Long, WindowStore<Bytes,
byte[]>>as("count-store").withStoreType(Materialized.StoreType.IN_MEMORY));
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1462,6 +1466,7 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
slidingWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
// override the default store into in-memory
@@ -1546,6 +1551,7 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
timeWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
{
// override the default store into in-memory
@@ -1635,6 +1641,7 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
slidingWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
{
// override the default store into in-memory
@@ -1723,6 +1730,7 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
sessionWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
{
// override the default store into in-memory
@@ -1757,7 +1765,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(SessionWindows.with(ofMillis(1)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.count();
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1781,7 +1789,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(SessionWindows.with(ofMillis(1)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.count(Materialized.<Object, Long, SessionStore<Bytes,
byte[]>>as("count-store")
.withStoreType(Materialized.StoreType.IN_MEMORY));
final Topology topology = builder.build();
@@ -1806,7 +1814,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(SessionWindows.with(ofMillis(1)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.count(Materialized.<Object, Long, SessionStore<Bytes,
byte[]>>with(null, Serdes.Long())
.withStoreType(Materialized.StoreType.ROCKS_DB));
final Topology topology = builder.build();
@@ -1831,7 +1839,7 @@ public class TopologyTest {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.groupByKey()
- .windowedBy(SessionWindows.with(ofMillis(1)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.count(Materialized.as(Materialized.StoreType.IN_MEMORY));
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1850,13 +1858,14 @@ public class TopologyTest {
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
sessionWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
// override the default store into in-memory
final StreamsBuilder builder = new
StreamsBuilder(overrideDefaultStore(StreamsConfig.IN_MEMORY));
builder.stream("input-topic")
.groupByKey()
- .windowedBy(SessionWindows.with(ofMillis(1)))
+ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.count();
final Topology topology = builder.build();
final TopologyDescription describe = topology.describe();
@@ -1960,6 +1969,7 @@ public class TopologyTest {
assertThat(processorTopology.stateStores().get(1).persistent(),
is(false));
}
+ @SuppressWarnings("deprecation")
@Test
public void
tableNamedMaterializedCountWithTopologyConfigShouldPreserveTopologyStructure() {
// override the default store into in-memory
@@ -2405,6 +2415,7 @@ public class TopologyTest {
assertThat(stateStoreFactory.loggingEnabled(), equalTo(false));
}
+ @SuppressWarnings("deprecation")
private TopologyConfig overrideDefaultStore(final String defaultStore) {
final Properties topologyOverrides = new Properties();
// change default store as in-memory
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 65c2daf5446..b92e2ad135a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -327,7 +327,6 @@ public abstract class AbstractResetIntegrationTest {
}
}
- @SuppressWarnings("deprecation")
private Topology setupTopologyWithIntermediateTopic(final boolean
useRepartitioned,
final String
outputTopic2) {
final StreamsBuilder builder = new StreamsBuilder();
@@ -349,7 +348,7 @@ public abstract class AbstractResetIntegrationTest {
stream = builder.stream(INTERMEDIATE_USER_TOPIC);
}
stream.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
+
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(35)).advanceBy(ofMillis(10)))
.count()
.toStream()
.map((key, value) -> new KeyValue<>(key.window().start() +
key.window().end(), value))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 7a3ac7365dd..a8764fca957 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -61,7 +61,6 @@ import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
@@ -71,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests related to internal topics in streams
*/
-@SuppressWarnings("deprecation")
@Timeout(600)
@Tag("integration")
public class InternalTopicIntegrationTest {
@@ -166,7 +164,7 @@ public class InternalTopicIntegrationTest {
(k, v) -> k,
Grouped.with("GroupName", Serdes.String(), Serdes.String())
)
- .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.aggregate(
() -> "",
(k, v, a) -> a + k)
@@ -233,7 +231,7 @@ public class InternalTopicIntegrationTest {
textLines.flatMapValues(value ->
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy(MockMapper.selectValueMapper())
- .windowedBy(TimeWindows.of(ofSeconds(1L)).grace(ofMillis(0L)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofSeconds(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("CountWindows").withRetention(ofSeconds(2L)));
try (final KafkaStreams streams = new KafkaStreams(builder.build(),
streamsProp)) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index e92ea85dde9..bb158837b8b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.List;
import java.util.Properties;
import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofMinutes;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
/**
@@ -66,7 +67,6 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
*/
@Timeout(600)
@Tag("integration")
-@SuppressWarnings("deprecation")
public class KStreamAggregationDedupIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final long COMMIT_INTERVAL_MS = 300L;
@@ -157,7 +157,7 @@ public class KStreamAggregationDedupIntegrationTest {
produceMessages(secondBatchTimestamp);
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L),
ofMinutes(1L)))
.reduce(reducer, Materialized.as("reduce-time-windows"))
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
@@ -193,7 +193,7 @@ public class KStreamAggregationDedupIntegrationTest {
produceMessages(timestamp);
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
.count(Materialized.as("count-windows"))
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start())
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index d4a53f1b222..abe5349e034 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -210,7 +210,6 @@ public class KStreamAggregationIntegrationTest {
return keyComparison;
}
- @SuppressWarnings("deprecation")
@Test
public void shouldReduceWindowed(final TestInfo testInfo) throws Exception
{
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -222,7 +221,7 @@ public class KStreamAggregationIntegrationTest {
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
groupedStream
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L),
ofMinutes(1L)))
.reduce(reducer)
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
@@ -328,7 +327,6 @@ public class KStreamAggregationIntegrationTest {
);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldAggregateWindowed(final TestInfo testInfo) throws
Exception {
final long firstTimestamp = mockTime.milliseconds();
@@ -339,7 +337,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(secondTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
- groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
+ groupedStream.windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L),
ofMinutes(1L)))
.aggregate(
initializer,
aggregator,
@@ -459,7 +457,6 @@ public class KStreamAggregationIntegrationTest {
shouldCountHelper(testInfo);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
final long timestamp = mockTime.milliseconds();
@@ -467,7 +464,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(timestamp);
stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(500L)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
.count()
.toStream((windowedKey, value) -> windowedKey.key() + "@" +
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
@@ -499,7 +496,6 @@ public class KStreamAggregationIntegrationTest {
);
}
- @SuppressWarnings("deprecation")
@Test
public void shouldReduceSlidingWindows(final TestInfo testInfo) throws
Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -512,7 +508,7 @@ public class KStreamAggregationIntegrationTest {
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
groupedStream
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(2000L)))
+
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference),
ofMillis(2000L)))
.reduce(reducer)
.toStream()
.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
@@ -607,7 +603,6 @@ public class KStreamAggregationIntegrationTest {
}
}
- @SuppressWarnings("deprecation")
@Test
public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws
Exception {
final long firstBatchTimestamp = mockTime.milliseconds();
@@ -619,8 +614,7 @@ public class KStreamAggregationIntegrationTest {
produceMessages(thirdBatchTimestamp);
final Serde<Windowed<String>> windowedSerde =
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
- //noinspection deprecation
-
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
ofMinutes(5)))
+
groupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L),
ofMinutes(5)))
.aggregate(
initializer,
aggregator,
@@ -720,7 +714,6 @@ public class KStreamAggregationIntegrationTest {
}
}
- @SuppressWarnings("deprecation")
@Test
public void shouldCountSessionWindows() throws Exception {
final long sessionGap = 5 * 60 * 1000L;
@@ -800,10 +793,9 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<Long, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
- //noinspection deprecation
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
- .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
.count()
.toStream()
.process(() -> (Processor<Windowed<String>, Long, Object, Object>)
record -> {
@@ -823,7 +815,6 @@ public class KStreamAggregationIntegrationTest {
assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3,
t3))), equalTo(KeyValue.pair(1L, t3)));
}
- @SuppressWarnings("deprecation")
@Test
public void shouldReduceSessionWindows() throws Exception {
final long sessionGap = 1000L; // something to do with time
@@ -903,9 +894,9 @@ public class KStreamAggregationIntegrationTest {
final Map<Windowed<String>, KeyValue<String, Long>> results = new
HashMap<>();
final CountDownLatch latch = new CountDownLatch(13);
final String userSessionsStore = "UserSessionsStore";
- //noinspection deprecation
+
builder.stream(userSessionsStream, Consumed.with(Serdes.String(),
Serdes.String()))
- .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2)
-> value1 + ":" + value2, Materialized.as(userSessionsStore))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap),
ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2,
Materialized.as(userSessionsStore))
.toStream()
.process(() -> (Processor<Windowed<String>, String, Object,
Object>) record -> {
results.put(record.key(), KeyValue.pair(record.value(),
record.timestamp()));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d13de4b402b..db9977901d9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -68,7 +68,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
@Timeout(600)
@Tag("integration")
-@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
@@ -375,7 +374,7 @@ public class MetricsIntegrationTest {
final Duration windowSize = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.groupByKey()
- .windowedBy(TimeWindows.of(windowSize).grace(Duration.ZERO))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
.aggregate(() -> 0L,
(aggKey, newValue, aggValue) -> aggValue,
Materialized.<Integer, Long, WindowStore<Bytes,
byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
@@ -403,7 +402,7 @@ public class MetricsIntegrationTest {
final Duration inactivityGap = Duration.ofMillis(50);
builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
.groupByKey()
-
.windowedBy(SessionWindows.with(inactivityGap).grace(Duration.ZERO))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(inactivityGap))
.aggregate(() -> 0L,
(aggKey, newValue, aggValue) -> aggValue,
(aggKey, leftAggValue, rightAggValue) -> leftAggValue,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 2dcf1ba54df..f16c3e57c39 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -120,7 +120,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
@Timeout(600)
@Tag("integration")
-@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
private static final Logger log =
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
@@ -271,7 +270,7 @@ public class QueryableStateIntegrationTest {
// Create a Windowed State Store that contains the word count for
every 1 minute
groupedByWord
- .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
.count(Materialized.as(windowStoreName + "-" + inputTopic))
.toStream((key, value) -> key.key())
.to(windowOutputTopic, Produced.with(Serdes.String(),
Serdes.Long()));
@@ -1000,7 +999,7 @@ public class QueryableStateIntegrationTest {
final String windowStoreName = "windowed-count";
s1.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
.count(Materialized.as(windowStoreName));
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
startApplicationAndWaitUntilRunning(kafkaStreams);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 17fd60d568f..2fc1a9a644f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -189,7 +189,7 @@ public class RocksDBMetricsIntegrationTest {
).toStream().to(STREAM_OUTPUT_ONE);
builder.stream(STREAM_INPUT_TWO, Consumed.with(Serdes.Integer(),
Serdes.String()))
.groupByKey()
- .windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
.aggregate(() -> 0L,
(aggKey, newValue, aggValue) -> aggValue,
Materialized.<Integer, Long, WindowStore<Bytes,
byte[]>>as("time-windowed-aggregated-stream-store")
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 7868dee5512..755ae96f1dc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -18,12 +18,9 @@ package org.apache.kafka.streams.kstream;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
-
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
@@ -68,15 +65,6 @@ public class SessionWindowsTest {
assertEquals(ANY_GRACE,
SessionWindows.ofInactivityGapAndGrace(ofMillis(ANY_OTHER_SIZE),
ofMillis(ANY_GRACE)).gracePeriodMs());
}
- @SuppressWarnings("deprecation")
- @Test
- public void oldAPIShouldSetDefaultGracePeriod() {
- assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
- assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L,
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
- assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
- assertEquals(0L,
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
- }
-
@Test
public void windowSizeMustNotBeNegative() {
assertThrows(IllegalArgumentException.class, () ->
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(-1)));
@@ -87,13 +75,6 @@ public class SessionWindowsTest {
assertThrows(IllegalArgumentException.class, () ->
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0)));
}
- @SuppressWarnings("deprecation")
- @Test
- public void graceShouldNotCalledAfterGraceSet() {
- assertThrows(IllegalStateException.class, () ->
SessionWindows.ofInactivityGapAndGrace(ofMillis(10),
ofMillis(10)).grace(ofMillis(10)));
- assertThrows(IllegalStateException.class, () ->
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)).grace(ofMillis(10)));
- }
-
@Test
public void equalsAndHashcodeShouldBeValidForPositiveCases() {
verifyEquality(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
index accd51cbd23..b0d8e2ed524 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -29,10 +29,8 @@ public class SlidingWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_GRACE = 1024L;
- @SuppressWarnings("deprecation")
@Test
public void shouldSetTimeDifference() {
- assertEquals(ANY_SIZE,
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE),
ofMillis(3)).timeDifferenceMs());
assertEquals(ANY_SIZE,
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE),
ofMillis(ANY_GRACE)).timeDifferenceMs());
assertEquals(ANY_SIZE,
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs());
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 9ad54dbf90e..5f3b2a5d39f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -20,13 +20,11 @@ import
org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
import java.util.Map;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -37,10 +35,8 @@ public class TimeWindowsTest {
private static final long ANY_SIZE = 123L;
private static final long ANY_GRACE = 1024L;
- @SuppressWarnings("deprecation")
@Test
public void shouldSetWindowSize() {
- assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);
assertEquals(ANY_SIZE,
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs);
assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE),
ofMillis(ANY_GRACE)).sizeMs);
}
@@ -61,13 +57,6 @@ public class TimeWindowsTest {
assertThrows(IllegalArgumentException.class, () ->
TimeWindows.ofSizeWithNoGrace(ofMillis(-1)));
}
- @SuppressWarnings("deprecation")
- @Test
- public void graceShouldNotCalledAfterGraceSet() {
- assertThrows(IllegalStateException.class, () ->
TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(10)).grace(ofMillis(10)));
- assertThrows(IllegalStateException.class, () ->
TimeWindows.ofSizeWithNoGrace(ofMillis(10)).grace(ofMillis(10)));
- }
-
@Test
public void advanceIntervalMustNotBeZero() {
final TimeWindows windowSpec =
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE));
@@ -113,15 +102,6 @@ public class TimeWindowsTest {
}
}
- @SuppressWarnings("deprecation")
- @Test
- public void oldAPIShouldSetDefaultGracePeriod() {
- assertEquals(Duration.ofDays(1).toMillis(),
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
- assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L,
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
- assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
- assertEquals(0L,
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD +
1L)).gracePeriodMs());
- }
-
@Test
public void shouldComputeWindowsForHoppingWindows() {
final TimeWindows windows =
TimeWindows.ofSizeWithNoGrace(ofMillis(12L)).advanceBy(ofMillis(5L));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index a3709dea91f..d3ad500ce2e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -47,12 +47,12 @@ import org.junit.jupiter.api.Test;
import java.util.Properties;
+import static java.time.Duration.ofDays;
import static java.time.Duration.ofMillis;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
-@SuppressWarnings("deprecation")
public class SessionWindowedCogroupedKStreamImplTest {
private final StreamsBuilder builder = new StreamsBuilder();
@@ -80,7 +80,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
- windowedCogroupedStream =
cogroupedStream.windowedBy(SessionWindows.with(ofMillis(100)));
+ windowedCogroupedStream =
cogroupedStream.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(100),
ofDays(1)));
}
@Test
@@ -147,7 +147,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(SessionWindows.with(ofMillis(1)))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
.aggregate(MockInitializer.STRING_INIT, sessionMerger,
Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo(
@@ -166,7 +166,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
@Test
public void sessionWindowAggregateTest() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(SessionWindows.with(ofMillis(500)))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
.aggregate(MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -190,7 +190,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
@Test
public void sessionWindowAggregate2Test() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(SessionWindows.with(ofMillis(500)))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
.aggregate(MockInitializer.STRING_INIT, sessionMerger,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 6ccc40f2df4..b0fdf879c86 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -56,7 +56,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-@SuppressWarnings("deprecation")
public class SlidingWindowedCogroupedKStreamImplTest {
private static final String TOPIC = "topic";
@@ -82,7 +81,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
final KGroupedStream<String, String> groupedStream2 =
stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
final CogroupedKStream<String, String> cogroupedStream =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
- windowedCogroupedStream =
cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
+ windowedCogroupedStream =
cogroupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(
WINDOW_SIZE_MS), ofMillis(2000L)));
}
@@ -133,7 +132,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L)))
+
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo(
@@ -210,7 +209,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
public void slidingWindowAggregateOverlappingWindowsTest() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L))).aggregate(
+
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS),
ofMillis(2000L))).aggregate(
MockInitializer.STRING_INIT,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index d99bc5671ce..4c37fa2f5cf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -364,7 +364,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
- .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(2L), ofMillis(1L)))
.count(Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -415,7 +415,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
- .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(2L)))
+ .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(2L), ofMillis(2L)))
.count(Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -471,7 +471,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
-
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5L),
ofMillis(15L)))
+
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(5L),
ofMillis(15L)))
.count(Materialized.<String, Long, WindowStore<Bytes,
byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -561,7 +561,7 @@ public class SuppressScenarioTest {
final KTable<Windowed<String>, Long> valueCounts = builder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
- .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
+
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(5L)))
.count(Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("counts").withCachingDisabled());
valueCounts
.suppress(untilWindowCloses(unbounded()))
@@ -817,7 +817,7 @@ public class SuppressScenarioTest {
final KGroupedStream<String, String> stream1 = builder.stream("one",
Consumed.with(Serdes.String(),
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
final KGroupedStream<String, String> stream2 = builder.stream("two",
Consumed.with(Serdes.String(),
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
final KStream<Windowed<String>, Object> cogrouped =
stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2,
(key, value, aggregate) -> aggregate + value)
- .windowedBy(TimeWindows.of(Duration.ofMinutes(15)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)))
.aggregate(() -> "", Named.as("test"), Materialized.as("store"))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
index 3b45b9b8f35..2254f8c10af 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
@@ -45,7 +45,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
-@SuppressWarnings("deprecation")
public class SuppressTopologyTest {
private static final Serde<String> STRING_SERDE = Serdes.String();
@@ -154,7 +153,7 @@ public class SuppressTopologyTest {
anonymousNodeBuilder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
- .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+ .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(5L),
ofMillis(5L)))
.count(Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("counts").withCachingDisabled())
.suppress(untilWindowCloses(unbounded()))
.toStream()
@@ -172,7 +171,7 @@ public class SuppressTopologyTest {
namedNodeBuilder
.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
.groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE,
STRING_SERDE))
- .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+ .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(5L),
ofMillis(5L)))
.count(Materialized.<String, Long, SessionStore<Bytes,
byte[]>>as("counts").withCachingDisabled())
.suppress(untilWindowCloses(unbounded()).withName("myname"))
.toStream()
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index f43d56b53e3..17420e0e3c9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -125,10 +125,9 @@ public class TimeWindowTest {
assertThrows(IllegalArgumentException.class, () ->
window.overlap(sessionWindow));
}
- @SuppressWarnings("deprecation")
@Test
public void shouldReturnMatchedWindowsOrderedByTimestamp() {
- final TimeWindows windows =
TimeWindows.of(ofMillis(12L)).advanceBy(ofMillis(5L));
+ final TimeWindows windows =
TimeWindows.ofSizeWithNoGrace(ofMillis(12L)).advanceBy(ofMillis(5L));
final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
final Long[] expected = matched.keySet().toArray(new Long[0]);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
index a1a46ca5130..de157ce3fdc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
@@ -53,7 +53,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
-@SuppressWarnings("deprecation")
public class TimeWindowedCogroupedKStreamImplTest {
private static final Long WINDOW_SIZE = 500L;
@@ -81,7 +80,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
.cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
- windowedCogroupedStream =
cogroupedStream.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)));
+ windowedCogroupedStream =
cogroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)));
}
@Test
@@ -132,7 +131,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
.with(Serdes.String(), Serdes.String()));
groupedStream = stream.groupByKey(Grouped.with(Serdes.String(),
Serdes.String()));
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
.aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
assertThat(builder.build().describe().toString(), equalTo(
@@ -222,7 +221,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
public void timeWindowAggregateManyWindowsTest() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
- .windowedBy(TimeWindows.of(ofMillis(500L))).aggregate(
+
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))).aggregate(
MockInitializer.STRING_INIT,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
@@ -248,7 +247,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
public void timeWindowAggregateOverlappingWindowsTest() {
final KTable<Windowed<String>, String> customers =
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-
.windowedBy(TimeWindows.of(ofMillis(500L)).advanceBy(ofMillis(200L))).aggregate(
+
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)).advanceBy(ofMillis(200L))).aggregate(
MockInitializer.STRING_INIT,
Materialized.with(Serdes.String(), Serdes.String()));
customers.toStream().to(OUTPUT);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 5a4a16fa94f..79ea95118f4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
import static java.time.Duration.ofMillis;
import static org.junit.jupiter.api.Assertions.assertEquals;
-@SuppressWarnings("deprecation")
public class StreamsGraphTest {
private final Pattern repartitionTopicPattern = Pattern.compile("Sink:
.*-repartition");
@@ -64,6 +63,7 @@ public class StreamsGraphTest {
// Test builds topology in successive manner but only graph node not yet
processed written to topology
+ @SuppressWarnings("deprecation")
@Test
public void shouldBeAbleToBuildTopologyIncrementally() {
final StreamsBuilder builder = new StreamsBuilder();
@@ -107,7 +107,7 @@ public class StreamsGraphTest {
// second repartition
changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
- .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
.count(Materialized.as("windowed-count-store"))
.toStream()
.map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count",
Produced.with(Serdes.String(), Serdes.Long()));
@@ -250,7 +250,7 @@ public class StreamsGraphTest {
final KStream<String, String> mappedKeyStream =
inputStream.selectKey((k, v) -> k + v);
mappedKeyStream.mapValues(v ->
v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
- mappedKeyStream.flatMapValues(v ->
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.of(ofMillis(5000))).count().toStream().to("windowed-output");
+ mappedKeyStream.flatMapValues(v ->
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
return builder.build(properties);
@@ -267,7 +267,7 @@ public class StreamsGraphTest {
final KStream<String, String> mappedKeyStream =
inputStream.selectKey((k, v) -> k + v).through("through-topic");
mappedKeyStream.groupByKey().count().toStream().to("output");
-
mappedKeyStream.groupByKey().windowedBy(TimeWindows.of(ofMillis(5000))).count().toStream().to("windowed-output");
+
mappedKeyStream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
return builder.build(properties);
@@ -290,7 +290,7 @@ public class StreamsGraphTest {
inputStream
.repartition()
.groupByKey()
- .windowedBy(TimeWindows.of(ofMillis(5000)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000)))
.count()
.toStream()
.to("windowed-output");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9460bbe83ac..4702681a650 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -176,7 +176,6 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@Timeout(30_000)
-@SuppressWarnings("deprecation")
public class StreamsPartitionAssignorTest {
private static final String CONSUMER_1 = "consumer1";
@@ -1837,6 +1836,7 @@ public class StreamsPartitionAssignorTest {
assertThrows(ConfigException.class, () ->
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
"localhost:j87yhk"), parameterizedConfig));
}
+ @SuppressWarnings("deprecation")
@ParameterizedTest
@MethodSource("parameter")
public void
shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks(final
Map<String, Object> parameterizedConfig) {
@@ -2710,7 +2710,7 @@ public class StreamsPartitionAssignorTest {
(k, v) -> k,
Grouped.with("GroupName", Serdes.String(), Serdes.String())
)
- .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
.aggregate(
() -> "",
(k, v, a) -> a + k)