This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 12be344fdd3 KAFKA-14936: Add Grace period logic to Stream Table Join 
(2/N) (#13855)
12be344fdd3 is described below

commit 12be344fdd3b20f338ccab87933b89049ce202a4
Author: Walker Carlson <[email protected]>
AuthorDate: Thu Jun 29 07:14:04 2023 -0500

    KAFKA-14936: Add Grace period logic to Stream Table Join (2/N) (#13855)
    
    This PR adds the interface for grace period to the Joined object as well as 
uses the buffer. The majority of it is tests and moving some of the existing 
join logic.
    
    Reviewers: Victoria Xia <[email protected]>, Bruno Cadonna 
<[email protected]>
---
 .../org/apache/kafka/streams/kstream/Joined.java   |  77 +++++++++--
 .../kstream/internals/KStreamGlobalKTableJoin.java |   3 +-
 .../streams/kstream/internals/KStreamImpl.java     |  22 +++-
 .../kstream/internals/KStreamKTableJoin.java       |  15 ++-
 .../internals/KStreamKTableJoinProcessor.java      |  89 +++++++++++--
 .../InMemoryTimeOrderedKeyValueChangeBuffer.java   |   7 +-
 .../RocksDBTimeOrderedKeyValueBuffer.java          |   5 +-
 ...cksDBTimeOrderedKeyValueBytesStoreSupplier.java |   7 +-
 .../state/internals/TimeOrderedKeyValueBuffer.java |   2 +-
 .../StreamTableJoinWithGraceIntegrationTest.java   | 144 +++++++++++++++++++++
 .../kstream/internals/KStreamKTableJoinTest.java   | 110 ++++++++++++++++
 .../RocksDBTimeOrderedKeyValueBufferTest.java      |  14 +-
 12 files changed, 455 insertions(+), 40 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
index a2793afce16..55dff2428f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.serialization.Serde;
 
+import java.time.Duration;
+
 /**
  * The {@code Joined} class represents optional params that can be passed to
  * {@link KStream#join(KTable, ValueJoiner, Joined) KStream#join(KTable,...)} 
and
@@ -29,19 +31,22 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
     protected final Serde<V> valueSerde;
     protected final Serde<VO> otherValueSerde;
     protected final String name;
+    protected final Duration gracePeriod;
 
     private Joined(final Serde<K> keySerde,
                    final Serde<V> valueSerde,
                    final Serde<VO> otherValueSerde,
-                   final String name) {
+                   final String name,
+                   final Duration gracePeriod) {
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.otherValueSerde = otherValueSerde;
         this.name = name;
+        this.gracePeriod = gracePeriod;
     }
 
     protected Joined(final Joined<K, V, VO> joined) {
-        this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, 
joined.name);
+        this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, 
joined.name, joined.gracePeriod);
     }
 
     /**
@@ -59,7 +64,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
     public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
                                                    final Serde<V> valueSerde,
                                                    final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, null);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, null, null);
     }
 
     /**
@@ -84,7 +89,34 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
                                                    final Serde<V> valueSerde,
                                                    final Serde<VO> 
otherValueSerde,
                                                    final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with key, value, and otherValue 
{@link Serde} instances, a processor name, and a grace period.
+     * {@code null} values are accepted and will be replaced by the default 
serdes as defined in
+     * config.
+     *
+     * @param keySerde the key serde to use. If {@code null} the default key 
serde from config will be
+     * used
+     * @param valueSerde the value serde to use. If {@code null} the default 
value serde from config
+     * will be used
+     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde
+     * from config will be used
+     * @param name the name used as the base for naming components of the join 
including any
+     * repartition topics
+     * @param gracePeriod the duration for which stream-side records are 
buffered before being joined
+     * @param <K> key type
+     * @param <V> value type
+     * @param <VO> other value type
+     * @return new {@code Joined} instance with the provided serdes
+     */
+    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
+                                                   final Serde<V> valueSerde,
+                                                   final Serde<VO> 
otherValueSerde,
+                                                   final String name,
+                                                   final Duration gracePeriod) 
{
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
     }
 
     /**
@@ -98,7 +130,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the keySerde
      */
     public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> 
keySerde) {
-        return new Joined<>(keySerde, null, null, null);
+        return new Joined<>(keySerde, null, null, null, null);
     }
 
     /**
@@ -112,9 +144,10 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the valueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> 
valueSerde) {
-        return new Joined<>(null, valueSerde, null, null);
+        return new Joined<>(null, valueSerde, null, null, null);
     }
 
+
     /**
      * Create an instance of {@code Joined} with an other value {@link Serde}.
      * {@code null} values are accepted and will be replaced by the default 
value serde as defined in config.
@@ -126,7 +159,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the otherValueSerde
      */
     public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(null, null, otherValueSerde, null);
