This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 56671891d6f KAFKA-17131: Refactor TimeDefinitions (#18241)
56671891d6f is described below

commit 56671891d6f4e66c6402daecf1be6f8a7fe86f89
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 19 11:40:43 2024 -0800

    KAFKA-17131: Refactor TimeDefinitions (#18241)
    
    Refactor TimeDefintiions to not use old ProcessorContext any longer.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../apache/kafka/streams/kstream/Suppressed.java   | 43 +++++++++++-----------
 .../suppress/FinalResultsSuppressionBuilder.java   |  2 +-
 .../suppress/KTableSuppressProcessorSupplier.java  |  6 +--
 .../internals/suppress/SuppressedInternal.java     | 10 ++---
 .../internals/suppress/TimeDefinitions.java        | 40 ++++++--------------
 5 files changed, 43 insertions(+), 58 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 57b18b4caf5..5bda71d487b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -72,16 +72,16 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
         /**
          * Create a buffer unconstrained by size (either keys or bytes).
          *
-         * As a result, the buffer will consume as much memory as it needs, 
dictated by the time bound.
+         * <p>As a result, the buffer will consume as much memory as it needs, 
dictated by the time bound.
          *
-         * If there isn't enough heap available to meet the demand, the 
application will encounter an
+         * <p>If there isn't enough heap available to meet the demand, the 
application will encounter an
          * {@link OutOfMemoryError} and shut down (not guaranteed to be a 
graceful exit). Also, note that
          * JVM processes under extreme memory pressure may exhibit poor GC 
behavior.
          *
-         * This is a convenient option if you doubt that your buffer will be 
that large, but also don't
+         * <p>This is a convenient option if you doubt that your buffer will 
be that large, but also don't
          * wish to pick particular constraints, such as in testing.
          *
-         * This buffer is "strict" in the sense that it will enforce the time 
bound or crash.
+         * <p>This buffer is "strict" in the sense that it will enforce the 
time bound or crash.
          * It will never emit early.
          */
         static StrictBufferConfig unbounded() {
@@ -91,16 +91,16 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
         /**
          * Set the buffer to be unconstrained by size (either keys or bytes).
          *
-         * As a result, the buffer will consume as much memory as it needs, 
dictated by the time bound.
+         * <p>As a result, the buffer will consume as much memory as it needs, 
dictated by the time bound.
          *
-         * If there isn't enough heap available to meet the demand, the 
application will encounter an
+         * <p>If there isn't enough heap available to meet the demand, the 
application will encounter an
          * {@link OutOfMemoryError} and shut down (not guaranteed to be a 
graceful exit). Also, note that
          * JVM processes under extreme memory pressure may exhibit poor GC 
behavior.
          *
-         * This is a convenient option if you doubt that your buffer will be 
that large, but also don't
+         * <p>This is a convenient option if you doubt that your buffer will 
be that large, but also don't
          * wish to pick particular constraints, such as in testing.
          *
-         * This buffer is "strict" in the sense that it will enforce the time 
bound or crash.
+         * <p>This buffer is "strict" in the sense that it will enforce the 
time bound or crash.
          * It will never emit early.
          */
         StrictBufferConfig withNoBound();
@@ -108,7 +108,7 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
         /**
          * Set the buffer to gracefully shut down the application when any of 
its constraints are violated
          *
-         * This buffer is "strict" in the sense that it will enforce the time 
bound or shut down.
+         * <p>This buffer is "strict" in the sense that it will enforce the 
time bound or shut down.
          * It will never emit early.
          */
         StrictBufferConfig shutDownWhenFull();
@@ -116,7 +116,7 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
         /**
          * Set the buffer to just emit the oldest records when any of its 
constraints are violated.
          *
-         * This buffer is "not strict" in the sense that it may emit early, so 
it is suitable for reducing
+         * <p>This buffer is "not strict" in the sense that it may emit early, 
so it is suitable for reducing
          * duplicate results downstream, but does not promise to eliminate 
them.
          */
         EagerBufferConfig emitEarlyWhenFull();
@@ -125,6 +125,7 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
          * Disable the changelog for this suppression's internal buffer.
          * This will turn off fault-tolerance for the suppression, and will 
result in data loss in the event of a rebalance.
          * By default, the changelog is enabled.
+         *
          * @return this
          */
         BC withLoggingDisabled();
@@ -144,14 +145,14 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
     /**
      * Configure the suppression to emit only the "final results" from the 
window.
      *
-     * By default, all Streams operators emit results whenever new results are 
available.
+     * <p>By default, all Streams operators emit results whenever new results 
are available.
      * This includes windowed operations.
      *
-     * This configuration will instead emit just one result per key for each 
window, guaranteeing
+     * <p>This configuration will instead emit just one result per key for 
each window, guaranteeing
      * to deliver only the final result. This option is suitable for use cases 
in which the business logic
      * requires a hard guarantee that only the final result is propagated. For 
example, sending alerts.
      *
-     * To accomplish this, the operator will buffer events from the window 
until the window close (that is,
+     * <p>To accomplish this, the operator will buffer events from the window 
until the window close (that is,
      * until the end-time passes, and additionally until the grace period 
expires). Since windowed operators
      * are required to reject out-of-order events for a window whose grace 
period is expired, there is an additional
      * guarantee that the final results emitted from this suppression will 
match any queryable state upstream.
@@ -161,7 +162,7 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
      *                     property to emit early and then issue an update 
later.
      * @return a "final results" mode suppression configuration
      */
-    static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig 
bufferConfig) {
+    static Suppressed<Windowed<?>> untilWindowCloses(final StrictBufferConfig 
bufferConfig) {
         return new FinalResultsSuppressionBuilder<>(null, bufferConfig);
     }
 
@@ -175,20 +176,20 @@ public interface Suppressed<K> extends 
NamedOperation<Suppressed<K>> {
      * @param <K> The key type for the KTable to apply this suppression to.
      * @return a suppression configuration
      */
-    static <K> Suppressed<K> untilTimeLimit(final Duration 
timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
+    static <K> Suppressed<K> untilTimeLimit(final Duration 
timeToWaitForMoreEvents, final BufferConfig<?> bufferConfig) {
         return new SuppressedInternal<>(null, timeToWaitForMoreEvents, 
bufferConfig, null, false);
     }
 
     /**
      * Use the specified name for the suppression node in the topology.
-     * <p>
-     * This can be used to insert a suppression without changing the rest of 
the topology names
+     *
+     * <p>This can be used to insert a suppression without changing the rest 
of the topology names
      * (and therefore not requiring an application reset).
-     * <p>
-     * Note however, that once a suppression has buffered some records, 
removing it from the topology would cause
+     *
+     * <p>Note however, that once a suppression has buffered some records, 
removing it from the topology would cause
      * the loss of those records.
-     * <p>
-     * A suppression can be "disabled" with the configuration {@code 
untilTimeLimit(Duration.ZERO, ...}.
+     *
+     * <p>A suppression can be "disabled" with the configuration {@code 
untilTimeLimit(Duration.ZERO, ...}.
      *
      * @param name The name to be used for the suppression node and changelog 
topic
      * @return The same configuration with the addition of the given {@code 
name}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index e917556c873..9aff6e61b84 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed;
 import java.time.Duration;
 import java.util.Objects;
 
-public class FinalResultsSuppressionBuilder<K extends Windowed> implements 
Suppressed<K>, NamedSuppressed<K> {
+public class FinalResultsSuppressionBuilder<K extends Windowed<?>> implements 
Suppressed<K>, NamedSuppressed<K> {
     private final String name;
     private final StrictBufferConfig bufferConfig;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 0b0c6ca15e9..3f98d444bb2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -63,12 +63,12 @@ public class KTableSuppressProcessorSupplier<K, V> 
implements
     @Override
     public KTableValueGetterSupplier<K, V> view() {
         final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = 
parentKTable.valueGetterSupplier();
-        return new KTableValueGetterSupplier<K, V>() {
+        return new KTableValueGetterSupplier<>() {
 
             @Override
             public KTableValueGetter<K, V> get() {
                 final KTableValueGetter<K, V> parentGetter = 
parentValueGetterSupplier.get();
-                return new KTableValueGetter<K, V>() {
+                return new KTableValueGetter<>() {
                     private TimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;
 
                     @Override
@@ -166,7 +166,7 @@ public class KTableSuppressProcessorSupplier<K, V> 
implements
         }
 
         private void buffer(final Record<K, Change<V>> record) {
-            final long bufferTime = 
bufferTimeDefinition.time(internalProcessorContext, record.key());
+            final long bufferTime = 
bufferTimeDefinition.time(internalProcessorContext.recordContext(), 
record.key());
 
             buffer.put(bufferTime, record, 
internalProcessorContext.recordContext());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 51307bba9f5..89b07a9808b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -27,7 +27,7 @@ public class SuppressedInternal<K> implements Suppressed<K>, 
NamedSuppressed<K>
     private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG = 
(StrictBufferConfigImpl) BufferConfig.unbounded();
 
     private final String name;
-    private final BufferConfigInternal bufferConfig;
+    private final BufferConfigInternal<?> bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
     private final boolean safeToDropTombstones;
@@ -39,7 +39,7 @@ public class SuppressedInternal<K> implements Suppressed<K>, 
NamedSuppressed<K>
      *                             idempotent and correct). We decided that 
the unnecessary tombstones would not be
      *                             desirable in the output stream, though, 
hence the ability to drop them.
      *
-     *                             A alternative is to remember whether a 
result has previously been emitted
+     *                             <p>A alternative is to remember whether a 
result has previously been emitted
      *                             for a key and drop tombstones in that case, 
but it would be a little complicated to
      *                             figure out when to forget the fact that we 
have emitted some result (currently, the
      *                             buffer immediately forgets all about a key 
when we emit, which helps to keep it
@@ -47,13 +47,13 @@ public class SuppressedInternal<K> implements 
Suppressed<K>, NamedSuppressed<K>
      */
     public SuppressedInternal(final String name,
                               final Duration suppressionTime,
-                              final BufferConfig bufferConfig,
+                              final BufferConfig<?> bufferConfig,
                               final TimeDefinition<K> timeDefinition,
                               final boolean safeToDropTombstones) {
         this.name = name;
         this.timeToWaitForMoreEvents = suppressionTime == null ? 
DEFAULT_SUPPRESSION_TIME : suppressionTime;
         this.timeDefinition = timeDefinition == null ? 
TimeDefinitions.RecordTimeDefinition.instance() : timeDefinition;
-        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : 
(BufferConfigInternal) bufferConfig;
+        this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : 
(BufferConfigInternal<?>) bufferConfig;
         this.safeToDropTombstones = safeToDropTombstones;
     }
 
@@ -69,7 +69,7 @@ public class SuppressedInternal<K> implements Suppressed<K>, 
NamedSuppressed<K>
 
     @SuppressWarnings("unchecked")
     public <BC extends Suppressed.BufferConfig<BC>> BufferConfigInternal<BC> 
bufferConfig() {
-        return bufferConfig;
+        return (BufferConfigInternal<BC>) bufferConfig;
     }
 
     TimeDefinition<K> timeDefinition() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
index 640965fdd6a..c4a38e23c97 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
@@ -17,63 +17,47 @@
 package org.apache.kafka.streams.kstream.internals.suppress;
 
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
 
 final class TimeDefinitions {
     private TimeDefinitions() {}
 
-    enum TimeDefinitionType {
-        RECORD_TIME, WINDOW_END_TIME
-    }
-
     /**
      * This interface should never be instantiated outside of this class.
      */
     interface TimeDefinition<K> {
-        long time(final ProcessorContext context, final K key);
-
-        TimeDefinitionType type();
+        long time(final RecordContext context, final K key);
     }
 
-    public static class RecordTimeDefinition<K> implements TimeDefinition<K> {
-        private static final RecordTimeDefinition INSTANCE = new 
RecordTimeDefinition();
+    static class RecordTimeDefinition<K> implements TimeDefinition<K> {
+        private static final RecordTimeDefinition<?> INSTANCE = new 
RecordTimeDefinition<>();
 
         private RecordTimeDefinition() {}
 
         @SuppressWarnings("unchecked")
-        public static <K> RecordTimeDefinition<K> instance() {
-            return RecordTimeDefinition.INSTANCE;
+        static <K> RecordTimeDefinition<K> instance() {
+            return (RecordTimeDefinition<K>) RecordTimeDefinition.INSTANCE;
         }
 
         @Override
-        public long time(final ProcessorContext context, final K key) {
+        public long time(final RecordContext context, final K key) {
             return context.timestamp();
         }
-
-        @Override
-        public TimeDefinitionType type() {
-            return TimeDefinitionType.RECORD_TIME;
-        }
     }
 
-    public static class WindowEndTimeDefinition<K extends Windowed> implements 
TimeDefinition<K> {
-        private static final WindowEndTimeDefinition INSTANCE = new 
WindowEndTimeDefinition();
+    static class WindowEndTimeDefinition<K extends Windowed<?>> implements 
TimeDefinition<K> {
+        private static final WindowEndTimeDefinition<?> INSTANCE = new 
WindowEndTimeDefinition<>();
 
         private WindowEndTimeDefinition() {}
 
         @SuppressWarnings("unchecked")
-        public static <K extends Windowed> WindowEndTimeDefinition<K> 
instance() {
-            return WindowEndTimeDefinition.INSTANCE;
+        static <K extends Windowed<?>> WindowEndTimeDefinition<K> instance() {
+            return (WindowEndTimeDefinition<K>) 
WindowEndTimeDefinition.INSTANCE;
         }
 
         @Override
-        public long time(final ProcessorContext context, final K key) {
+        public long time(final RecordContext context, final K key) {
             return key.window().end();
         }
-
-        @Override
-        public TimeDefinitionType type() {
-            return TimeDefinitionType.WINDOW_END_TIME;
-        }
     }
 }

Reply via email to