This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e0ee73e98ba KAFKA-16332 Remove Deprecated builder methods for 
Time/Session/Join/SlidingWindows (#17126)
e0ee73e98ba is described below

commit e0ee73e98ba21b4740f4ca7066c460c7649db655
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Sep 12 05:12:56 2024 +0800

    KAFKA-16332 Remove Deprecated builder methods for 
Time/Session/Join/SlidingWindows (#17126)
    
    Removed deprecated methods:
     - TimeWindows#of
     - TimeWindows#grace
     - SessionWindows#with
     - SessionWindows#grace
     - SlidingWindows#withTimeDifferenceAndGrace
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/kstream/SessionWindows.java      | 52 +-------------------
 .../kafka/streams/kstream/SlidingWindows.java      | 22 ---------
 .../kafka/streams/kstream/TimeWindowedKStream.java |  2 +-
 .../apache/kafka/streams/kstream/TimeWindows.java  | 56 ++--------------------
 .../org/apache/kafka/streams/TopologyTest.java     | 37 +++++++++-----
 .../integration/AbstractResetIntegrationTest.java  |  3 +-
 .../integration/InternalTopicIntegrationTest.java  |  6 +--
 .../KStreamAggregationDedupIntegrationTest.java    |  6 +--
 .../KStreamAggregationIntegrationTest.java         | 25 ++++------
 .../integration/MetricsIntegrationTest.java        |  5 +-
 .../integration/QueryableStateIntegrationTest.java |  5 +-
 .../integration/RocksDBMetricsIntegrationTest.java |  2 +-
 .../kafka/streams/kstream/SessionWindowsTest.java  | 19 --------
 .../kafka/streams/kstream/SlidingWindowsTest.java  |  2 -
 .../kafka/streams/kstream/TimeWindowsTest.java     | 20 --------
 .../SessionWindowedCogroupedKStreamImplTest.java   | 10 ++--
 .../SlidingWindowedCogroupedKStreamImplTest.java   |  7 ++-
 .../kstream/internals/SuppressScenarioTest.java    | 10 ++--
 .../kstream/internals/SuppressTopologyTest.java    |  5 +-
 .../streams/kstream/internals/TimeWindowTest.java  |  3 +-
 .../TimeWindowedCogroupedKStreamImplTest.java      |  9 ++--
 .../kstream/internals/graph/StreamsGraphTest.java  | 10 ++--
 .../internals/StreamsPartitionAssignorTest.java    |  4 +-
 23 files changed, 76 insertions(+), 244 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 2feea6bc5bb..0bf5dcfebd4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import static java.time.Duration.ofMillis;
 import static 
org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 import static 
org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
 
 /**
@@ -76,13 +75,9 @@ public final class SessionWindows {
 
     private final long graceMs;
 
-    // flag to check if the grace is already set via ofInactivityGapAndGrace 
or ofInactivityGapWithNoGrace
-    private final boolean hasSetGrace;
-
-    private SessionWindows(final long gapMs, final long graceMs, final boolean 
hasSetGrace) {
+    private SessionWindows(final long gapMs, final long graceMs) {
         this.gapMs = gapMs;
         this.graceMs = graceMs;
-        this.hasSetGrace = hasSetGrace;
 
         if (gapMs <= 0) {
             throw new IllegalArgumentException("Gap time cannot be zero or 
negative.");
@@ -136,50 +131,7 @@ public final class SessionWindows {
         final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
 
-        return new SessionWindows(inactivityGapMs, afterWindowEndMs, true);
-    }
-
-    /**
-     * Create a new window specification with the specified inactivity gap.
-     *
-     * @param inactivityGap the gap of inactivity between sessions
-     * @return a new window specification without specifying a grace period 
(default to 24 hours minus {@code inactivityGap})
-     * @throws IllegalArgumentException if {@code inactivityGap} is zero or 
negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0. Use {@link 
#ofInactivityGapWithNoGrace(Duration)} instead
-     */
-    @Deprecated
-    public static SessionWindows with(final Duration inactivityGap) {
-        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(inactivityGap, "inactivityGap");
-        final long inactivityGapMs = 
validateMillisecondDuration(inactivityGap, msgPrefix);
-
-        return new SessionWindows(inactivityGapMs, 
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - inactivityGapMs, 0), false);
-    }
-
-    /**
-     * Reject out-of-order events that arrive more than {@code afterWindowEnd}
-     * after the end of its window.
-     * <p>
-     * Note that new events may change the boundaries of session windows, so 
aggressive
-     * close times can lead to surprising results in which an out-of-order 
event is rejected and then
-     * a subsequent event moves the window boundary forward.
-     *
-     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
-     * @return this updated builder
-     * @throws IllegalArgumentException if the {@code afterWindowEnd} is 
negative or can't be represented as {@code long milliseconds}
-     * @throws IllegalStateException if {@link #grace(Duration)} is called 
after {@link #ofInactivityGapAndGrace(Duration, Duration)} or {@link 
#ofInactivityGapWithNoGrace(Duration)}
-     * @deprecated since 3.0. Use {@link #ofInactivityGapAndGrace(Duration, 
Duration)} instead
-     */
-    @Deprecated
-    public SessionWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
-        if (this.hasSetGrace) {
-            throw new IllegalStateException(
-                "Cannot call grace() after setting grace value via 
ofInactivityGapAndGrace or ofInactivityGapWithNoGrace.");
-        }
-
-        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-
-        return new SessionWindows(gapMs, afterWindowEndMs, false);
+        return new SessionWindows(inactivityGapMs, afterWindowEndMs);
     }
 
     public long gracePeriodMs() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
index 159fea71937..4161dd009e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java
@@ -127,28 +127,6 @@ public final class SlidingWindows {
         return new SlidingWindows(timeDifferenceMs, afterWindowEndMs);
     }
 
-    /**
-     * Return a window definition with the window size based on the given 
maximum time difference (inclusive) between
-     * records in the same window and given window grace period. Reject 
out-of-order events that arrive after {@code grace}.
-     * A window is closed when {@code stream-time > window-end + grace-period}.
-     *
-     * @param timeDifference the max time difference (inclusive) between two 
records in a window
-     * @param grace the grace period to admit out-of-order events to a window
-     * @return a new window definition
-     * @throws IllegalArgumentException if the specified window size is &lt; 0 
or grace &lt; 0, or either can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0. Use {@link 
#ofTimeDifferenceWithNoGrace(Duration)} or {@link 
#ofTimeDifferenceAndGrace(Duration, Duration)} instead
-     */
-    @Deprecated
-    public static SlidingWindows withTimeDifferenceAndGrace(final Duration 
timeDifference, final Duration grace) throws IllegalArgumentException {
-        final String msgPrefixSize = 
prepareMillisCheckFailMsgPrefix(timeDifference, "timeDifference");
-        final long timeDifferenceMs = 
validateMillisecondDuration(timeDifference, msgPrefixSize);
-
-        final String msgPrefixGrace = prepareMillisCheckFailMsgPrefix(grace, 
"grace");
-        final long graceMs = validateMillisecondDuration(grace, 
msgPrefixGrace);
-
-        return new SlidingWindows(timeDifferenceMs, graceMs);
-    }
-
     public long timeDifferenceMs() {
         return timeDifferenceMs;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
index 0a33c21bcc0..90ee1e911cd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedKStream.java
@@ -41,7 +41,7 @@ import java.time.Duration;
  * materialized view) that can be queried using the name provided in the 
{@link Materialized} instance.
  * Furthermore, updates to the store are sent downstream into a windowed 
{@link KTable} changelog stream, where
  * "windowed" implies that the {@link KTable} key is a combined key of the 
original record key and a window ID.
- * New events are added to {@link TimeWindows} until their grace period ends 
(see {@link TimeWindows#grace(Duration)}).
+ * New events are added to {@link TimeWindows} until their grace period ends 
(see {@link TimeWindows#ofSizeAndGrace(Duration, Duration)}).
  * <p>
  * A {@code TimeWindowedKStream} must be obtained from a {@link 
KGroupedStream} via
  * {@link KGroupedStream#windowedBy(Windows)}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
index adae2ae11b3..3653eeea8d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java
@@ -69,14 +69,10 @@ public final class TimeWindows extends Windows<TimeWindow> {
 
     private final long graceMs;
 
-    // flag to check if the grace is already set via ofSizeAndGrace or 
ofSizeWithNoGrace
-    private final boolean hasSetGrace;
-
-    private TimeWindows(final long sizeMs, final long advanceMs, final long 
graceMs, final boolean hasSetGrace) {
+    private TimeWindows(final long sizeMs, final long advanceMs, final long 
graceMs) {
         this.sizeMs = sizeMs;
         this.advanceMs = advanceMs;
         this.graceMs = graceMs;
-        this.hasSetGrace = hasSetGrace;
 
         if (sizeMs <= 0) {
             throw new IllegalArgumentException("Window size (sizeMs) must be 
larger than zero.");
@@ -136,28 +132,7 @@ public final class TimeWindows extends Windows<TimeWindow> 
{
         final String afterWindowEndMsgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
         final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, afterWindowEndMsgPrefix);
 
-        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs, true);
-    }
-
-    /**
-     * Return a window definition with the given window size, and with the 
advance interval being equal to the window
-     * size.
-     * The time interval represented by the N-th window is: {@code [N * size, 
N * size + size)}.
-     * <p>
-     * This provides the semantics of tumbling windows, which are fixed-sized, 
gap-less, non-overlapping windows.
-     * Tumbling windows are a special case of hopping windows with {@code 
advance == size}.
-     *
-     * @param size The size of the window
-     * @return a new window definition without specifying the grace period 
(default to 24 hours minus window {@code size})
-     * @throws IllegalArgumentException if the specified window size is zero 
or negative or can't be represented as {@code long milliseconds}
-     * @deprecated since 3.0. Use {@link #ofSizeWithNoGrace(Duration)} } 
instead
-     */
-    @Deprecated
-    public static TimeWindows of(final Duration size) throws 
IllegalArgumentException {
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(size, "size");
-        final long sizeMs = validateMillisecondDuration(size, msgPrefix);
-
-        return new TimeWindows(sizeMs, sizeMs, 
Math.max(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - sizeMs, 0), false);
+        return new TimeWindows(sizeMs, sizeMs, afterWindowEndMs);
     }
 
     /**
@@ -174,7 +149,7 @@ public final class TimeWindows extends Windows<TimeWindow> {
     public TimeWindows advanceBy(final Duration advance) {
         final String msgPrefix = prepareMillisCheckFailMsgPrefix(advance, 
"advance");
         final long advanceMs = validateMillisecondDuration(advance, msgPrefix);
-        return new TimeWindows(sizeMs, advanceMs, graceMs, false);
+        return new TimeWindows(sizeMs, advanceMs, graceMs);
     }
 
     @Override
@@ -194,31 +169,6 @@ public final class TimeWindows extends Windows<TimeWindow> 
{
         return sizeMs;
     }
 
-    /**
-     * Reject out-of-order events that arrive more than {@code 
millisAfterWindowEnd}
-     * after the end of its window.
-     * <p>
-     * Delay is defined as (stream_time - record_timestamp).
-     *
-     * @param afterWindowEnd The grace period to admit out-of-order events to 
a window.
-     * @return this updated builder
-     * @throws IllegalArgumentException if {@code afterWindowEnd} is negative 
or can't be represented as {@code long milliseconds}
-     * @throws IllegalStateException if {@link #grace(Duration)} is called 
after {@link #ofSizeAndGrace(Duration, Duration)} or {@link 
#ofSizeWithNoGrace(Duration)}
-     * @deprecated since 3.0. Use {@link #ofSizeAndGrace(Duration, Duration)} 
instead
-     */
-    @Deprecated
-    public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
-        if (this.hasSetGrace) {
-            throw new IllegalStateException(
-                "Cannot call grace() after setting grace value via 
ofSizeAndGrace or ofSizeWithNoGrace.");
-        }
-
-        final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
-        final long afterWindowEndMs = 
validateMillisecondDuration(afterWindowEnd, msgPrefix);
-
-        return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, false);
-    }
-
     @Override
     public long gracePeriodMs() {
         return graceMs;
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 2f41bb724e6..7d886ffa233 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -80,7 +80,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("deprecation")
 @Timeout(600)
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
@@ -726,6 +725,7 @@ public class TopologyTest {
         assertThat(topology.describe().hashCode(), 
equalTo(expectedDescription.hashCode()));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void streamStreamJoinTopologyWithDefaultStoresNames() {
         final StreamsBuilder builder  = new StreamsBuilder();
@@ -768,6 +768,7 @@ public class TopologyTest {
             describe.toString());
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void streamStreamJoinTopologyWithCustomStoresNames() {
         final StreamsBuilder builder  = new StreamsBuilder();
@@ -811,6 +812,7 @@ public class TopologyTest {
             describe.toString());
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void streamStreamJoinTopologyWithCustomStoresSuppliers() {
         final StreamsBuilder builder  = new StreamsBuilder();
@@ -1267,6 +1269,7 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
kGroupedStreamZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
         // override the default store into in-memory
@@ -1297,7 +1300,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count();
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1321,7 +1324,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count(Materialized.<Object, Long, WindowStore<Bytes, 
byte[]>>as("count-store").withStoreType(Materialized.StoreType.IN_MEMORY));
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1345,7 +1348,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count(Materialized.<Object, Long, WindowStore<Bytes, 
byte[]>>with(null, Serdes.Long())
                 .withStoreType(Materialized.StoreType.ROCKS_DB));
         final Topology topology = builder.build();
@@ -1370,7 +1373,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count(Materialized.as(Materialized.StoreType.IN_MEMORY));
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1389,13 +1392,14 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
timeWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
         // override the default store into in-memory
         final StreamsBuilder builder = new 
StreamsBuilder(overrideDefaultStore(StreamsConfig.IN_MEMORY));
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count();
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1419,7 +1423,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count();
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1443,7 +1447,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(1)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(1)))
             .count(Materialized.<Object, Long, WindowStore<Bytes, 
byte[]>>as("count-store").withStoreType(Materialized.StoreType.IN_MEMORY));
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1462,6 +1466,7 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
slidingWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
         // override the default store into in-memory
@@ -1546,6 +1551,7 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
timeWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
 {
         // override the default store into in-memory
@@ -1635,6 +1641,7 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
slidingWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
 {
         // override the default store into in-memory
@@ -1723,6 +1730,7 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
sessionWindowedCogroupedZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure()
 {
         // override the default store into in-memory
@@ -1757,7 +1765,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(SessionWindows.with(ofMillis(1)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
             .count();
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1781,7 +1789,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(SessionWindows.with(ofMillis(1)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
             .count(Materialized.<Object, Long, SessionStore<Bytes, 
byte[]>>as("count-store")
                 .withStoreType(Materialized.StoreType.IN_MEMORY));
         final Topology topology = builder.build();
@@ -1806,7 +1814,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(SessionWindows.with(ofMillis(1)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
             .count(Materialized.<Object, Long, SessionStore<Bytes, 
byte[]>>with(null, Serdes.Long())
                 .withStoreType(Materialized.StoreType.ROCKS_DB));
         final Topology topology = builder.build();
@@ -1831,7 +1839,7 @@ public class TopologyTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(SessionWindows.with(ofMillis(1)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
             .count(Materialized.as(Materialized.StoreType.IN_MEMORY));
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1850,13 +1858,14 @@ public class TopologyTest {
         
assertThat(topology.internalTopologyBuilder.setApplicationId("test").buildTopology().hasPersistentLocalStore(),
 is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
sessionWindowZeroArgCountWithTopologyConfigShouldPreserveTopologyStructure() {
         // override the default store into in-memory
         final StreamsBuilder builder = new 
StreamsBuilder(overrideDefaultStore(StreamsConfig.IN_MEMORY));
         builder.stream("input-topic")
             .groupByKey()
-            .windowedBy(SessionWindows.with(ofMillis(1)))
+            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
             .count();
         final Topology topology = builder.build();
         final TopologyDescription describe = topology.describe();
@@ -1960,6 +1969,7 @@ public class TopologyTest {
         assertThat(processorTopology.stateStores().get(1).persistent(), 
is(false));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void 
tableNamedMaterializedCountWithTopologyConfigShouldPreserveTopologyStructure() {
         // override the default store into in-memory
@@ -2405,6 +2415,7 @@ public class TopologyTest {
         assertThat(stateStoreFactory.loggingEnabled(), equalTo(false));
     }
 
+    @SuppressWarnings("deprecation")
     private TopologyConfig overrideDefaultStore(final String defaultStore) {
         final Properties topologyOverrides = new Properties();
         // change default store as in-memory
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 65c2daf5446..b92e2ad135a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -327,7 +327,6 @@ public abstract class AbstractResetIntegrationTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     private Topology setupTopologyWithIntermediateTopic(final boolean 
useRepartitioned,
                                                         final String 
outputTopic2) {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -349,7 +348,7 @@ public abstract class AbstractResetIntegrationTest {
             stream = builder.stream(INTERMEDIATE_USER_TOPIC);
         }
         stream.groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10)))
+            
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(35)).advanceBy(ofMillis(10)))
             .count()
             .toStream()
             .map((key, value) -> new KeyValue<>(key.window().start() + 
key.window().end(), value))
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 7a3ac7365dd..a8764fca957 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -61,7 +61,6 @@ import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static java.time.Duration.ofMillis;
 import static java.time.Duration.ofSeconds;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
@@ -71,7 +70,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /**
  * Tests related to internal topics in streams
  */
-@SuppressWarnings("deprecation")
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
@@ -166,7 +164,7 @@ public class InternalTopicIntegrationTest {
                 (k, v) -> k,
                 Grouped.with("GroupName", Serdes.String(), Serdes.String())
             )
-            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
             .aggregate(
                 () -> "",
                 (k, v, a) -> a + k)
@@ -233,7 +231,7 @@ public class InternalTopicIntegrationTest {
 
         textLines.flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
             .groupBy(MockMapper.selectValueMapper())
-            .windowedBy(TimeWindows.of(ofSeconds(1L)).grace(ofMillis(0L)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofSeconds(1L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("CountWindows").withRetention(ofSeconds(2L)));
 
         try (final KafkaStreams streams = new KafkaStreams(builder.build(), 
streamsProp)) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index e92ea85dde9..bb158837b8b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Properties;
 
 import static java.time.Duration.ofMillis;
+import static java.time.Duration.ofMinutes;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
 /**
@@ -66,7 +67,6 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
  */
 @Timeout(600)
 @Tag("integration")
-@SuppressWarnings("deprecation")
 public class KStreamAggregationDedupIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final long COMMIT_INTERVAL_MS = 300L;
@@ -157,7 +157,7 @@ public class KStreamAggregationDedupIntegrationTest {
         produceMessages(secondBatchTimestamp);
 
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), 
ofMinutes(1L)))
             .reduce(reducer, Materialized.as("reduce-time-windows"))
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
@@ -193,7 +193,7 @@ public class KStreamAggregationDedupIntegrationTest {
         produceMessages(timestamp);
 
         stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
             .count(Materialized.as("count-windows"))
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start())
             .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index d4a53f1b222..abe5349e034 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -210,7 +210,6 @@ public class KStreamAggregationIntegrationTest {
         return keyComparison;
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceWindowed(final TestInfo testInfo) throws Exception 
{
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -222,7 +221,7 @@ public class KStreamAggregationIntegrationTest {
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
         groupedStream
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), 
ofMinutes(1L)))
             .reduce(reducer)
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
@@ -328,7 +327,6 @@ public class KStreamAggregationIntegrationTest {
         );
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateWindowed(final TestInfo testInfo) throws 
Exception {
         final long firstTimestamp = mockTime.milliseconds();
@@ -339,7 +337,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(secondTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L);
-        groupedStream.windowedBy(TimeWindows.of(ofMillis(500L)))
+        groupedStream.windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), 
ofMinutes(1L)))
             .aggregate(
                 initializer,
                 aggregator,
@@ -459,7 +457,6 @@ public class KStreamAggregationIntegrationTest {
         shouldCountHelper(testInfo);
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldGroupByKey(final TestInfo testInfo) throws Exception {
         final long timestamp = mockTime.milliseconds();
@@ -467,7 +464,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
 
         stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
-            .windowedBy(TimeWindows.of(ofMillis(500L)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)))
             .count()
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + 
windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
 
@@ -499,7 +496,6 @@ public class KStreamAggregationIntegrationTest {
         );
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSlidingWindows(final TestInfo testInfo) throws 
Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -512,7 +508,7 @@ public class KStreamAggregationIntegrationTest {
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
         groupedStream
-            
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
+            
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(timeDifference), 
ofMillis(2000L)))
             .reduce(reducer)
             .toStream()
             .to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
@@ -607,7 +603,6 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldAggregateSlidingWindows(final TestInfo testInfo) throws 
Exception {
         final long firstBatchTimestamp = mockTime.milliseconds();
@@ -619,8 +614,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(thirdBatchTimestamp);
 
         final Serde<Windowed<String>> windowedSerde = 
WindowedSerdes.timeWindowedSerdeFrom(String.class, timeDifference);
-        //noinspection deprecation
-        
groupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(500L),
 ofMinutes(5)))
+        
groupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(500L),
 ofMinutes(5)))
             .aggregate(
                 initializer,
                 aggregator,
@@ -720,7 +714,6 @@ public class KStreamAggregationIntegrationTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldCountSessionWindows() throws Exception {
         final long sessionGap = 5 * 60 * 1000L;
@@ -800,10 +793,9 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<Long, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
 
-        //noinspection deprecation
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
             .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
-            .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(sessionGap)))
             .count()
             .toStream()
             .process(() -> (Processor<Windowed<String>, Long, Object, Object>) 
record -> {
@@ -823,7 +815,6 @@ public class KStreamAggregationIntegrationTest {
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, 
t3))), equalTo(KeyValue.pair(1L, t3)));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
@@ -903,9 +894,9 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, KeyValue<String, Long>> results = new 
HashMap<>();
         final CountDownLatch latch = new CountDownLatch(13);
         final String userSessionsStore = "UserSessionsStore";
-        //noinspection deprecation
+
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), 
Serdes.String()))
-            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) 
.windowedBy(SessionWindows.with(ofMillis(sessionGap))) .reduce((value1, value2) 
-> value1 + ":" + value2, Materialized.as(userSessionsStore))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) 
.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(sessionGap), 
ofMinutes(1))) .reduce((value1, value2) -> value1 + ":" + value2, 
Materialized.as(userSessionsStore))
             .toStream()
             .process(() -> (Processor<Windowed<String>, String, Object, 
Object>) record -> {
                 results.put(record.key(), KeyValue.pair(record.value(), 
record.timestamp()));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index d13de4b402b..db9977901d9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -68,7 +68,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 @Timeout(600)
 @Tag("integration")
-@SuppressWarnings("deprecation")
 public class MetricsIntegrationTest {
     private static final int NUM_BROKERS = 1;
     private static final int NUM_THREADS = 2;
@@ -375,7 +374,7 @@ public class MetricsIntegrationTest {
         final Duration windowSize = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .groupByKey()
-            .windowedBy(TimeWindows.of(windowSize).grace(Duration.ZERO))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(windowSize))
             .aggregate(() -> 0L,
                 (aggKey, newValue, aggValue) -> aggValue,
                 Materialized.<Integer, Long, WindowStore<Bytes, 
byte[]>>as(TIME_WINDOWED_AGGREGATED_STREAM_STORE)
@@ -403,7 +402,7 @@ public class MetricsIntegrationTest {
         final Duration inactivityGap = Duration.ofMillis(50);
         builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .groupByKey()
-            
.windowedBy(SessionWindows.with(inactivityGap).grace(Duration.ZERO))
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(inactivityGap))
             .aggregate(() -> 0L,
                 (aggKey, newValue, aggValue) -> aggValue,
                 (aggKey, leftAggValue, rightAggValue) -> leftAggValue,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 2dcf1ba54df..f16c3e57c39 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -120,7 +120,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 
 @Timeout(600)
 @Tag("integration")
-@SuppressWarnings("deprecation")
 public class QueryableStateIntegrationTest {
     private static final Logger log = 
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
 
@@ -271,7 +270,7 @@ public class QueryableStateIntegrationTest {
 
         // Create a Windowed State Store that contains the word count for 
every 1 minute
         groupedByWord
-            .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
             .count(Materialized.as(windowStoreName + "-" + inputTopic))
             .toStream((key, value) -> key.key())
             .to(windowOutputTopic, Produced.with(Serdes.String(), 
Serdes.Long()));
@@ -1000,7 +999,7 @@ public class QueryableStateIntegrationTest {
 
         final String windowStoreName = "windowed-count";
         s1.groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
             .count(Materialized.as(windowStoreName));
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
         startApplicationAndWaitUntilRunning(kafkaStreams);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 17fd60d568f..2fc1a9a644f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -189,7 +189,7 @@ public class RocksDBMetricsIntegrationTest {
         ).toStream().to(STREAM_OUTPUT_ONE);
         builder.stream(STREAM_INPUT_TWO, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .groupByKey()
-            .windowedBy(TimeWindows.of(WINDOW_SIZE).grace(Duration.ZERO))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
             .aggregate(() -> 0L,
                 (aggKey, newValue, aggValue) -> aggValue,
                 Materialized.<Integer, Long, WindowStore<Bytes, 
byte[]>>as("time-windowed-aggregated-stream-store")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 7868dee5512..755ae96f1dc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -18,12 +18,9 @@ package org.apache.kafka.streams.kstream;
 
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
-
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -68,15 +65,6 @@ public class SessionWindowsTest {
         assertEquals(ANY_GRACE, 
SessionWindows.ofInactivityGapAndGrace(ofMillis(ANY_OTHER_SIZE), 
ofMillis(ANY_GRACE)).gracePeriodMs());
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void oldAPIShouldSetDefaultGracePeriod() {
-        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
-        assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, 
SessionWindows.with(ofMillis(3L)).gracePeriodMs());
-        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
-        assertEquals(0L, 
SessionWindows.with(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
-    }
-
     @Test
     public void windowSizeMustNotBeNegative() {
         assertThrows(IllegalArgumentException.class, () -> 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(-1)));
@@ -87,13 +75,6 @@ public class SessionWindowsTest {
         assertThrows(IllegalArgumentException.class, () -> 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(0)));
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void graceShouldNotCalledAfterGraceSet() {
-        assertThrows(IllegalStateException.class, () -> 
SessionWindows.ofInactivityGapAndGrace(ofMillis(10), 
ofMillis(10)).grace(ofMillis(10)));
-        assertThrows(IllegalStateException.class, () -> 
SessionWindows.ofInactivityGapWithNoGrace(ofMillis(10)).grace(ofMillis(10)));
-    }
-
     @Test
     public void equalsAndHashcodeShouldBeValidForPositiveCases() {
         verifyEquality(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
index accd51cbd23..b0d8e2ed524 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SlidingWindowsTest.java
@@ -29,10 +29,8 @@ public class SlidingWindowsTest {
     private static final long ANY_SIZE = 123L;
     private static final long ANY_GRACE = 1024L;
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSetTimeDifference() {
-        assertEquals(ANY_SIZE, 
SlidingWindows.withTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(3)).timeDifferenceMs());
         assertEquals(ANY_SIZE, 
SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(ANY_SIZE), 
ofMillis(ANY_GRACE)).timeDifferenceMs());
         assertEquals(ANY_SIZE, 
SlidingWindows.ofTimeDifferenceWithNoGrace(ofMillis(ANY_SIZE)).timeDifferenceMs());
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 9ad54dbf90e..5f3b2a5d39f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -20,13 +20,11 @@ import 
org.apache.kafka.streams.kstream.internals.TimeWindow;
 
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
 import java.util.Map;
 
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.EqualityCheck.verifyEquality;
 import static org.apache.kafka.streams.EqualityCheck.verifyInEquality;
-import static 
org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -37,10 +35,8 @@ public class TimeWindowsTest {
     private static final long ANY_SIZE = 123L;
     private static final long ANY_GRACE = 1024L;
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSetWindowSize() {
-        assertEquals(ANY_SIZE, TimeWindows.of(ofMillis(ANY_SIZE)).sizeMs);
         assertEquals(ANY_SIZE, 
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE)).sizeMs);
         assertEquals(ANY_SIZE, TimeWindows.ofSizeAndGrace(ofMillis(ANY_SIZE), 
ofMillis(ANY_GRACE)).sizeMs);
     }
@@ -61,13 +57,6 @@ public class TimeWindowsTest {
         assertThrows(IllegalArgumentException.class, () -> 
TimeWindows.ofSizeWithNoGrace(ofMillis(-1)));
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void graceShouldNotCalledAfterGraceSet() {
-        assertThrows(IllegalStateException.class, () -> 
TimeWindows.ofSizeAndGrace(ofMillis(10), ofMillis(10)).grace(ofMillis(10)));
-        assertThrows(IllegalStateException.class, () -> 
TimeWindows.ofSizeWithNoGrace(ofMillis(10)).grace(ofMillis(10)));
-    }
-
     @Test
     public void advanceIntervalMustNotBeZero() {
         final TimeWindows windowSpec = 
TimeWindows.ofSizeWithNoGrace(ofMillis(ANY_SIZE));
@@ -113,15 +102,6 @@ public class TimeWindowsTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void oldAPIShouldSetDefaultGracePeriod() {
-        assertEquals(Duration.ofDays(1).toMillis(), 
DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD);
-        assertEquals(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD - 3L, 
TimeWindows.of(ofMillis(3L)).gracePeriodMs());
-        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD)).gracePeriodMs());
-        assertEquals(0L, 
TimeWindows.of(ofMillis(DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD + 
1L)).gracePeriodMs());
-    }
-
     @Test
     public void shouldComputeWindowsForHoppingWindows() {
         final TimeWindows windows = 
TimeWindows.ofSizeWithNoGrace(ofMillis(12L)).advanceBy(ofMillis(5L));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
index a3709dea91f..d3ad500ce2e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.java
@@ -47,12 +47,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Properties;
 
+import static java.time.Duration.ofDays;
 import static java.time.Duration.ofMillis;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@SuppressWarnings("deprecation")
 public class SessionWindowedCogroupedKStreamImplTest {
 
     private final StreamsBuilder builder = new StreamsBuilder();
@@ -80,7 +80,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
         groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()));
         cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
-        windowedCogroupedStream = 
cogroupedStream.windowedBy(SessionWindows.with(ofMillis(100)));
+        windowedCogroupedStream = 
cogroupedStream.windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(100),
 ofDays(1)));
     }
 
     @Test
@@ -147,7 +147,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
                 .with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()));
         groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(SessionWindows.with(ofMillis(1)))
+                
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(1)))
                 .aggregate(MockInitializer.STRING_INIT, sessionMerger, 
Named.as("foo"));
 
         assertThat(builder.build().describe().toString(), equalTo(
@@ -166,7 +166,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
     @Test
     public void sessionWindowAggregateTest() {
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(SessionWindows.with(ofMillis(500)))
+                
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
                 .aggregate(MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
@@ -190,7 +190,7 @@ public class SessionWindowedCogroupedKStreamImplTest {
     @Test
     public void sessionWindowAggregate2Test() {
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(SessionWindows.with(ofMillis(500)))
+                
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(500)))
                 .aggregate(MockInitializer.STRING_INIT, sessionMerger, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
index 6ccc40f2df4..b0fdf879c86 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
@@ -56,7 +56,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@SuppressWarnings("deprecation")
 public class SlidingWindowedCogroupedKStreamImplTest {
 
     private static final String TOPIC = "topic";
@@ -82,7 +81,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
         final KGroupedStream<String, String> groupedStream2 = 
stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final CogroupedKStream<String, String> cogroupedStream = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
-        windowedCogroupedStream = 
cogroupedStream.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(
+        windowedCogroupedStream = 
cogroupedStream.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(
             WINDOW_SIZE_MS), ofMillis(2000L)));
     }
 
@@ -133,7 +132,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
                 .with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()));
         groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), 
ofMillis(2000L)))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), 
ofMillis(2000L)))
                 .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
 
         assertThat(builder.build().describe().toString(), equalTo(
@@ -210,7 +209,7 @@ public class SlidingWindowedCogroupedKStreamImplTest {
     public void slidingWindowAggregateOverlappingWindowsTest() {
 
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), 
ofMillis(2000L))).aggregate(
+                
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(WINDOW_SIZE_MS), 
ofMillis(2000L))).aggregate(
                         MockInitializer.STRING_INIT, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index d99bc5671ce..4c37fa2f5cf 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -364,7 +364,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-            .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(2L), ofMillis(1L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -415,7 +415,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-            .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(2L)))
+            .windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(2L), ofMillis(2L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -471,7 +471,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
                 .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
                 .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-                
.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(5L), 
ofMillis(15L)))
+                
.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(ofMillis(5L), 
ofMillis(15L)))
                 .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
         valueCounts
                 .suppress(untilWindowCloses(unbounded()))
@@ -561,7 +561,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(0L)))
+            
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(ofMillis(5L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -817,7 +817,7 @@ public class SuppressScenarioTest {
         final KGroupedStream<String, String> stream1 = builder.stream("one", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final KGroupedStream<String, String> stream2 = builder.stream("two", 
Consumed.with(Serdes.String(), 
Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
         final KStream<Windowed<String>, Object> cogrouped = 
stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, 
(key, value, aggregate) -> aggregate + value)
-            .windowedBy(TimeWindows.of(Duration.ofMinutes(15)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)))
             .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
             .suppress(Suppressed.untilWindowCloses(unbounded()))
             .toStream();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
index 3b45b9b8f35..2254f8c10af 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressTopologyTest.java
@@ -45,7 +45,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@SuppressWarnings("deprecation")
 public class SuppressTopologyTest {
     private static final Serde<String> STRING_SERDE = Serdes.String();
 
@@ -154,7 +153,7 @@ public class SuppressTopologyTest {
         anonymousNodeBuilder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+            .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(5L), 
ofMillis(5L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("counts").withCachingDisabled())
             .suppress(untilWindowCloses(unbounded()))
             .toStream()
@@ -172,7 +171,7 @@ public class SuppressTopologyTest {
         namedNodeBuilder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, 
STRING_SERDE))
-            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
+            .windowedBy(SessionWindows.ofInactivityGapAndGrace(ofMillis(5L), 
ofMillis(5L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("counts").withCachingDisabled())
             .suppress(untilWindowCloses(unbounded()).withName("myname"))
             .toStream()
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
index f43d56b53e3..17420e0e3c9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowTest.java
@@ -125,10 +125,9 @@ public class TimeWindowTest {
         assertThrows(IllegalArgumentException.class, () -> 
window.overlap(sessionWindow));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldReturnMatchedWindowsOrderedByTimestamp() {
-        final TimeWindows windows = 
TimeWindows.of(ofMillis(12L)).advanceBy(ofMillis(5L));
+        final TimeWindows windows = 
TimeWindows.ofSizeWithNoGrace(ofMillis(12L)).advanceBy(ofMillis(5L));
         final Map<Long, TimeWindow> matched = windows.windowsFor(21L);
 
         final Long[] expected = matched.keySet().toArray(new Long[0]);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
index a1a46ca5130..de157ce3fdc 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedCogroupedKStreamImplTest.java
@@ -53,7 +53,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-@SuppressWarnings("deprecation")
 public class TimeWindowedCogroupedKStreamImplTest {
 
     private static final Long WINDOW_SIZE = 500L;
@@ -81,7 +80,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
         groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()));
         cogroupedStream = groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
                 .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
-        windowedCogroupedStream = 
cogroupedStream.windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)));
+        windowedCogroupedStream = 
cogroupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)));
     }
 
     @Test
@@ -132,7 +131,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
                 .with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), 
Serdes.String()));
         groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(TimeWindows.of(ofMillis(WINDOW_SIZE)))
+                
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(WINDOW_SIZE)))
                 .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));
 
         assertThat(builder.build().describe().toString(), equalTo(
@@ -222,7 +221,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
     public void timeWindowAggregateManyWindowsTest() {
 
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                .windowedBy(TimeWindows.of(ofMillis(500L))).aggregate(
+                
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))).aggregate(
                         MockInitializer.STRING_INIT, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
@@ -248,7 +247,7 @@ public class TimeWindowedCogroupedKStreamImplTest {
     public void timeWindowAggregateOverlappingWindowsTest() {
 
         final KTable<Windowed<String>, String> customers = 
groupedStream.cogroup(MockAggregator.TOSTRING_ADDER)
-                
.windowedBy(TimeWindows.of(ofMillis(500L)).advanceBy(ofMillis(200L))).aggregate(
+                
.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L)).advanceBy(ofMillis(200L))).aggregate(
                         MockInitializer.STRING_INIT, 
Materialized.with(Serdes.String(), Serdes.String()));
         customers.toStream().to(OUTPUT);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 5a4a16fa94f..79ea95118f4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -55,7 +55,6 @@ import java.util.regex.Pattern;
 import static java.time.Duration.ofMillis;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-@SuppressWarnings("deprecation")
 public class StreamsGraphTest {
 
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
@@ -64,6 +63,7 @@ public class StreamsGraphTest {
 
     // Test builds topology in successive manner but only graph node not yet 
processed written to topology
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldBeAbleToBuildTopologyIncrementally() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -107,7 +107,7 @@ public class StreamsGraphTest {
 
         // second repartition
         changedKeyStream.groupByKey(Grouped.as("windowed-repartition"))
-            .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
             .count(Materialized.as("windowed-count-store"))
             .toStream()
             .map((k, v) -> KeyValue.pair(k.key(), v)).to("windowed-count", 
Produced.with(Serdes.String(), Serdes.Long()));
@@ -250,7 +250,7 @@ public class StreamsGraphTest {
         final KStream<String, String> mappedKeyStream = 
inputStream.selectKey((k, v) -> k + v);
 
         mappedKeyStream.mapValues(v -> 
v.toUpperCase(Locale.getDefault())).groupByKey().count().toStream().to("output");
-        mappedKeyStream.flatMapValues(v -> 
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.of(ofMillis(5000))).count().toStream().to("windowed-output");
+        mappedKeyStream.flatMapValues(v -> 
Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
 
         return builder.build(properties);
 
@@ -267,7 +267,7 @@ public class StreamsGraphTest {
         final KStream<String, String> mappedKeyStream = 
inputStream.selectKey((k, v) -> k + v).through("through-topic");
 
         mappedKeyStream.groupByKey().count().toStream().to("output");
-        
mappedKeyStream.groupByKey().windowedBy(TimeWindows.of(ofMillis(5000))).count().toStream().to("windowed-output");
+        
mappedKeyStream.groupByKey().windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000))).count().toStream().to("windowed-output");
 
         return builder.build(properties);
 
@@ -290,7 +290,7 @@ public class StreamsGraphTest {
         inputStream
             .repartition()
             .groupByKey()
-            .windowedBy(TimeWindows.of(ofMillis(5000)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(5000)))
             .count()
             .toStream()
             .to("windowed-output");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9460bbe83ac..4702681a650 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -176,7 +176,6 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 @MockitoSettings(strictness = Strictness.STRICT_STUBS)
 @Timeout(30_000)
-@SuppressWarnings("deprecation")
 public class StreamsPartitionAssignorTest {
 
     private static final String CONSUMER_1 = "consumer1";
@@ -1837,6 +1836,7 @@ public class StreamsPartitionAssignorTest {
         assertThrows(ConfigException.class, () -> 
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.APPLICATION_SERVER_CONFIG,
 "localhost:j87yhk"), parameterizedConfig));
     }
 
+    @SuppressWarnings("deprecation")
     @ParameterizedTest
     @MethodSource("parameter")
     public void 
shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks(final 
Map<String, Object> parameterizedConfig) {
@@ -2710,7 +2710,7 @@ public class StreamsPartitionAssignorTest {
                 (k, v) -> k,
                 Grouped.with("GroupName", Serdes.String(), Serdes.String())
             )
-            .windowedBy(TimeWindows.of(Duration.ofMinutes(10)))
+            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10)))
             .aggregate(
                 () -> "",
                 (k, v, a) -> a + k)

Reply via email to