+        return new Joined<>(null, null, otherValueSerde, null, null);
     }
 
     /**
@@ -142,10 +175,9 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      *
      */
     public static <K, V, VO> Joined<K, V, VO> as(final String name) {
-        return new Joined<>(null, null, null, name);
+        return new Joined<>(null, null, null, name, null);
     }
 
-
     /**
      * Set the key {@link Serde} to be used. Null values are accepted and will 
be replaced by the default
      * key serde as defined in config
@@ -154,7 +186,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the {@code name}
      */
     public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
     }
 
     /**
@@ -165,7 +197,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the {@code 
valueSerde}
      */
     public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
     }
 
     /**
@@ -176,7 +208,7 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      * @return new {@code Joined} instance configured with the {@code 
valueSerde}
      */
     public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> 
otherValueSerde) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
     }
 
     /**
@@ -189,7 +221,26 @@ public class Joined<K, V, VO> implements 
NamedOperation<Joined<K, V, VO>> {
      */
     @Override
     public Joined<K, V, VO> withName(final String name) {
-        return new Joined<>(keySerde, valueSerde, otherValueSerde, name);
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    }
+
+    /**
+     * Set the grace period on the stream side of the join. Records will enter 
a buffer before being processed.
+     * Out-of-order records that arrive within the grace period will be 
processed in timestamp order. Late records,
+     * arriving after the grace period has elapsed, will be processed as they 
come in; if such a record is past the
+     * table's history retention, this could result in a null join. Long gaps 
between arriving stream-side records
+     * will cause records to be delayed in processing.
+     *
+     * @param gracePeriod the duration of the grace period. Must be less than 
the joining table's history retention.
+     * @return new {@code Joined} instance configured with the gracePeriod
+     */
+    public Joined<K, V, VO> withGracePeriod(final Duration gracePeriod) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde, name, 
gracePeriod);
+    }
+
+    public Duration gracePeriod() {
+        return gracePeriod;
     }
 
     public Serde<K> keySerde() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
index f9d43a66374..c48920f7396 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoin.java
@@ -20,6 +20,7 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import java.util.Optional;
 
 class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> implements 
ProcessorSupplier<K1, V1, K1, VOut> {
 
@@ -40,6 +41,6 @@ class KStreamGlobalKTableJoin<K1, V1, K2, V2, VOut> 
implements ProcessorSupplier
 
     @Override
     public Processor<K1, V1, K1, VOut> get() {
-        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), 
mapper, joiner, leftJoin);
+        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), 
mapper, joiner, leftJoin, Optional.empty(), Optional.empty());
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 29ea20547fb..1540a9091b8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -72,8 +72,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+import 
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;
+import 
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStore;
+import 
org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 
 import static 
org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;
 
@@ -1256,10 +1261,25 @@ public class KStreamImpl<K, V> extends 
AbstractStream<K, V> implements KStream<K
         final NamedInternal renamed = new NamedInternal(joinedInternal.name());
 
         final String name = renamed.orElseGenerateWithPrefix(builder, leftJoin 
? LEFTJOIN_NAME : JOIN_NAME);
+
+        Optional<TimeOrderedKeyValueBuffer<K, V, V>> buffer = Optional.empty();
+
+        if (joined.gracePeriod() != null) {
+            if (!((KTableImpl<K, ?, VO>) 
table).graphNode.isOutputVersioned().orElse(true)) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+            final String bufferStoreName = name + "-Buffer";
+            final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier(bufferStoreName).get();
+
+            buffer = Optional.of(new RocksDBTimeOrderedKeyValueBuffer<>(store, 
joined.gracePeriod(), name));
+        }
+
         final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new 
