This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new 754365a032a KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592) 754365a032a is described below commit 754365a032adf96e42603d0c18e433d2efa73549 Author: Matthias J. Sax <matth...@confluent.io> AuthorDate: Mon Apr 24 12:40:25 2023 -0700 KAFKA-14862: Outer stream-stream join does not output all results with multiple input partitions (#13592) Stream-stream outer join uses a "shared time tracker" to track stream-time progress for left and right input in a single place. This time tracker is incorrectly shared across tasks. This PR introduces a supplier to create a "shared time tracker" object per task, to be shared between the left and right join processors. Reviewers: Victoria Xia <victoria....@confluent.io>, Bruno Cadonna <br...@confluent.io>, Walker Carlson <wcarl...@confluent.io> --- .../streams/kstream/internals/KStreamImplJoin.java | 21 ++- .../kstream/internals/KStreamKStreamJoin.java | 15 +- .../kstream/internals/KStreamKStreamSelfJoin.java | 9 +- .../integration/KStreamKStreamIntegrationTest.java | 185 +++++++++++++++++++++ 4 files changed, 216 insertions(+), 14 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java index b2405db7317..f4ad9ac682f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StreamStreamJoinNode; import org.apache.kafka.streams.kstream.internals.graph.WindowedStreamProcessorNode; +import 
org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide; import org.apache.kafka.streams.state.StoreBuilder; @@ -43,6 +44,7 @@ import org.apache.kafka.streams.state.internals.LeftOrRightValueSerde; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Optional; @@ -57,6 +59,18 @@ class KStreamImplJoin { private final boolean leftOuter; private final boolean rightOuter; + static class TimeTrackerSupplier { + private final Map<TaskId, TimeTracker> tracker = new HashMap<>(); + + public TimeTracker get(final TaskId taskId) { + return tracker.computeIfAbsent(taskId, taskId1 -> new TimeTracker()); + } + + public void remove(final TaskId taskId) { + tracker.remove(taskId); + } + } + static class TimeTracker { private long emitIntervalMs = 1000L; long streamTime = ConsumerRecord.NO_TIMESTAMP; @@ -159,7 +173,7 @@ class KStreamImplJoin { } // Time-shared between joins to keep track of the maximum stream time - final TimeTracker sharedTimeTracker = new TimeTracker(); + final TimeTrackerSupplier sharedTimeTrackerSupplier = new TimeTrackerSupplier(); final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows); final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>( @@ -169,7 +183,7 @@ class KStreamImplJoin { joiner, leftOuter, outerJoinWindowStore.map(StoreBuilder::name), - sharedTimeTracker + sharedTimeTrackerSupplier ); final KStreamKStreamJoin<K, V2, V1, VOut> joinOther = new KStreamKStreamJoin<>( @@ -179,14 +193,13 @@ class KStreamImplJoin { AbstractStream.reverseJoinerWithKey(joiner), rightOuter, outerJoinWindowStore.map(StoreBuilder::name), - sharedTimeTracker + sharedTimeTrackerSupplier ); final KStreamKStreamSelfJoin<K, V1, V2, VOut> selfJoin = new KStreamKStreamSelfJoin<>( thisWindowStore.name(), internalWindows, joiner, - 
sharedTimeTracker, windows.size() + windows.gracePeriodMs() ); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index 397a86d24ad..067dd50f0cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.ValueJoinerWithKey; import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker; +import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTrackerSupplier; import org.apache.kafka.streams.processor.api.ContextualProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -57,7 +58,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, private final Optional<String> outerJoinWindowName; private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner; - private final TimeTracker sharedTimeTracker; + private final TimeTrackerSupplier sharedTimeTrackerSupplier; KStreamKStreamJoin(final boolean isLeftSide, final String otherWindowName, @@ -65,7 +66,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? 
extends VOut> joiner, final boolean outer, final Optional<String> outerJoinWindowName, - final TimeTracker sharedTimeTracker) { + final TimeTrackerSupplier sharedTimeTrackerSupplier) { this.isLeftSide = isLeftSide; this.otherWindowName = otherWindowName; if (isLeftSide) { @@ -82,7 +83,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, this.joiner = joiner; this.outer = outer; this.outerJoinWindowName = outerJoinWindowName; - this.sharedTimeTracker = sharedTimeTracker; + this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier; } @Override @@ -95,6 +96,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, private Sensor droppedRecordsSensor; private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore = Optional.empty(); private InternalProcessorContext<K, VOut> internalProcessorContext; + private TimeTracker sharedTimeTracker; @Override public void init(final ProcessorContext<K, VOut> context) { @@ -104,6 +106,7 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics(); droppedRecordsSensor = droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics); otherWindowStore = context.getStateStore(otherWindowName); + sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId()); if (enableSpuriousResultFix) { outerJoinStore = outerJoinWindowName.map(context::getStateStore); @@ -124,7 +127,6 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { return; } - boolean needOuterJoin = outer; final long inputRecordTimestamp = record.timestamp(); @@ -262,5 +264,10 @@ class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, } } } + + @Override + public void close() { + 
sharedTimeTrackerSupplier.remove(context().taskId()); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java index a6af2a4e082..64d8b45c039 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java @@ -44,13 +44,10 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1 private final long retentionPeriod; private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis; - private final TimeTracker sharedTimeTracker; - KStreamKStreamSelfJoin( final String windowName, final JoinWindowsInternal windows, final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis, - final TimeTracker sharedTimeTracker, final long retentionPeriod) { this.windowName = windowName; @@ -59,7 +56,6 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1 this.joinOtherBeforeMs = windows.afterMs; this.joinOtherAfterMs = windows.beforeMs; this.joinerThis = joinerThis; - this.sharedTimeTracker = sharedTimeTracker; this.retentionPeriod = retentionPeriod; } @@ -69,6 +65,7 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1 } private class KStreamKStreamSelfJoinProcessor extends ContextualProcessor<K, V1, K, VOut> { + private final TimeTracker timeTracker = new TimeTracker(); private WindowStore<K, V2> windowStore; private Sensor droppedRecordsSensor; @@ -95,9 +92,9 @@ class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1 final Record selfRecord = record .withValue(joinerThis.apply(record.key(), record.value(), (V2) record.value())) .withTimestamp(inputRecordTimestamp); - sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); + 
timeTracker.advanceStreamTime(inputRecordTimestamp); // We emit the self record only if it isn't expired. - final boolean emitSelfRecord = inputRecordTimestamp > sharedTimeTracker.streamTime - retentionPeriod + 1; + final boolean emitSelfRecord = inputRecordTimestamp > timeTracker.streamTime - retentionPeriod + 1; // Join current record with other try (final WindowStoreIterator<V2> iter = windowStore.fetch(record.key(), timeFrom, timeTo)) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java new file mode 100644 index 00000000000..7e60441a0f2 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.integration; + +import kafka.utils.MockTime; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static java.time.Duration.ofSeconds; +import static java.util.Arrays.asList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsEqual.equalTo; + +@Timeout(600) +@Tag("integration") +public class 
KStreamKStreamIntegrationTest { + private final static int NUM_BROKERS = 1; + + public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private final static MockTime MOCK_TIME = CLUSTER.time; + private final static String LEFT_STREAM = "leftStream"; + private final static String RIGHT_STREAM = "rightStream"; + private final static String OUTPUT = "output"; + private Properties streamsConfig; + private KafkaStreams streams; + private final static Properties CONSUMER_CONFIG = new Properties(); + private final static Properties PRODUCER_CONFIG = new Properties(); + + @BeforeAll + public static void startCluster() throws Exception { + CLUSTER.start(); + + //Use multiple partitions to ensure distribution of keys. + CLUSTER.createTopic(LEFT_STREAM, 4, 1); + CLUSTER.createTopic(RIGHT_STREAM, 4, 1); + CLUSTER.createTopic(OUTPUT, 4, 1); + + CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "result-consumer"); + CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + } + + @AfterAll + public static void closeCluster() { + CLUSTER.stop(); + } + + @BeforeEach + public void before(final TestInfo testInfo) throws IOException { + final String stateDirBasePath = TestUtils.tempDirectory().getPath(); + final String safeTestName = safeUniqueTestName(getClass(), testInfo); + streamsConfig = getStreamsConfig(safeTestName); + streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath); + } + + @AfterEach + public void after() throws IOException { + if (streams != null) { + streams.close(); + streams = null; + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfig); + } + + @Test + public void shouldOuterJoin() throws Exception { + final Set<KeyValue<String, String>> expected = new HashSet<>(); + 
expected.add(new KeyValue<>("Key-1", "value1=left-1a,value2=null")); + expected.add(new KeyValue<>("Key-2", "value1=left-2a,value2=null")); + expected.add(new KeyValue<>("Key-3", "value1=left-3a,value2=null")); + expected.add(new KeyValue<>("Key-4", "value1=left-4a,value2=null")); + + verifyKStreamKStreamOuterJoin(expected); + } + + private void verifyKStreamKStreamOuterJoin(final Set<KeyValue<String, String>> expectedResult) throws Exception { + streams = prepareTopology(streamsConfig); + + startApplicationAndWaitUntilRunning(Collections.singletonList(streams), ofSeconds(120)); + + PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + final List<KeyValue<String, String>> left1 = asList( + new KeyValue<>("Key-1", "left-1a"), + new KeyValue<>("Key-2", "left-2a"), + new KeyValue<>("Key-3", "left-3a"), + new KeyValue<>("Key-4", "left-4a") + ); + + final List<KeyValue<String, String>> left2 = asList( + new KeyValue<>("Key-1", "left-1b"), + new KeyValue<>("Key-2", "left-2b"), + new KeyValue<>("Key-3", "left-3b"), + new KeyValue<>("Key-4", "left-4b") + ); + + IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left1, PRODUCER_CONFIG, MOCK_TIME); + MOCK_TIME.sleep(10000); + IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left2, PRODUCER_CONFIG, MOCK_TIME); + + final Set<KeyValue<String, String>> result = new HashSet<>(waitUntilMinKeyValueRecordsReceived( + CONSUMER_CONFIG, + OUTPUT, + expectedResult.size())); + + assertThat(expectedResult, equalTo(result)); + } + + private Properties getStreamsConfig(final String testName) { + final Properties streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStream-KStream-join" + testName); 
+ streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L); + streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); + + return streamsConfig; + } + + private static KafkaStreams prepareTopology(final Properties streamsConfig) { + final StreamsBuilder builder = new StreamsBuilder(); + + final KStream<String, String> stream1 = builder.stream(LEFT_STREAM); + final KStream<String, String> stream2 = builder.stream(RIGHT_STREAM); + + final ValueJoiner<String, String, String> joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2; + + stream1.outerJoin(stream2, joiner, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10))).to(OUTPUT); + + return new KafkaStreams(builder.build(streamsConfig), streamsConfig); + } + +} \ No newline at end of file