This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 56671891d6f KAFKA-17131: Refactor TimeDefinitions (#18241)
56671891d6f is described below
commit 56671891d6f4e66c6402daecf1be6f8a7fe86f89
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Dec 19 11:40:43 2024 -0800
KAFKA-17131: Refactor TimeDefinitions (#18241)
Refactor TimeDefinitions to no longer use the old ProcessorContext.
Reviewers: Bruno Cadonna <[email protected]>
---
.../apache/kafka/streams/kstream/Suppressed.java | 43 +++++++++++-----------
.../suppress/FinalResultsSuppressionBuilder.java | 2 +-
.../suppress/KTableSuppressProcessorSupplier.java | 6 +--
.../internals/suppress/SuppressedInternal.java | 10 ++---
.../internals/suppress/TimeDefinitions.java | 40 ++++++--------------
5 files changed, 43 insertions(+), 58 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 57b18b4caf5..5bda71d487b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -72,16 +72,16 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
/**
* Create a buffer unconstrained by size (either keys or bytes).
*
- * As a result, the buffer will consume as much memory as it needs,
dictated by the time bound.
+ * <p>As a result, the buffer will consume as much memory as it needs,
dictated by the time bound.
*
- * If there isn't enough heap available to meet the demand, the
application will encounter an
+ * <p>If there isn't enough heap available to meet the demand, the
application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a
graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC
behavior.
*
- * This is a convenient option if you doubt that your buffer will be
that large, but also don't
+ * <p>This is a convenient option if you doubt that your buffer will
be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
- * This buffer is "strict" in the sense that it will enforce the time
bound or crash.
+ * <p>This buffer is "strict" in the sense that it will enforce the
time bound or crash.
* It will never emit early.
*/
static StrictBufferConfig unbounded() {
@@ -91,16 +91,16 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
/**
* Set the buffer to be unconstrained by size (either keys or bytes).
*
- * As a result, the buffer will consume as much memory as it needs,
dictated by the time bound.
+ * <p>As a result, the buffer will consume as much memory as it needs,
dictated by the time bound.
*
- * If there isn't enough heap available to meet the demand, the
application will encounter an
+ * <p>If there isn't enough heap available to meet the demand, the
application will encounter an
* {@link OutOfMemoryError} and shut down (not guaranteed to be a
graceful exit). Also, note that
* JVM processes under extreme memory pressure may exhibit poor GC
behavior.
*
- * This is a convenient option if you doubt that your buffer will be
that large, but also don't
+ * <p>This is a convenient option if you doubt that your buffer will
be that large, but also don't
* wish to pick particular constraints, such as in testing.
*
- * This buffer is "strict" in the sense that it will enforce the time
bound or crash.
+ * <p>This buffer is "strict" in the sense that it will enforce the
time bound or crash.
* It will never emit early.
*/
StrictBufferConfig withNoBound();
@@ -108,7 +108,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
/**
* Set the buffer to gracefully shut down the application when any of
its constraints are violated
*
- * This buffer is "strict" in the sense that it will enforce the time
bound or shut down.
+ * <p>This buffer is "strict" in the sense that it will enforce the
time bound or shut down.
* It will never emit early.
*/
StrictBufferConfig shutDownWhenFull();
@@ -116,7 +116,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
/**
* Set the buffer to just emit the oldest records when any of its
constraints are violated.
*
- * This buffer is "not strict" in the sense that it may emit early, so
it is suitable for reducing
+ * <p>This buffer is "not strict" in the sense that it may emit early,
so it is suitable for reducing
* duplicate results downstream, but does not promise to eliminate
them.
*/
EagerBufferConfig emitEarlyWhenFull();
@@ -125,6 +125,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
* Disable the changelog for this suppression's internal buffer.
* This will turn off fault-tolerance for the suppression, and will
result in data loss in the event of a rebalance.
* By default, the changelog is enabled.
+ *
* @return this
*/
BC withLoggingDisabled();
@@ -144,14 +145,14 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
/**
* Configure the suppression to emit only the "final results" from the
window.
*
- * By default, all Streams operators emit results whenever new results are
available.
+ * <p>By default, all Streams operators emit results whenever new results
are available.
* This includes windowed operations.
*
- * This configuration will instead emit just one result per key for each
window, guaranteeing
+ * <p>This configuration will instead emit just one result per key for
each window, guaranteeing
* to deliver only the final result. This option is suitable for use cases
in which the business logic
* requires a hard guarantee that only the final result is propagated. For
example, sending alerts.
*
- * To accomplish this, the operator will buffer events from the window
until the window close (that is,
+ * <p>To accomplish this, the operator will buffer events from the window
until the window close (that is,
* until the end-time passes, and additionally until the grace period
expires). Since windowed operators
* are required to reject out-of-order events for a window whose grace
period is expired, there is an additional
* guarantee that the final results emitted from this suppression will
match any queryable state upstream.
@@ -161,7 +162,7 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
* property to emit early and then issue an update
later.
* @return a "final results" mode suppression configuration
*/
- static Suppressed<Windowed> untilWindowCloses(final StrictBufferConfig
bufferConfig) {
+ static Suppressed<Windowed<?>> untilWindowCloses(final StrictBufferConfig
bufferConfig) {
return new FinalResultsSuppressionBuilder<>(null, bufferConfig);
}
@@ -175,20 +176,20 @@ public interface Suppressed<K> extends
NamedOperation<Suppressed<K>> {
* @param <K> The key type for the KTable to apply this suppression to.
* @return a suppression configuration
*/
- static <K> Suppressed<K> untilTimeLimit(final Duration
timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
+ static <K> Suppressed<K> untilTimeLimit(final Duration
timeToWaitForMoreEvents, final BufferConfig<?> bufferConfig) {
return new SuppressedInternal<>(null, timeToWaitForMoreEvents,
bufferConfig, null, false);
}
/**
* Use the specified name for the suppression node in the topology.
- * <p>
- * This can be used to insert a suppression without changing the rest of
the topology names
+ *
+ * <p>This can be used to insert a suppression without changing the rest
of the topology names
* (and therefore not requiring an application reset).
- * <p>
- * Note however, that once a suppression has buffered some records,
removing it from the topology would cause
+ *
+ * <p>Note however, that once a suppression has buffered some records,
removing it from the topology would cause
* the loss of those records.
- * <p>
- * A suppression can be "disabled" with the configuration {@code
untilTimeLimit(Duration.ZERO, ...}.
+ *
+ * <p>A suppression can be "disabled" with the configuration {@code
untilTimeLimit(Duration.ZERO, ...}.
*
* @param name The name to be used for the suppression node and changelog
topic
* @return The same configuration with the addition of the given {@code
name}.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index e917556c873..9aff6e61b84 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import java.time.Duration;
import java.util.Objects;
-public class FinalResultsSuppressionBuilder<K extends Windowed> implements
Suppressed<K>, NamedSuppressed<K> {
+public class FinalResultsSuppressionBuilder<K extends Windowed<?>> implements
Suppressed<K>, NamedSuppressed<K> {
private final String name;
private final StrictBufferConfig bufferConfig;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
index 0b0c6ca15e9..3f98d444bb2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
@@ -63,12 +63,12 @@ public class KTableSuppressProcessorSupplier<K, V>
implements
@Override
public KTableValueGetterSupplier<K, V> view() {
final KTableValueGetterSupplier<K, V> parentValueGetterSupplier =
parentKTable.valueGetterSupplier();
- return new KTableValueGetterSupplier<K, V>() {
+ return new KTableValueGetterSupplier<>() {
@Override
public KTableValueGetter<K, V> get() {
final KTableValueGetter<K, V> parentGetter =
parentValueGetterSupplier.get();
- return new KTableValueGetter<K, V>() {
+ return new KTableValueGetter<>() {
private TimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;
@Override
@@ -166,7 +166,7 @@ public class KTableSuppressProcessorSupplier<K, V>
implements
}
private void buffer(final Record<K, Change<V>> record) {
- final long bufferTime =
bufferTimeDefinition.time(internalProcessorContext, record.key());
+ final long bufferTime =
bufferTimeDefinition.time(internalProcessorContext.recordContext(),
record.key());
buffer.put(bufferTime, record,
internalProcessorContext.recordContext());
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
index 51307bba9f5..89b07a9808b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java
@@ -27,7 +27,7 @@ public class SuppressedInternal<K> implements Suppressed<K>,
NamedSuppressed<K>
private static final StrictBufferConfigImpl DEFAULT_BUFFER_CONFIG =
(StrictBufferConfigImpl) BufferConfig.unbounded();
private final String name;
- private final BufferConfigInternal bufferConfig;
+ private final BufferConfigInternal<?> bufferConfig;
private final Duration timeToWaitForMoreEvents;
private final TimeDefinition<K> timeDefinition;
private final boolean safeToDropTombstones;
@@ -39,7 +39,7 @@ public class SuppressedInternal<K> implements Suppressed<K>,
NamedSuppressed<K>
* idempotent and correct). We decided that
the unnecessary tombstones would not be
* desirable in the output stream, though,
hence the ability to drop them.
*
- * A alternative is to remember whether a
result has previously been emitted
+ * <p>A alternative is to remember whether a
result has previously been emitted
* for a key and drop tombstones in that case,
but it would be a little complicated to
* figure out when to forget the fact that we
have emitted some result (currently, the
* buffer immediately forgets all about a key
when we emit, which helps to keep it
@@ -47,13 +47,13 @@ public class SuppressedInternal<K> implements
Suppressed<K>, NamedSuppressed<K>
*/
public SuppressedInternal(final String name,
final Duration suppressionTime,
- final BufferConfig bufferConfig,
+ final BufferConfig<?> bufferConfig,
final TimeDefinition<K> timeDefinition,
final boolean safeToDropTombstones) {
this.name = name;
this.timeToWaitForMoreEvents = suppressionTime == null ?
DEFAULT_SUPPRESSION_TIME : suppressionTime;
this.timeDefinition = timeDefinition == null ?
TimeDefinitions.RecordTimeDefinition.instance() : timeDefinition;
- this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG :
(BufferConfigInternal) bufferConfig;
+ this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG :
(BufferConfigInternal<?>) bufferConfig;
this.safeToDropTombstones = safeToDropTombstones;
}
@@ -69,7 +69,7 @@ public class SuppressedInternal<K> implements Suppressed<K>,
NamedSuppressed<K>
@SuppressWarnings("unchecked")
public <BC extends Suppressed.BufferConfig<BC>> BufferConfigInternal<BC>
bufferConfig() {
- return bufferConfig;
+ return (BufferConfigInternal<BC>) bufferConfig;
}
TimeDefinition<K> timeDefinition() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
index 640965fdd6a..c4a38e23c97 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeDefinitions.java
@@ -17,63 +17,47 @@
package org.apache.kafka.streams.kstream.internals.suppress;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
final class TimeDefinitions {
private TimeDefinitions() {}
- enum TimeDefinitionType {
- RECORD_TIME, WINDOW_END_TIME
- }
-
/**
* This interface should never be instantiated outside of this class.
*/
interface TimeDefinition<K> {
- long time(final ProcessorContext context, final K key);
-
- TimeDefinitionType type();
+ long time(final RecordContext context, final K key);
}
- public static class RecordTimeDefinition<K> implements TimeDefinition<K> {
- private static final RecordTimeDefinition INSTANCE = new
RecordTimeDefinition();
+ static class RecordTimeDefinition<K> implements TimeDefinition<K> {
+ private static final RecordTimeDefinition<?> INSTANCE = new
RecordTimeDefinition<>();
private RecordTimeDefinition() {}
@SuppressWarnings("unchecked")
- public static <K> RecordTimeDefinition<K> instance() {
- return RecordTimeDefinition.INSTANCE;
+ static <K> RecordTimeDefinition<K> instance() {
+ return (RecordTimeDefinition<K>) RecordTimeDefinition.INSTANCE;
}
@Override
- public long time(final ProcessorContext context, final K key) {
+ public long time(final RecordContext context, final K key) {
return context.timestamp();
}
-
- @Override
- public TimeDefinitionType type() {
- return TimeDefinitionType.RECORD_TIME;
- }
}
- public static class WindowEndTimeDefinition<K extends Windowed> implements
TimeDefinition<K> {
- private static final WindowEndTimeDefinition INSTANCE = new
WindowEndTimeDefinition();
+ static class WindowEndTimeDefinition<K extends Windowed<?>> implements
TimeDefinition<K> {
+ private static final WindowEndTimeDefinition<?> INSTANCE = new
WindowEndTimeDefinition<>();
private WindowEndTimeDefinition() {}
@SuppressWarnings("unchecked")
- public static <K extends Windowed> WindowEndTimeDefinition<K>
instance() {
- return WindowEndTimeDefinition.INSTANCE;
+ static <K extends Windowed<?>> WindowEndTimeDefinition<K> instance() {
+ return (WindowEndTimeDefinition<K>)
WindowEndTimeDefinition.INSTANCE;
}
@Override
- public long time(final ProcessorContext context, final K key) {
+ public long time(final RecordContext context, final K key) {
return key.window().end();
}
-
- @Override
- public TimeDefinitionType type() {
- return TimeDefinitionType.WINDOW_END_TIME;
- }
}
}