KStreamKTableJoin<>(
             ((KTableImpl<K, ?, VO>) table).valueGetterSupplier(),
             joiner,
-            leftJoin);
+            leftJoin,
+            Optional.ofNullable(joined.gracePeriod()),
+            buffer);
 
         final ProcessorParameters<K, V, ?, ?> processorParameters = new 
ProcessorParameters<>(processorSupplier, name);
         final StreamTableJoinNode<K, V> streamTableJoinNode = new 
StreamTableJoinNode<>(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
index f6f7cc87382..6151c74ba56 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java
@@ -20,6 +20,10 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
+
+import java.time.Duration;
+import java.util.Optional;
 
 class KStreamKTableJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, 
K, VOut> {
 
@@ -27,18 +31,25 @@ class KStreamKTableJoin<K, V1, V2, VOut> implements 
ProcessorSupplier<K, V1, K,
     private final KTableValueGetterSupplier<K, V2> valueGetterSupplier;
     private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, VOut> 
joiner;
     private final boolean leftJoin;
+    private final Optional<Duration> gracePeriod;
+    private final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> buffer;
+
 
     KStreamKTableJoin(final KTableValueGetterSupplier<K, V2> 
valueGetterSupplier,
                       final ValueJoinerWithKey<? super K, ? super V1, ? super 
V2, VOut> joiner,
-                      final boolean leftJoin) {
+                      final boolean leftJoin,
+                      final Optional<Duration> gracePeriod,
+                      final Optional<TimeOrderedKeyValueBuffer<K, V1, V1>> 
buffer) {
         this.valueGetterSupplier = valueGetterSupplier;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
+        this.gracePeriod = gracePeriod;
+        this.buffer = buffer;
     }
 
     @Override
     public Processor<K, V1, K, VOut> get() {
-        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), 
keyValueMapper, joiner, leftJoin);
+        return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), 
keyValueMapper, joiner, leftJoin, gracePeriod, buffer);
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 53e246167d9..788417d8ec9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
@@ -23,11 +24,19 @@ import 
org.apache.kafka.streams.processor.api.ContextualProcessor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
+import java.util.Optional;
+
+import static 
org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
 import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
 
@@ -39,15 +48,28 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
     private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? 
extends VOut> joiner;
     private final boolean leftJoin;
     private Sensor droppedRecordsSensor;
+    private final Optional<Duration> gracePeriod;
+    private final Optional<TimeOrderedKeyValueBuffer<K1, V1, V1>> buffer;
+    protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+    private InternalProcessorContext internalProcessorContext;
+    private final boolean useBuffer;
 
     KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
                                final KeyValueMapper<? super K1, ? super V1, ? 
extends K2> keyMapper,
                                final ValueJoinerWithKey<? super K1, ? super 
V1, ? super V2, ? extends VOut> joiner,
-                               final boolean leftJoin) {
+                               final boolean leftJoin,
+                               final Optional<Duration> gracePeriod,
+                               final Optional<TimeOrderedKeyValueBuffer<K1, 
V1, V1>> buffer) {
         this.valueGetter = valueGetter;
         this.keyMapper = keyMapper;
         this.joiner = joiner;
         this.leftJoin = leftJoin;
+        this.useBuffer = buffer.isPresent();
+        if (gracePeriod.isPresent() ^ buffer.isPresent()) {
+            throw new IllegalArgumentException("Grace Period requires a 
buffer");
+        }
+        this.gracePeriod = gracePeriod;
+        this.buffer = buffer;
     }
 
     @Override
@@ -56,10 +78,65 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
         final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
         droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
         valueGetter.init(context);
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context);
+        if (useBuffer) {
+            if (!valueGetter.isVersioned() && gracePeriod.isPresent()) {
+                throw new IllegalArgumentException("KTable must be versioned 
to use a grace period in a stream table join.");
+            }
+
+            buffer.get().setSerdesIfNull(new SerdeGetter(context));
+            
buffer.get().init((org.apache.kafka.streams.processor.StateStoreContext) 
context(), null);
+        }
     }
 
     @Override
     public void process(final Record<K1, V1> record) {
+        internalProcessorContext = 
asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext)
 context());
