This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12be344fdd3 KAFKA-14936: Add Grace period logic to Stream Table Join
(2/N) (#13855)
12be344fdd3 is described below
commit 12be344fdd3b20f338ccab87933b89049ce202a4
Author: Walker Carlson <[email protected]>
AuthorDate: Thu Jun 29 07:14:04 2023 -0500
KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) (#13855)
This PR adds the grace-period interface to the Joined object and wires the
join up to the buffer. The majority of the change consists of tests and of
moving some of the existing join logic.
Reviewers: Victoria Xia <[email protected]>, Bruno Cadonna
<[email protected]>
---
.../org/apache/kafka/streams/kstream/Joined.java | 77 +++++++++--
.../kstream/internals/KStreamGlobalKTableJoin.java | 3 +-
.../streams/kstream/internals/KStreamImpl.java | 22 +++-
.../kstream/internals/KStreamKTableJoin.java | 15 ++-
.../internals/KStreamKTableJoinProcessor.java | 89 +++++++++++--
.../InMemoryTimeOrderedKeyValueChangeBuffer.java | 7 +-
.../RocksDBTimeOrderedKeyValueBuffer.java | 5 +-
...cksDBTimeOrderedKeyValueBytesStoreSupplier.java | 7 +-
.../state/internals/TimeOrderedKeyValueBuffer.java | 2 +-
.../StreamTableJoinWithGraceIntegrationTest.java | 144 +++++++++++++++++++++
.../kstream/internals/KStreamKTableJoinTest.java | 110 ++++++++++++++++
.../RocksDBTimeOrderedKeyValueBufferTest.java | 14 +-
12 files changed, 455 insertions(+), 40 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index a2793afce16..55dff2428f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Serde;
+import java.time.Duration;
+
/**
* The {@code Joined} class represents optional params that can be passed to
* {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)}
and
@@ -29,19 +31,22 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
protected final Serde<V> valueSerde;
protected final Serde<VO> otherValueSerde;
protected final String name;
+ protected final Duration gracePeriod;
private Joined(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO> otherValueSerde,
- final String name) {
+ final String name,
+ final Duration gracePeriod) {
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.otherValueSerde = otherValueSerde;
this.name = name;
+ this.gracePeriod = gracePeriod;
}
protected Joined(final Joined<K, V, VO> joined) {
- this(joined.keySerde, joined.valueSerde, joined.otherValueSerde,
joined.name);
+ this(joined.keySerde, joined.valueSerde, joined.otherValueSerde,
joined.name, joined.gracePeriod);
}
/**
@@ -59,7 +64,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
final Serde<V> valueSerde,
final Serde<VO>
otherValueSerde) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, null);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null);
}
/**
@@ -84,7 +89,34 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
final Serde<V> valueSerde,
final Serde<VO>
otherValueSerde,
final String name) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null);
+ }
+
+ /**
+ * Create an instance of {@code Joined} with key, value, and otherValue
{@link Serde} instances.
+ * {@code null} values are accepted and will be replaced by the default
serdes as defined in
+ * config.
+ *
+ * @param keySerde the key serde to use. If {@code null} the default key
serde from config will be
+ * used
+ * @param valueSerde the value serde to use. If {@code null} the default
value serde from config
+ * will be used
+ * @param otherValueSerde the otherValue serde to use. If {@code null} the
default value serde
+ * from config will be used
+ * @param name the name used as the base for naming components of the join
including any
+ * repartition topics
+ * @param gracePeriod the time stream-side records are buffered before being joined
+ * @param <K> key type
+ * @param <V> value type
+ * @param <VO> other value type
+ * @return new {@code Joined} instance with the provided serdes
+ */
+ public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
+ final Serde<V> valueSerde,
+ final Serde<VO>
otherValueSerde,
+ final String name,
+ final Duration gracePeriod)
{
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
}
/**
@@ -98,7 +130,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the keySerde
*/
public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K>
keySerde) {
- return new Joined<>(keySerde, null, null, null);
+ return new Joined<>(keySerde, null, null, null, null);
}
/**
@@ -112,9 +144,10 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the valueSerde
*/
public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V>
valueSerde) {
- return new Joined<>(null, valueSerde, null, null);
+ return new Joined<>(null, valueSerde, null, null, null);
}
+
/**
* Create an instance of {@code Joined} with an other value {@link Serde}.
* {@code null} values are accepted and will be replaced by the default
value serde as defined in config.
@@ -126,7 +159,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the otherValueSerde
*/
public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO>
otherValueSerde) {
- return new Joined<>(null, null, otherValueSerde, null);
+ return new Joined<>(null, null, otherValueSerde, null, null);
}
/**
@@ -142,10 +175,9 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
*
*/
public static <K, V, VO> Joined<K, V, VO> as(final String name) {
- return new Joined<>(null, null, null, name);
+ return new Joined<>(null, null, null, name, null);
}
-
/**
* Set the key {@link Serde} to be used. Null values are accepted and will
be replaced by the default
* key serde as defined in config
@@ -154,7 +186,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the {@code name}
*/
public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
}
/**
@@ -165,7 +197,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the {@code
valueSerde}
*/
public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
}
/**
@@ -176,7 +208,7 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
* @return new {@code Joined} instance configured with the {@code
valueSerde}
*/
public Joined<K, V, VO> withOtherValueSerde(final Serde<VO>
otherValueSerde) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
}
/**
@@ -189,7 +221,26 @@ public class Joined<K, V, VO> implements
NamedOperation<Joined<K, V, VO>> {
*/
@Override
public Joined<K, V, VO> withName(final String name) {
- return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
+ }
+
+ /**
+ * Set the grace period on the stream side of the join. Records will enter
a buffer before being processed.
+ * Out-of-order records that arrive within the grace period will be
processed in timestamp order. Late records,
+ * i.e., records that arrive outside the grace period, will be processed
immediately as they come in; if such a
+ * record's timestamp is past the table's history retention, the join may
produce a null result. Long gaps
+ * between arriving stream-side records can delay the processing of
buffered records.
+ *
+ * @param gracePeriod the duration of the grace period. Must be less than
the joining table's history retention.
+ * @return new {@code Joined} instance configured with the gracePeriod
+ */
+ public Joined<K, V, VO> withGracePeriod(final Duration gracePeriod) {
+ return new Joined<>(keySerde, valueSerde, otherValueSerde, name,
gracePeriod);
+ }
+
+ public Duration gracePeriod() {
+ return gracePeriod;
}
public Serde<K> keySerde() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
index f9d43a66374..c48920f7396 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import java.util.Optional;
class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> implements
ProcessorSupplier<K1, V1, K1, VOut> {
@@ -40,6 +41,6 @@ class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut>
implements ProcessorSupplier
@Override
public Processor<K1, V1, K1, VOut> get() {
- return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(),
mapper, joiner, leftJoin);
+ return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(),
mapper, joiner, leftJoin, Optional.empty(), Optional.empty());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 29ea20547fb..1540a9091b8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -72,8 +72,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
+import
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStore;
+import
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import static
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;
@@ -1256,10 +1261,25 @@ public class KStreamImpl<K, V> extends
AbstractStream<K, V> implements KStream<K
final NamedInternal renamed = new NamedInternal(joinedInternal.name());
final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin
? LEFTJOIN_NAME : JOIN_NAME);
+
+ Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+ if (joined.gracePeriod() != null) {
+ if (!((KTableImpl<K, ?, VO>)
table).graphNode.isOutputVersioned().orElse(true)) {
+ throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
+ }
+ final String bufferStoreName = name + "-Buffer";
+ final RocksDBTimeOrderedKeyValueBytesStore store = new
RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
+
+ buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store,
joined.gracePeriod(), name));
+ }
+
final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new
KStreamKTableJoin<>(
((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
joiner,
- leftJoin);
+ leftJoin,
+ Optional.ofNullable(joined.gracePeriod()),
+ buffer);
final ProcessorParameters<K, V, ?, ?> processorParameters = new
ProcessorParameters<>(processorSupplier, name);
final StreamTableJoinNode<K, V> streamTableJoinNode = new
StreamTableJoinNode<>(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index f6f7cc87382..6151c74ba56 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -20,6 +20,10 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import java.time.Duration;
+import java.util.Optional;
class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1,
K, VOut> {
@@ -27,18 +31,25 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements
ProcessorSupplier<K, V1, K,
private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut>
joiner;
private final boolean leftJoin;
+ private final Optional<Duration> gracePeriod;
+ private final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer;
+
KStreamKTableJoin(final KTableValueGetterSupplier<K, V2>
valueGetterSupplier,
final ValueJoinerWithKey<? super K, ? super V1, ? super
V2, VOut> joiner,
- final boolean leftJoin) {
+ final boolean leftJoin,
+ final Optional<Duration> gracePeriod,
+ final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>>
buffer) {
this.valueGetterSupplier = valueGetterSupplier;
this.joiner = joiner;
this.leftJoin = leftJoin;
+ this.gracePeriod = gracePeriod;
+ this.buffer = buffer;
}
@Override
public Processor<K, V1, K, VOut> get() {
- return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(),
keyValueMapper, joiner, leftJoin);
+ return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(),
keyValueMapper, joiner, leftJoin, gracePeriod, buffer);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 53e246167d9..788417d8ec9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
@@ -23,11 +24,19 @@ import
org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
+import java.util.Optional;
+
+import static
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
import static
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
@@ -39,15 +48,28 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ?
extends VOut> joiner;
private final boolean leftJoin;
private Sensor droppedRecordsSensor;
+ private final Optional<Duration> gracePeriod;
+ private final Optional<TimeOrderedKeyValueBuffer<K1, V1, V1>> buffer;
+ protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+ private InternalProcessorContext internalProcessorContext;
+ private final boolean useBuffer;
KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
final KeyValueMapper<? super K1, ? super V1, ?
extends K2> keyMapper,
final ValueJoinerWithKey<? super K1, ? super
V1, ? super V2, ? extends VOut> joiner,
- final boolean leftJoin) {
+ final boolean leftJoin,
+ final Optional<Duration> gracePeriod,
+ final Optional<TimeOrderedKeyValueBuffer<K1,
V1, V1>> buffer) {
this.valueGetter = valueGetter;
this.keyMapper = keyMapper;
this.joiner = joiner;
this.leftJoin = leftJoin;
+ this.useBuffer = buffer.isPresent();
+ if (gracePeriod.isPresent() ^ buffer.isPresent()) {
+ throw new IllegalArgumentException("Grace Period requires a
buffer");
+ }
+ this.gracePeriod = gracePeriod;
+ this.buffer = buffer;
}
@Override
@@ -56,10 +78,65 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
final StreamsMetricsImpl metrics = (StreamsMetricsImpl)
context.metrics();
droppedRecordsSensor =
droppedRecordsSensor(Thread.currentThread().getName(),
context.taskId().toString(), metrics);
valueGetter.init(context);
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context);
+ if (useBuffer) {
+ if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+ throw new IllegalArgumentException("KTable must be versioned
to use a grace period in a stream table join.");
+ }
+
+ buffer.get().setSerdesIfNull(new SerdeGetter(context));
+
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext)
context(), null);
+ }
}
@Override
public void process(final Record<K1, V1> record) {
+ internalProcessorContext =
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
context());
+ updateObservedStreamTime(record.timestamp());
+ if (maybeDropRecord(record)) {
+ return;
+ }
+
+ if (!useBuffer) {
+ doJoin(record);
+ } else {
+ if (!buffer.get().put(observedStreamTime, record,
internalProcessorContext.recordContext())) {
+ doJoin(record);
+ } else {
+ buffer.get().evictWhile(() -> true, this::emit);
+ }
+ }
+ }
+
+ private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit)
{
+ final Record<K1, V1> record = new Record<>(toEmit.key(),
toEmit.value(), toEmit.recordContext().timestamp())
+ .withHeaders(toEmit.recordContext().headers());
+ final ProcessorRecordContext prevRecordContext =
internalProcessorContext.recordContext();
+ try {
+ internalProcessorContext.setRecordContext(toEmit.recordContext());
+ doJoin(record);
+ } finally {
+ internalProcessorContext.setRecordContext(prevRecordContext);
+ }
+ }
+
+ protected void updateObservedStreamTime(final long timestamp) {
+ observedStreamTime = Math.max(observedStreamTime, timestamp);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void doJoin(final Record<K1, V1> record) {
+ final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+ final ValueAndTimestamp<V2> valueAndTimestamp2 =
valueGetter.isVersioned()
+ ? valueGetter.get(mappedKey, record.timestamp())
+ : valueGetter.get(mappedKey);
+ final V2 value2 = getValueOrNull(valueAndTimestamp2);
+ if (leftJoin || value2 != null) {
+
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(),
record.value(), value2)));
+ }
+ }
+
+ private boolean maybeDropRecord(final Record<K1, V1> record) {
// we do join iff the join keys are equal, thus, if {@code keyMapper}
returns {@code null} we
// cannot join and just ignore the record. Note for KTables, this is
the same as having a null key
// since keyMapper just returns the key, but for GlobalKTables we can
have other keyMappers
@@ -83,15 +160,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut>
extends ContextualProcess
);
}
droppedRecordsSensor.record();
- } else {
- final ValueAndTimestamp<V2> valueAndTimestamp2 =
valueGetter.isVersioned()
- ? valueGetter.get(mappedKey, record.timestamp())
- : valueGetter.get(mappedKey);
- final V2 value2 = getValueOrNull(valueAndTimestamp2);
- if (leftJoin || value2 != null) {
- context().forward(record.withValue(joiner.apply(record.key(),
record.value(), value2)));
- }
+ return true;
}
+ return false;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index 1f76713fae8..5757fd85caf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -477,9 +477,9 @@ public final class
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
}
@Override
- public void put(final long time,
- final Record<K, Change<V>> record,
- final ProcessorRecordContext recordContext) {
+ public boolean put(final long time,
+ final Record<K, Change<V>> record,
+ final ProcessorRecordContext recordContext) {
requireNonNull(record.value(), "value cannot be null");
requireNonNull(recordContext, "recordContext cannot be null");
@@ -503,6 +503,7 @@ public final class
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
dirtyKeys.add(serializedKey);
}
updateBufferMetrics();
+ return true;
}
private BufferValue getBuffered(final Bytes key) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 04aa3a56ef9..fb62e3d3986 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -122,12 +122,12 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V>
extends WrappedStateStore<Ro
}
@Override
- public void put(final long time, final Record<K, V> record, final
ProcessorRecordContext recordContext) {
+ public boolean put(final long time, final Record<K, V> record, final
ProcessorRecordContext recordContext) {
requireNonNull(record.value(), "value cannot be null");
requireNonNull(record.key(), "key cannot be null");
requireNonNull(recordContext, "recordContext cannot be null");
if (wrapped().observedStreamTime - gracePeriod > record.timestamp()) {
- return;
+ return false;
}
maybeUpdateSeqnumForDups();
final Bytes serializedKey = Bytes.wrap(
@@ -142,6 +142,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends
WrappedStateStore<Ro
if (minTimestamp() > record.timestamp()) {
minTimestamp = record.timestamp();
}
+ return true;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
index 3ce6e4358a6..fb8655b69ac 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
@@ -18,12 +18,9 @@ package org.apache.kafka.streams.state.internals;
public class RocksDBTimeOrderedKeyValueBytesStoreSupplier {
private final String name;
- private final long retentionPeriod;
-
- public RocksDBTimeOrderedKeyValueBytesStoreSupplier(final String name,
- final long
retentionPeriod) {
+
+ public RocksDBTimeOrderedKeyValueBytesStoreSupplier(final String name) {
this.name = name;
- this.retentionPeriod = retentionPeriod;
}
public String name() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index d11d7a37bb6..c66e7a2ea47 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -89,7 +89,7 @@ public interface TimeOrderedKeyValueBuffer<K, V, T> extends
StateStore {
Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
- void put(long time, Record<K, T> record, ProcessorRecordContext
recordContext);
+ boolean put(long time, Record<K, T> record, ProcessorRecordContext
recordContext);
int numRecords();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
new file mode 100644
index 00000000000..cf859e46ba4
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends
AbstractJoinIntegrationTest {
+
+ private static final String STORE_NAME = "table-store";
+
+ @Rule
+ public Timeout globalTimeout = Timeout.seconds(600);
+ private KStream<Long, String> leftStream;
+ private KTable<Long, String> rightTable;
+ private Joined<Long, String, String> joined;
+
+ public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled)
{
+ super(cacheEnabled);
+ }
+
+ @Before
+ public void prepareTopology() throws InterruptedException {
+ super.prepareEnvironment();
+ appID = "stream-table-join-integration-test";
+ builder = new StreamsBuilder();
+ joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(),
"Grace", Duration.ofMillis(2));
+ }
+
+ @Test
+ public void testInnerWithVersionedStore() {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-inner");
+
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
+ leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC);
+
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b",
null, 6L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a",
null, 4L)),
+ null
+ );
+
+ runTestWithDriver(input, expectedResult);
+ }
+
+ @Test
+ public void testLeftWithVersionedStore() {
+ STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID +
"-left");
+
+ leftStream = builder.stream(INPUT_TOPIC_LEFT);
+ rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+ Stores.persistentVersionedKeyValueStore(STORE_NAME,
Duration.ofMinutes(5))));
+ leftStream.leftJoin(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC);
+
+ final List<List<TestRecord<Long, String>>> expectedResult =
Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"A-null", null, 3L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a",
null, 5L)),
+ null,
+ null,
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b",
null, 6L)),
+ null,
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY,
"C-null", null, 9L)),
+ null,
+ null,
+ Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a",
null, 4L)),
+ null
+ );
+
+ runTestWithDriver(input, expectedResult);
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index dbbe56d4313..a62f39b1885 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -18,7 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import java.time.Duration;
import java.time.Instant;
@@ -44,6 +46,8 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
@@ -96,6 +100,15 @@ public class KStreamKTableJoinTest {
}
}
+ private void pushToTableNonRandom(final int messageCount, final String
valuePrefix) {
+ for (int i = 0; i < messageCount; i++) {
+ inputTableTopic.pipeInput(
+ expectedKeys[i],
+ valuePrefix + expectedKeys[i],
+ 0);
+ }
+ }
+
private void pushToTable(final int messageCount, final String valuePrefix)
{
final Random r = new Random(System.currentTimeMillis());
for (int i = 0; i < messageCount; i++) {
@@ -112,6 +125,103 @@ public class KStreamKTableJoinTest {
}
}
+
+ private void makeJoin(final Duration grace) {
+ final KStream<Integer, String> stream;
+ final KTable<Integer, String> table;
+ final MockApiProcessorSupplier<Integer, String, Void, Void> supplier =
new MockApiProcessorSupplier<>();
+ builder = new StreamsBuilder();
+
+ final Consumed<Integer, String> consumed =
Consumed.with(Serdes.Integer(), Serdes.String());
+ stream = builder.stream(streamTopic, consumed);
+ table = builder.table("tableTopic2", consumed, Materialized.as(
+ Stores.persistentVersionedKeyValueStore("V-grace",
Duration.ofMinutes(5))));
+ stream.join(table,
+ MockValueJoiner.TOSTRING_JOINER,
+ Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(),
"Grace", grace)
+ ).process(supplier);
+ final Properties props =
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+ driver = new TopologyTestDriver(builder.build(), props);
+ inputStreamTopic = driver.createInputTopic(streamTopic, new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
+ inputTableTopic = driver.createInputTopic("tableTopic2", new
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L),
Duration.ZERO);
+
+ processor = supplier.theCapturedProcessor();
+ }
+
+ @Test
+ public void shouldFailIfTableIsNotVersioned() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> streamA = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KTable<String, String> tableB = builder.table("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
+
+ final IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class,
+ () -> streamA.join(tableB, (value1, value2) -> value1 + value2,
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join",
Duration.ofMillis(6))).to("out-one"));
+ assertThat(
+ exception.getMessage(),
+ is("KTable must be versioned to use a grace period in a stream
table join.")
+ );
+ }
+
+ @Test
+ public void shouldDelayJoinByGracePeriod() {
+ makeJoin(Duration.ofMillis(2));
+
+ // push four items to the table. this should not produce any item.
+ pushToTableNonRandom(4, "Y");
+ processor.checkAndClearProcessResult(EMPTY);
+
+ // push all four items to the primary stream. this should produce two
items.
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+ // push all items to the table. this should not produce any item
+ pushToTableNonRandom(4, "YY");
+ processor.checkAndClearProcessResult(EMPTY);
+
+ // push all four items to the primary stream. this should produce two
items.
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+YY0", 0),
+ new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+ inputStreamTopic.pipeInput(5, "test", 7);
+
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(2, "X2+YY2", 2),
+ new KeyValueTimestamp<>(2, "X2+YY2", 2),
+ new KeyValueTimestamp<>(3, "X3+YY3", 3),
+ new KeyValueTimestamp<>(3, "X3+YY3", 3));
+
+
+ // push all items to the table. this should not produce any item
+ pushToTableNonRandom(4, "YYY");
+ processor.checkAndClearProcessResult(EMPTY);
+ }
+
+ @Test
+ public void shouldHandleLateJoinsWithGracePeriod() {
+ makeJoin(Duration.ofMillis(2));
+
+ // push four items to the table. this should not produce any item.
+ pushToTableNonRandom(4, "Y");
+ processor.checkAndClearProcessResult(EMPTY);
+
+ // push 4 records into the buffer and evict the first two
+ pushToStream(4, "X");
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0),
+ new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+ //should be processed immediately and not evict any other records
+ pushToStream(1, "X");
+ processor.checkAndClearProcessResult(
+ new KeyValueTimestamp<>(0, "X0+Y0", 0));
+ }
+
@Test
public void shouldReuseRepartitionTopicWithGeneratedName() {
final StreamsBuilder builder = new StreamsBuilder();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 9c15b4c5c17..3ade7c5e59e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -66,16 +66,24 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
}
private void createBuffer(final Duration grace) {
- final RocksDBTimeOrderedKeyValueBytesStore store = new
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing", 100).get();
+ final RocksDBTimeOrderedKeyValueBytesStore store = new
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
+
buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace,
"testing");
buffer.setSerdesIfNull(serdeGetter);
buffer.init((StateStoreContext) context, store);
}
- private void pipeRecord(final String key, final String value, final long
time) {
+ private boolean pipeRecord(final String key, final String value, final
long time) {
final Record<String, String> record = new Record<>(key, value, time);
context.setRecordContext(new ProcessorRecordContext(time, offset++, 0,
"testing", new RecordHeaders()));
- buffer.put(time, record, context.recordContext());
+ return buffer.put(time, record, context.recordContext());
+ }
+
+ @Test
+ public void shouldReturnIfRecordWasAdded() {
+ createBuffer(Duration.ofMillis(1));
+ assertThat(pipeRecord("K", "V", 2L), equalTo(true));
+ assertThat(pipeRecord("K", "V", 0L), equalTo(false));
}
@Test