+        updateObservedStreamTime(record.timestamp());
+        if (maybeDropRecord(record)) {
+            return;
+        }
+
+        if (!useBuffer) {
+            doJoin(record);
+        } else {
+            if (!buffer.get().put(observedStreamTime, record, 
internalProcessorContext.recordContext())) {
+                doJoin(record);
+            } else {
+                buffer.get().evictWhile(() -> true, this::emit);
+            }
+        }
+    }
+
+    private void emit(final TimeOrderedKeyValueBuffer.Eviction<K1, V1> toEmit) 
{
+        final Record<K1, V1> record = new Record<>(toEmit.key(), 
toEmit.value(), toEmit.recordContext().timestamp())
+            .withHeaders(toEmit.recordContext().headers());
+        final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext();
+        try {
+            internalProcessorContext.setRecordContext(toEmit.recordContext());
+            doJoin(record);
+        } finally {
+            internalProcessorContext.setRecordContext(prevRecordContext);
+        }
+    }
+
+    protected void updateObservedStreamTime(final long timestamp) {
+        observedStreamTime = Math.max(observedStreamTime, timestamp);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void doJoin(final Record<K1, V1> record) {
+        final K2 mappedKey = keyMapper.apply(record.key(), record.value());
+        final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter.isVersioned()
+            ? valueGetter.get(mappedKey, record.timestamp())
+            : valueGetter.get(mappedKey);
+        final V2 value2 = getValueOrNull(valueAndTimestamp2);
+        if (leftJoin || value2 != null) {
+            
internalProcessorContext.forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
+        }
+    }
+
+    private boolean maybeDropRecord(final Record<K1, V1> record) {
         // we do join iff the join keys are equal, thus, if {@code keyMapper} 
returns {@code null} we
         // cannot join and just ignore the record. Note for KTables, this is 
the same as having a null key
         // since keyMapper just returns the key, but for GlobalKTables we can 
have other keyMappers
@@ -83,15 +160,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> 
extends ContextualProcess
                 );
             }
             droppedRecordsSensor.record();
-        } else {
-            final ValueAndTimestamp<V2> valueAndTimestamp2 = 
valueGetter.isVersioned()
-                ? valueGetter.get(mappedKey, record.timestamp())
-                : valueGetter.get(mappedKey);
-            final V2 value2 = getValueOrNull(valueAndTimestamp2);
-            if (leftJoin || value2 != null) {
-                context().forward(record.withValue(joiner.apply(record.key(), 
record.value(), value2)));
-            }
+            return true;
         }
+        return false;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
index 1f76713fae8..5757fd85caf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java
@@ -477,9 +477,9 @@ public final class 
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
     }
 
     @Override
-    public void put(final long time,
-                    final Record<K, Change<V>> record,
-                    final ProcessorRecordContext recordContext) {
+    public boolean put(final long time,
+                       final Record<K, Change<V>> record,
+                       final ProcessorRecordContext recordContext) {
         requireNonNull(record.value(), "value cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
 
@@ -503,6 +503,7 @@ public final class 
InMemoryTimeOrderedKeyValueChangeBuffer<K, V, T> implements T
             dirtyKeys.add(serializedKey);
         }
         updateBufferMetrics();
+        return true;
     }
 
     private BufferValue getBuffered(final Bytes key) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
index 04aa3a56ef9..fb62e3d3986 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBuffer.java
@@ -122,12 +122,12 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> 
extends WrappedStateStore<Ro
     }
 
     @Override
-    public void put(final long time, final Record<K, V> record, final 
ProcessorRecordContext recordContext) {
+    public boolean put(final long time, final Record<K, V> record, final 
ProcessorRecordContext recordContext) {
         requireNonNull(record.value(), "value cannot be null");
         requireNonNull(record.key(), "key cannot be null");
         requireNonNull(recordContext, "recordContext cannot be null");
         if (wrapped().observedStreamTime - gracePeriod > record.timestamp()) {
-            return;
+            return false;
         }
         maybeUpdateSeqnumForDups();
         final Bytes serializedKey = Bytes.wrap(
@@ -142,6 +142,7 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> extends 
WrappedStateStore<Ro
         if (minTimestamp() > record.timestamp()) {
             minTimestamp = record.timestamp();
         }
+        return true;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
index 3ce6e4358a6..fb8655b69ac 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBytesStoreSupplier.java
@@ -18,12 +18,9 @@ package org.apache.kafka.streams.state.internals;
 
 public class RocksDBTimeOrderedKeyValueBytesStoreSupplier {
     private final String name;
-    private final long retentionPeriod;
-
-    public RocksDBTimeOrderedKeyValueBytesStoreSupplier(final String name,
-                                                        final long 
retentionPeriod) {
+  
+    public RocksDBTimeOrderedKeyValueBytesStoreSupplier(final String name) {
         this.name = name;
-        this.retentionPeriod = retentionPeriod;
     }
 
     public String name() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
index d11d7a37bb6..c66e7a2ea47 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java
@@ -89,7 +89,7 @@ public interface TimeOrderedKeyValueBuffer<K, V, T> extends 
StateStore {
 
     Maybe<ValueAndTimestamp<V>> priorValueForBuffered(K key);
 
-    void put(long time, Record<K, T> record, ProcessorRecordContext 
recordContext);
+    boolean put(long time, Record<K, T> record, ProcessorRecordContext 
recordContext);
 
     int numRecords();
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
new file mode 100644
index 00000000000..cf859e46ba4
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinWithGraceIntegrationTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.TestRecord;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Integration tests for stream-table joins with a grace period in the Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamTableJoinWithGraceIntegrationTest extends 
AbstractJoinIntegrationTest {
+
+    private static final String STORE_NAME = "table-store";
+
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    private KStream<Long, String> leftStream;
+    private KTable<Long, String> rightTable;
+    private Joined<Long, String, String> joined;
+
+    public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) 
{
+        super(cacheEnabled);
+    }
+
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+        appID = "stream-table-join-integration-test";
+        builder = new StreamsBuilder();
+        joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), 
"Grace", Duration.ofMillis(2));
+    }
+    
+    @Test
+    public void testInnerWithVersionedStore() {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
+
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+            Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
+        leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC);
+
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b", 
null,  6L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a", 
null,  4L)),
+            null
+        );
+
+        runTestWithDriver(input, expectedResult);
+    }
+
+    @Test
+    public void testLeftWithVersionedStore() {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-left");
+
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
+            Stores.persistentVersionedKeyValueStore(STORE_NAME, 
Duration.ofMinutes(5))));
+        leftStream.leftJoin(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC);
+
+        final List<List<TestRecord<Long, String>>> expectedResult = 
Arrays.asList(
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"A-null", null, 3L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", 
null, 5L)),
+            null,
+            null,
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-b", 
null, 6L)),
+            null,
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, 
"C-null", null, 9L)),
+            null,
+            null,
+            Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-a", 
null,  4L)),
+            null
+        );
+
+        runTestWithDriver(input, expectedResult);
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index dbbe56d4313..a62f39b1885 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -18,7 +18,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 import java.time.Duration;
 import java.time.Instant;
@@ -44,6 +46,8 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.MockApiProcessor;
 import org.apache.kafka.test.MockApiProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -96,6 +100,15 @@ public class KStreamKTableJoinTest {
         }
     }
 
+    private void pushToTableNonRandom(final int messageCount, final String 
valuePrefix) {
+        for (int i = 0; i < messageCount; i++) {
+            inputTableTopic.pipeInput(
+                expectedKeys[i],
+                valuePrefix + expectedKeys[i],
+                0);
+        }
+    }
+
     private void pushToTable(final int messageCount, final String valuePrefix) 
{
         final Random r = new Random(System.currentTimeMillis());
         for (int i = 0; i < messageCount; i++) {
@@ -112,6 +125,103 @@ public class KStreamKTableJoinTest {
         }
     }
 
+
+    private void makeJoin(final Duration grace) {
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockApiProcessorSupplier<Integer, String, Void, Void> supplier = 
new MockApiProcessorSupplier<>();
+        builder = new StreamsBuilder();
+
+        final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+        stream = builder.stream(streamTopic, consumed);
+        table = builder.table("tableTopic2", consumed, Materialized.as(
+            Stores.persistentVersionedKeyValueStore("V-grace", 
Duration.ofMinutes(5))));
+        stream.join(table,
+            MockValueJoiner.TOSTRING_JOINER,
+            Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String(), 
"Grace", grace)
+        ).process(supplier);
+        final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
+        driver = new TopologyTestDriver(builder.build(), props);
+        inputStreamTopic = driver.createInputTopic(streamTopic, new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+        inputTableTopic = driver.createInputTopic("tableTopic2", new 
IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), 
Duration.ZERO);
+
+        processor = supplier.theCapturedProcessor();
+    }
+
+    @Test
+    public void shouldFailIfTableIsNotVersioned() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", 
Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> tableB = builder.table("topic2", 
Consumed.with(Serdes.String(), Serdes.String()));
+
+        final IllegalArgumentException exception = 
assertThrows(IllegalArgumentException.class,
+            () -> streamA.join(tableB, (value1, value2) -> value1 + value2, 
Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", 
Duration.ofMillis(6))).to("out-one"));
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream 
table join.")
+        );
+    }
+
+    @Test
+    public void shouldDelayJoinByGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YY");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push all four items to the primary stream. this should produce two 
items.
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+YY0", 0),
+            new KeyValueTimestamp<>(1, "X1+YY1", 1));
+
+        inputStreamTopic.pipeInput(5, "test", 7);
+
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(2, "X2+YY2", 2),
+            new KeyValueTimestamp<>(2, "X2+YY2", 2),
+            new KeyValueTimestamp<>(3, "X3+YY3", 3),
+            new KeyValueTimestamp<>(3, "X3+YY3", 3));
+
+
+        // push all items to the table. this should not produce any item
+        pushToTableNonRandom(4, "YYY");
+        processor.checkAndClearProcessResult(EMPTY);
+    }
+
+    @Test
+    public void shouldHandleLateJoinsWithGracePeriod() {
+        makeJoin(Duration.ofMillis(2));
+
+        // push four items to the table. this should not produce any item.
+        pushToTableNonRandom(4, "Y");
+        processor.checkAndClearProcessResult(EMPTY);
+
+        // push 4 records into the buffer and evict the first two
+        pushToStream(4, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1));
+
+        //should be processed immediately and not evict any other records
+        pushToStream(1, "X");
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0));
+    }
+
     @Test
     public void shouldReuseRepartitionTopicWithGeneratedName() {
         final StreamsBuilder builder = new StreamsBuilder();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
index 9c15b4c5c17..3ade7c5e59e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedKeyValueBufferTest.java
@@ -66,16 +66,24 @@ public class RocksDBTimeOrderedKeyValueBufferTest {
     }
 
     private void createBuffer(final Duration grace) {
-        final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing",  100).get();
+        final RocksDBTimeOrderedKeyValueBytesStore store = new 
RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();
+
         buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, 
"testing");
         buffer.setSerdesIfNull(serdeGetter);
         buffer.init((StateStoreContext) context, store);
     }
 
-    private void pipeRecord(final String key, final String value, final long 
time) {
+    private boolean pipeRecord(final String key, final String value, final 
long time) {
         final Record<String, String> record = new Record<>(key, value, time);
         context.setRecordContext(new ProcessorRecordContext(time, offset++, 0, 
"testing", new RecordHeaders()));
-        buffer.put(time, record, context.recordContext());
+        return buffer.put(time, record, context.recordContext());
+    }
+
+    @Test
+    public void shouldReturnIfRecordWasAdded() {
+        createBuffer(Duration.ofMillis(1));
+        assertThat(pipeRecord("K", "V", 2L), equalTo(true));
+        assertThat(pipeRecord("K", "V", 0L), equalTo(false));
     }
 
     @Test


Reply via email to