http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index 0c94084..1f9a473 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -38,7 +38,7 @@ public interface PartitionGrouper { * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this * interface. See {@link DefaultPartitionGrouper} for more information. * - * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String)} topic group} id to topics + * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics * @param metadata Metadata of the consuming cluster * @return a map of task ids to groups of partitions */
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 1743baf..19440e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -68,6 +68,8 @@ public class TopologyBuilder { private final HashMap<String, Pattern> topicToPatterns = new HashMap<>(); private final HashMap<String, String> nodeToSinkTopic = new HashMap<>(); private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + private String applicationId; + private Map<Integer, Set<String>> nodeGroups = null; private Pattern topicPattern; @@ -601,8 +603,8 @@ public class TopologyBuilder { * * @return groups of topic names */ - public Map<Integer, TopicsInfo> topicGroups(String applicationId) { - Map<Integer, TopicsInfo> topicGroups = new HashMap<>(); + public Map<Integer, TopicsInfo> topicGroups() { + Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); if (subscriptionUpdates.hasUpdates()) { @@ -629,6 +631,12 @@ public class TopologyBuilder { // if some of the topics are internal, add them to the internal topics for (String topic : topics) { if (this.internalTopicNames.contains(topic)) { + if (applicationId == null) { + throw new TopologyBuilderException("There are internal topics and" + + " applicationId hasn't been " + + "set. 
Call setApplicationId " + + "first"); + } // prefix the internal topic name with the application id String internalTopic = applicationId + "-" + topic; internalSourceTopics.add(internalTopic); @@ -681,7 +689,7 @@ public class TopologyBuilder { } private Map<Integer, Set<String>> makeNodeGroups() { - HashMap<Integer, Set<String>> nodeGroups = new HashMap<>(); + HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>(); HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>(); int nodeGroupId = 0; @@ -739,13 +747,30 @@ public class TopologyBuilder { for (String node : nodeNames) { String[] topics = nodeToSourceTopics.get(node); if (topics != null) - copartitionGroup.addAll(Arrays.asList(topics)); + copartitionGroup.addAll(convertInternalTopicNames(topics)); } list.add(Collections.unmodifiableSet(copartitionGroup)); } return Collections.unmodifiableList(list); } + private List<String> convertInternalTopicNames(String...topics) { + final List<String> topicNames = new ArrayList<>(); + for (String topic : topics) { + if (internalTopicNames.contains(topic)) { + if (applicationId == null) { + throw new TopologyBuilderException("there are internal topics " + + "and applicationId hasn't been set. Call " + + "setApplicationId first"); + } + topicNames.add(applicationId + "-" + topic); + } else { + topicNames.add(topic); + } + } + return topicNames; + } + /** * Build the topology for the specified topic group. This is called automatically when passing this builder into the * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor. @@ -814,10 +839,15 @@ public class TopologyBuilder { * Get the names of topics that are to be consumed by the source nodes created by this builder. 
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null */ - public Set<String> sourceTopics(String applicationId) { + public Set<String> sourceTopics() { Set<String> topics = new HashSet<>(); for (String topic : sourceTopicNames) { if (internalTopicNames.contains(topic)) { + if (applicationId == null) { + throw new TopologyBuilderException("there are internal topics and " + + "applicationId is null. Call " + + "setApplicationId before sourceTopics"); + } topics.add(applicationId + "-" + topic); } else { topics.add(topic); @@ -849,4 +879,14 @@ public class TopologyBuilder { public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) { this.subscriptionUpdates = subscriptionUpdates; } + + /** + * Set the applicationId. This is required before calling + * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionGroups}. + * @param applicationId the streams applicationId. Should be the same as set by + * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} + */ + public void setApplicationId(String applicationId) { + this.applicationId = applicationId; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index eb731be..07f9a1f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -81,7 +81,9 @@ public class RecordCollector { if (partitions != null) partition = partitioner.partition(record.key(), record.value(), partitions.size()); } - this.producer.send(new
ProducerRecord<>(record.topic(), partition, keyBytes, valBytes), callback); + this.producer.send(new ProducerRecord<>(record.topic(), partition, record.timestamp(), + keyBytes, + valBytes), callback); } public void flush() { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index adefab9..4b52511 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Configurable; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; @@ -259,7 +260,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } streamThread.builder.updateSubscriptions(subscriptionUpdates); - this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId); + this.topicGroups = streamThread.builder.topicGroups(); // ensure the co-partitioning topics within the group have the same number of partitions, // and enforce the number of partitions for those internal topics. 
@@ -270,14 +271,15 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics); internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics); } - Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups(); - ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata); - // for those internal source topics that do not have co-partition enforcement, + // for all internal source topics // set the number of partitions to the maximum of the depending sub-topologies source topics + Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>(); + Set<String> allInternalTopicNames = new HashSet<>(); for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) { Set<String> internalTopics = entry.getValue().interSourceTopics; + allInternalTopicNames.addAll(internalTopics); for (String internalTopic : internalTopics) { Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic); @@ -288,20 +290,41 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable if (otherSinkTopics.contains(internalTopic)) { for (String topic : other.getValue().sourceTopics) { - List<PartitionInfo> infos = metadata.partitionsForTopic(topic); - - if (infos != null && infos.size() > numPartitions) - numPartitions = infos.size(); + Integer partitions = null; + // It is possible the sourceTopic is another internal topic, i.e, + // map().join().join(map()) + if (allInternalTopicNames.contains(topic)) { + Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(topic); + if (taskIds != null) { + for (TaskId taskId : taskIds) { + partitions = taskId.partition; + } + } + } else { + partitions = metadata.partitionCountForTopic(topic); + } + if (partitions != null && partitions > numPartitions) { + numPartitions = partitions; + } } } } - 
internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions))); + for (int partition = 0; partition < numPartitions; partition++) { + internalPartitionInfos.put(new TopicPartition(internalTopic, partition), + new PartitionInfo(internalTopic, partition, null, new Node[0], new Node[0])); + } } } } - Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false); + + Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups(); + ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, + metadata.withPartitions(internalPartitionInfos)); + + + internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false); internalSourceTopicToTaskIds.clear(); Cluster metadataWithInternalTopics = metadata; @@ -469,10 +492,22 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable } } + if (numPartitions == -1) { + for (String topic : internalTopics) { + if (copartitionGroup.contains(topic)) { + Integer partitions = metadata.partitionCountForTopic(topic); + if (partitions != null && partitions > numPartitions) { + numPartitions = partitions; + } + } + } + } // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds for (String topic : internalTopics) { - if (copartitionGroup.contains(topic)) - internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions))); + if (copartitionGroup.contains(topic)) { + internalSourceTopicToTaskIds + .put(topic, Collections.singleton(new TaskId(-1, numPartitions))); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 64127a8..d1ce40f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -161,7 +161,7 @@ public class StreamThread extends Thread { this.applicationId = applicationId; this.config = config; this.builder = builder; - this.sourceTopics = builder.sourceTopics(applicationId); + this.sourceTopics = builder.sourceTopics(); this.topicPattern = builder.sourceTopicPattern(); this.clientId = clientId; this.processId = processId; http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 809a238..b642b2a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -135,12 +134,12 @@ public class InternalTopicIntegrationTest { public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } - }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + 
}).groupBy(new KeyValueMapper<String, String, String>() { @Override - public KeyValue<String, String> apply(String key, String value) { - return new KeyValue<String, String>(value, value); + public String apply(String key, String value) { + return value; } - }).countByKey("Counts").toStream(); + }).count("Counts").toStream(); wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java index 9e9d366..ea216f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -205,12 +205,13 @@ public class JoinIntegrationTest { } }) // Compute the total per region by summing the individual click counts per region. - .reduceByKey(new Reducer<Long>() { + .groupByKey(stringSerde, longSerde) + .reduce(new Reducer<Long>() { @Override public Long apply(Long value1, Long value2) { return value1 + value2; } - }, stringSerde, longSerde, "ClicksPerRegionUnwindowed"); + }, "ClicksPerRegionUnwindowed"); // Write the (continuously updating) results to the output topic. 
clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java new file mode 100644 index 0000000..44e92f7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Reducer; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class KGroupedStreamIntegrationTest { + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + new EmbeddedSingleNodeKafkaCluster(); + private static volatile int testNo = 0; 
+ private KStreamBuilder builder; + private Properties streamsConfiguration; + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String outputTopic; + private KGroupedStream<String, String> groupedStream; + private Reducer<String> reducer; + private Initializer<Integer> initializer; + private Aggregator<String, String, Integer> aggregator; + private KStream<Integer, String> stream; + + + @Before + public void before() { + testNo++; + builder = new KStreamBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + String applicationId = "kgrouped-stream-test-" + + testNo; + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + streamsConfiguration + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kgrouped-stream-test"); + + KeyValueMapper<Integer, String, String> + mapper = + new KeyValueMapper<Integer, String, String>() { + @Override + public String apply(Integer key, String value) { + return value; + } + }; + stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput); + groupedStream = stream + .groupBy( + mapper, + Serdes.String(), + Serdes.String()); + + reducer = new Reducer<String>() { + @Override + public String apply(String value1, String value2) { + return value1 + ":" + value2; + } + }; + initializer = new Initializer<Integer>() { + @Override + public Integer apply() { + return 0; + } + }; + aggregator = new Aggregator<String, String, Integer>() { + @Override + public Integer apply(String aggKey, String value, Integer aggregate) { + return aggregate + value.length(); + } + }; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + } + 
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + + @Test + public void shouldReduce() throws Exception { + produceMessages(System.currentTimeMillis()); + groupedStream + .reduce(reducer, "reduce-by-key") + .to(Serdes.String(), Serdes.String(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, String>> results = receiveMessages( + new StringDeserializer(), + new StringDeserializer() + , 10); + + Collections.sort(results, new Comparator<KeyValue<String, String>>() { + @Override + public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"), + KeyValue.pair("A", "A:A"), + KeyValue.pair("B", "B"), + KeyValue.pair("B", "B:B"), + KeyValue.pair("C", "C"), + KeyValue.pair("C", "C:C"), + KeyValue.pair("D", "D"), + KeyValue.pair("D", "D:D"), + KeyValue.pair("E", "E"), + KeyValue.pair("E", "E:E")))); + } + + @SuppressWarnings("unchecked") + private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1, + final KeyValue<K, V> o2) { + final int keyComparison = o1.key.compareTo(o2.key); + if (keyComparison == 0) { + return o1.value.compareTo(o2.value); + } + return keyComparison; + } + + @Test + public void shouldReduceWindowed() throws Exception { + long firstBatchTimestamp = System.currentTimeMillis() - 1000; + produceMessages(firstBatchTimestamp); + long secondBatchTimestamp = System.currentTimeMillis(); + produceMessages(secondBatchTimestamp); + produceMessages(secondBatchTimestamp); + + groupedStream + .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L)) + .toStream(new KeyValueMapper<Windowed<String>, String, String>() { + @Override + public String apply(Windowed<String> windowedKey, String value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }) + .to(Serdes.String(), 
Serdes.String(), outputTopic); + + startStreams(); + + List<KeyValue<String, String>> windowedOutput = receiveMessages( + new StringDeserializer(), + new StringDeserializer() + , 15); + + Comparator<KeyValue<String, String>> + comparator = + new Comparator<KeyValue<String, String>>() { + @Override + public int compare(final KeyValue<String, String> o1, + final KeyValue<String, String> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }; + + Collections.sort(windowedOutput, comparator); + long firstBatchWindow = firstBatchTimestamp / 500 * 500; + long secondBatchWindow = secondBatchTimestamp / 500 * 500; + + assertThat(windowedOutput, is( + Arrays.asList( + new KeyValue<>("A@" + firstBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A"), + new KeyValue<>("A@" + secondBatchWindow, "A:A"), + new KeyValue<>("B@" + firstBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B"), + new KeyValue<>("B@" + secondBatchWindow, "B:B"), + new KeyValue<>("C@" + firstBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C"), + new KeyValue<>("C@" + secondBatchWindow, "C:C"), + new KeyValue<>("D@" + firstBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D"), + new KeyValue<>("D@" + secondBatchWindow, "D:D"), + new KeyValue<>("E@" + firstBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E"), + new KeyValue<>("E@" + secondBatchWindow, "E:E") + ) + )); + } + + @Test + public void shouldAggregate() throws Exception { + produceMessages(System.currentTimeMillis()); + groupedStream.aggregate( + initializer, + aggregator, + Serdes.Integer(), + "aggregate-by-selected-key") + .to(Serdes.String(), Serdes.Integer(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, Integer>> results = receiveMessages( + new StringDeserializer(), + new IntegerDeserializer() + , 10); + + Collections.sort(results, new Comparator<KeyValue<String, Integer>>() { + @Override + public 
int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList( + KeyValue.pair("A", 1), + KeyValue.pair("A", 2), + KeyValue.pair("B", 1), + KeyValue.pair("B", 2), + KeyValue.pair("C", 1), + KeyValue.pair("C", 2), + KeyValue.pair("D", 1), + KeyValue.pair("D", 2), + KeyValue.pair("E", 1), + KeyValue.pair("E", 2) + ))); + } + + @Test + public void shouldAggregateWindowed() throws Exception { + long firstTimestamp = System.currentTimeMillis() - 1000; + produceMessages(firstTimestamp); + long secondTimestamp = System.currentTimeMillis(); + produceMessages(secondTimestamp); + produceMessages(secondTimestamp); + + groupedStream.aggregate( + initializer, + aggregator, + TimeWindows.of("aggregate-by-key-windowed", 500L), + Serdes.Integer()) + .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() { + @Override + public String apply(Windowed<String> windowedKey, Integer value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }) + .to(Serdes.String(), Serdes.Integer(), outputTopic); + + startStreams(); + + List<KeyValue<String, Integer>> windowedMessages = receiveMessages( + new StringDeserializer(), + new IntegerDeserializer() + , 15); + + Comparator<KeyValue<String, Integer>> + comparator = + new Comparator<KeyValue<String, Integer>>() { + @Override + public int compare(final KeyValue<String, Integer> o1, + final KeyValue<String, Integer> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }; + + Collections.sort(windowedMessages, comparator); + + long firstWindow = firstTimestamp / 500 * 500; + long secondWindow = secondTimestamp / 500 * 500; + + assertThat(windowedMessages, is( + Arrays.asList( + new KeyValue<>("A@" + firstWindow, 1), + new KeyValue<>("A@" + secondWindow, 1), + new KeyValue<>("A@" + secondWindow, 2), + new KeyValue<>("B@" + firstWindow, 1), + new KeyValue<>("B@" + secondWindow, 1), + new 
KeyValue<>("B@" + secondWindow, 2), + new KeyValue<>("C@" + firstWindow, 1), + new KeyValue<>("C@" + secondWindow, 1), + new KeyValue<>("C@" + secondWindow, 2), + new KeyValue<>("D@" + firstWindow, 1), + new KeyValue<>("D@" + secondWindow, 1), + new KeyValue<>("D@" + secondWindow, 2), + new KeyValue<>("E@" + firstWindow, 1), + new KeyValue<>("E@" + secondWindow, 1), + new KeyValue<>("E@" + secondWindow, 2) + ))); + } + + @Test + public void shouldCount() throws Exception { + produceMessages(System.currentTimeMillis()); + + groupedStream.count("count-by-key") + .to(Serdes.String(), Serdes.Long(), outputTopic); + + startStreams(); + + produceMessages(System.currentTimeMillis()); + + List<KeyValue<String, Long>> results = receiveMessages( + new StringDeserializer(), + new LongDeserializer() + , 10); + Collections.sort(results, new Comparator<KeyValue<String, Long>>() { + @Override + public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }); + + assertThat(results, is(Arrays.asList( + KeyValue.pair("A", 1L), + KeyValue.pair("A", 2L), + KeyValue.pair("B", 1L), + KeyValue.pair("B", 2L), + KeyValue.pair("C", 1L), + KeyValue.pair("C", 2L), + KeyValue.pair("D", 1L), + KeyValue.pair("D", 2L), + KeyValue.pair("E", 1L), + KeyValue.pair("E", 2L) + ))); + } + + @Test + public void shouldGroupByKey() throws Exception { + long timestamp = System.currentTimeMillis(); + produceMessages(timestamp); + produceMessages(timestamp); + + stream.groupByKey(Serdes.Integer(), Serdes.String()) + .count(TimeWindows.of("count-windows", 500L)) + .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() { + @Override + public String apply(final Windowed<Integer> windowedKey, final Long value) { + return windowedKey.key() + "@" + windowedKey.window().start(); + } + }).to(Serdes.String(), Serdes.Long(), outputTopic); + + startStreams(); + + List<KeyValue<String, Long>> results = receiveMessages( + new 
StringDeserializer(), + new LongDeserializer() + , 10); + Collections.sort(results, new Comparator<KeyValue<String, Long>>() { + @Override + public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) { + return KGroupedStreamIntegrationTest.compare(o1, o2); + } + }); + + long window = timestamp / 500 * 500; + assertThat(results, is(Arrays.asList( + KeyValue.pair("1@" + window, 1L), + KeyValue.pair("1@" + window, 2L), + KeyValue.pair("2@" + window, 1L), + KeyValue.pair("2@" + window, 2L), + KeyValue.pair("3@" + window, 1L), + KeyValue.pair("3@" + window, 2L), + KeyValue.pair("4@" + window, 1L), + KeyValue.pair("4@" + window, 2L), + KeyValue.pair("5@" + window, 1L), + KeyValue.pair("5@" + window, 2L) + ))); + + } + + + private void produceMessages(long timestamp) + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp( + streamOneInput, + Arrays.asList( + new KeyValue<>(1, "A"), + new KeyValue<>(2, "B"), + new KeyValue<>(3, "C"), + new KeyValue<>(4, "D"), + new KeyValue<>(5, "E")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class, + new Properties()), + timestamp); + } + + + private void createTopics() { + streamOneInput = "stream-one-" + testNo; + outputTopic = "output-" + testNo; + CLUSTER.createTopic(streamOneInput, 3, 1); + CLUSTER.createTopic(outputTopic); + } + + private void startStreams() { + kafkaStreams = new KafkaStreams(builder, streamsConfiguration); + kafkaStreams.start(); + } + + + private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K> + keyDeserializer, + final Deserializer<V> + valueDeserializer, + final int numMessages) + throws InterruptedException { + final Properties consumerProperties = new Properties(); + consumerProperties + .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"kgroupedstream-test-" + + testNo); + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + keyDeserializer.getClass().getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + valueDeserializer.getClass().getName()); + return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties, + outputTopic, + numMessages, 60 * 1000); + + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java new file mode 100644 index 0000000..221d349 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -0,0 +1,565 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by + * applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class KStreamRepartitionJoinTest { + + @ClassRule + public static final EmbeddedSingleNodeKafkaCluster CLUSTER = + new EmbeddedSingleNodeKafkaCluster(); + + private static volatile int testNo = 0; + + private KStreamBuilder builder; + private Properties streamsConfiguration; + private 
KStream<Long, Integer> streamOne; + private KStream<Integer, String> streamTwo; + private KStream<Integer, Integer> streamThree; + private KStream<Integer, String> streamFour; + private KTable<Integer, String> kTable; + private ValueJoiner<Integer, String, String> valueJoiner; + private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> + keyMapper; + + private final List<String> + expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E"); + private KafkaStreams kafkaStreams; + private String streamOneInput; + private String streamTwoInput; + private String streamFourInput; + private String tableInput; + private String outputTopic; + private String streamThreeInput; + + + + @Before + public void before() { + testNo++; + String applicationId = "kstream-repartition-join-test" + testNo; + builder = new KStreamBuilder(); + createTopics(); + streamsConfiguration = new Properties(); + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, + applicationId); + streamsConfiguration + .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test"); + + streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput); + streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput); + streamThree = builder.stream(Serdes.Integer(), Serdes.Integer(), streamThreeInput); + streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput); + + kTable = builder.table(Serdes.Integer(), Serdes.String(), tableInput); + + valueJoiner = new ValueJoiner<Integer, String, String>() { + @Override + public String apply(final Integer value1, final String value2) { + return value1 + ":" + value2; + } + }; + + keyMapper = new KeyValueMapper<Long, 
Integer, KeyValue<Integer, Integer>>() { + @Override + public KeyValue<Integer, Integer> apply(final Long key, final Integer value) { + return new KeyValue<>(value, value); + } + }; + } + + @After + public void whenShuttingDown() throws IOException { + if (kafkaStreams != null) { + kafkaStreams.close(); + } + IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); + } + + @Test + public void shouldMapStreamOneAndJoin() throws ExecutionException, InterruptedException { + produceMessages(); + doJoin(streamOne.map(keyMapper), streamTwo); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + @Test + public void shouldMapBothStreamsAndJoin() throws Exception { + produceMessages(); + + final KStream<Integer, Integer> + map1 = + streamOne.map(keyMapper); + + final KStream<Integer, String> map2 = streamTwo.map( + new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { + @Override + public KeyValue<Integer, String> apply(Integer key, + String value) { + return new KeyValue<>(key, value); + } + }); + + doJoin(map1, map2); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + + } + + @Test + public void shouldMapMapJoin() throws Exception { + produceMessages(); + + final KStream<Integer, Integer> mapMapStream = streamOne.map( + new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() { + @Override + public KeyValue<Long, Integer> apply(Long key, Integer value) { + return new KeyValue<>(key + value, value); + } + }).map(keyMapper); + + doJoin(mapMapStream, streamTwo); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + + @Test + public void shouldSelectKeyAndJoin() throws ExecutionException, InterruptedException { + produceMessages(); + + final KStream<Integer, Integer> + keySelected = + streamOne.selectKey(new KeyValueMapper<Long, Integer, Integer>() { + @Override + public Integer apply(final Long key, final Integer value) { + return value; + } + }); + + doJoin(keySelected, streamTwo); + 
startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + + @Test + public void shouldFlatMapJoin() throws Exception { + produceMessages(); + + final KStream<Integer, Integer> flatMapped = streamOne.flatMap( + new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() { + @Override + public Iterable<KeyValue<Integer, Integer>> apply(Long key, + Integer value) { + return Collections.singletonList(new KeyValue<>(value, value)); + } + }); + + doJoin(flatMapped, streamTwo); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + @Test + public void shouldJoinTwoStreamsPartitionedTheSame() throws Exception { + produceMessages(); + doJoin(streamThree, streamTwo); + startStreams(); + verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E")); + } + + @Test + public void shouldJoinWithRhsStreamMapped() throws Exception { + produceMessages(); + + ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() { + @Override + public String apply(String value1, Integer value2) { + return value1 + ":" + value2; + } + }; + streamTwo + .join(streamOne.map(keyMapper), + joiner, + JoinWindows.of("the-join").within(60 * 1000), + Serdes.Integer(), + Serdes.String(), + Serdes.Integer()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + + startStreams(); + verifyCorrectOutput(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5")); + } + + @Test + public void shouldLeftJoinTwoStreamsPartitionedTheSame() throws Exception { + produceMessages(); + doLeftJoin(streamThree, streamTwo); + startStreams(); + verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E")); + } + + @Test + public void shouldMapStreamOneAndLeftJoin() throws ExecutionException, InterruptedException { + produceMessages(); + doLeftJoin(streamOne.map(keyMapper), streamTwo); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + @Test + public void shouldMapBothStreamsAndLeftJoin() throws Exception 
{ + produceMessages(); + + final KStream<Integer, Integer> + map1 = + streamOne.map(keyMapper); + + final KStream<Integer, String> map2 = streamTwo.map( + new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { + @Override + public KeyValue<Integer, String> apply(Integer key, + String value) { + return new KeyValue<>(key, value); + } + }); + + doLeftJoin(map1, map2); + startStreams(); + + List<String> received = receiveMessages(new StringDeserializer(), 5); + + if (!received.equals(expectedStreamOneTwoJoin)) { + produceToStreamOne(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + } + + @Test + public void shouldLeftJoinWithRhsStreamMapped() throws Exception { + produceMessages(); + + ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() { + @Override + public String apply(String value1, Integer value2) { + return value1 + ":" + value2; + } + }; + streamTwo + .leftJoin(streamOne.map(keyMapper), + joiner, + JoinWindows.of("the-join").within(60 * 1000), + Serdes.Integer(), + null, + Serdes.Integer()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + + startStreams(); + List<String> received = receiveMessages(new StringDeserializer(), 5); + + List<String> expectedMessages = Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"); + if (!received.equals(expectedMessages)) { + produceStreamTwoInputTo(streamTwoInput); + verifyCorrectOutput(expectedMessages); + } + } + + @Test + public void shouldLeftJoinWithKTableAfterMap() throws Exception { + produceMessages(); + streamOne.map(keyMapper) + .leftJoin(kTable, valueJoiner, Serdes.Integer(), Serdes.Integer()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + + startStreams(); + + List<String> received = receiveMessages(new StringDeserializer(), 5); + assertThat(received, is(expectedStreamOneTwoJoin)); + } + + @Test + public void shouldLeftJoinWithTableProducedFromGroupBy() throws Exception { + produceMessages(); + KTable<Integer, String> aggTable = + 
streamOne.map(keyMapper) + .groupByKey(Serdes.Integer(), Serdes.Integer()) + .aggregate(new Initializer<String>() { + @Override + public String apply() { + return ""; + } + }, new Aggregator<Integer, Integer, String>() { + @Override + public String apply(final Integer aggKey, final Integer value, + final String aggregate) { + return aggregate + ":" + value; + } + }, Serdes.String(), "agg-by-key"); + + streamTwo.leftJoin(aggTable, new ValueJoiner<String, String, String>() { + @Override + public String apply(final String value1, final String value2) { + return value1 + "@" + value2; + } + }, Serdes.Integer(), Serdes.String()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + + startStreams(); + + receiveMessages(new StringDeserializer(), 5); + produceStreamTwoInputTo(streamTwoInput); + List<String> received = receiveMessages(new StringDeserializer(), 5); + + assertThat(received, is(Arrays.asList("A@:1", "B@:2", "C@:3", "D@:4", "E@:5"))); + + } + + + @Test + public void shouldJoinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception { + produceMessages(); + + final KStream<Integer, Integer> + map1 = + streamOne.map(keyMapper); + + final KeyValueMapper<Integer, String, KeyValue<Integer, String>> + kvMapper = + new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() { + @Override + public KeyValue<Integer, String> apply(Integer key, + String value) { + return new KeyValue<>(key, value); + } + }; + + final KStream<Integer, String> map2 = streamTwo.map(kvMapper); + + final KStream<Integer, String> join = map1.join(map2, + valueJoiner, + JoinWindows.of("the-join") + .within(60 * 1000), + Serdes.Integer(), + Serdes.Integer(), + Serdes.String()); + + ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { + @Override + public String apply(final String value1, final String value2) { + return value1 + ":" + value2; + } + }; + join.map(kvMapper) + .join(streamFour.map(kvMapper), + joiner, + 
JoinWindows.of("the-other-join").within(60 * 1000), + Serdes.Integer(), + Serdes.String(), + Serdes.String()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + + startStreams(); + verifyCorrectOutput(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E")); + } + + @Test + public void shouldFilterNullKeysWhenRepartionedOnJoin() throws Exception { + produceMessages(); + IntegrationTestUtils.produceKeyValuesSynchronously( + streamOneInput, + Collections.singleton( + new KeyValue<Long, Integer>(70L, null)), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + IntegerSerializer.class, + new Properties())); + + doJoin(streamOne.map(keyMapper), streamTwo); + startStreams(); + verifyCorrectOutput(expectedStreamOneTwoJoin); + } + + private void produceMessages() + throws ExecutionException, InterruptedException { + produceToStreamOne(); + produceStreamTwoInputTo(streamTwoInput); + produceToStreamThree(); + produceStreamTwoInputTo(tableInput); + produceStreamTwoInputTo(streamFourInput); + + } + + private void produceToStreamThree() + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronously( + streamThreeInput, + Arrays.asList( + new KeyValue<>(1, 10), + new KeyValue<>(2, 20), + new KeyValue<>(3, 30), + new KeyValue<>(4, 40), + new KeyValue<>(5, 50)), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + IntegerSerializer.class, + new Properties())); + } + + private void produceStreamTwoInputTo(final String streamTwoInput) + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronously( + streamTwoInput, + Arrays.asList( + new KeyValue<>(1, "A"), + new KeyValue<>(2, "B"), + new KeyValue<>(3, "C"), + new KeyValue<>(4, "D"), + new KeyValue<>(5, "E")), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + IntegerSerializer.class, + StringSerializer.class, + new Properties())); + } + + private void 
produceToStreamOne() + throws ExecutionException, InterruptedException { + IntegrationTestUtils.produceKeyValuesSynchronously( + streamOneInput, + Arrays.asList( + new KeyValue<>(10L, 1), + new KeyValue<>(5L, 2), + new KeyValue<>(12L, 3), + new KeyValue<>(15L, 4), + new KeyValue<>(20L, 5)), + TestUtils.producerConfig( + CLUSTER.bootstrapServers(), + LongSerializer.class, + IntegerSerializer.class, + new Properties())); + } + + private void createTopics() { + streamOneInput = "stream-one-" + testNo; + streamTwoInput = "stream-two-" + testNo; + streamThreeInput = "stream-three-" + testNo; + streamFourInput = "stream-four-" + testNo; + tableInput = "table-stream-two-" + testNo; + outputTopic = "output-" + testNo; + CLUSTER.createTopic(streamOneInput); + CLUSTER.createTopic(streamTwoInput, 2, 1); + CLUSTER.createTopic(streamThreeInput, 2, 1); + CLUSTER.createTopic(streamFourInput); + CLUSTER.createTopic(tableInput, 2, 1); + CLUSTER.createTopic(outputTopic); + } + + + private void startStreams() { + kafkaStreams = new KafkaStreams(builder, streamsConfiguration); + kafkaStreams.start(); + } + + + private List<String> receiveMessages(final Deserializer<?> valueDeserializer, + final int numMessages) throws InterruptedException { + + final Properties config = new Properties(); + + config + .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test-" + testNo); + config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + IntegerDeserializer.class.getName()); + config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + valueDeserializer.getClass().getName()); + List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, + outputTopic, + numMessages, + 60 * + 1000); + Collections.sort(received); + return received; + } + + private void verifyCorrectOutput(List<String> 
expectedMessages) throws InterruptedException { + assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size()), + is(expectedMessages)); + } + + private void doJoin(KStream<Integer, Integer> lhs, + KStream<Integer, String> rhs) { + lhs.join(rhs, + valueJoiner, + JoinWindows.of("the-join").within(60 * 1000), + Serdes.Integer(), + Serdes.Integer(), + Serdes.String()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + } + + private void doLeftJoin(KStream<Integer, Integer> lhs, + KStream<Integer, String> rhs) { + lhs.leftJoin(rhs, + valueJoiner, + JoinWindows.of("the-join").within(60 * 1000), + Serdes.Integer(), + Serdes.Integer(), + Serdes.String()) + .to(Serdes.Integer(), Serdes.String(), outputTopic); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java index e00cd13..2966590 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java @@ -36,6 +36,8 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertThat; import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -56,7 +58,7 @@ public class WordCountIntegrationTest { @BeforeClass public static void startKafkaCluster() throws Exception { - CLUSTER.createTopic(DEFAULT_INPUT_TOPIC); + CLUSTER.createTopic(DEFAULT_INPUT_TOPIC, 2, 1); CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC); } @@ -65,9 +67,9 @@ public class WordCountIntegrationTest { List<String> inputValues 
= Arrays.asList("hello", "world", "world", "hello world"); List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList( new KeyValue<>("hello", 1L), + new KeyValue<>("hello", 2L), new KeyValue<>("world", 1L), new KeyValue<>("world", 2L), - new KeyValue<>("hello", 2L), new KeyValue<>("world", 3L) ); @@ -101,12 +103,12 @@ public class WordCountIntegrationTest { public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } - }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + }).groupBy(new KeyValueMapper<String, String, String>() { @Override - public KeyValue<String, String> apply(String key, String value) { - return new KeyValue<String, String>(value, value); + public String apply(final String key, final String value) { + return value; } - }).countByKey("Counts") + }).count("Counts") .toStream(); wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); @@ -139,6 +141,16 @@ public class WordCountIntegrationTest { consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size()); + Collections.sort(actualWordCounts, new Comparator<KeyValue<String, Long>>() { + @Override + public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) { + int keyComparison = o1.key.compareTo(o2.key); + if (keyComparison == 0) { + return o1.value.compareTo(o2.value); + } + return keyComparison; + } + }); streams.close(); assertThat(actualWordCounts, equalTo(expectedWordCounts)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index c3f9089..83b431c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -134,10 +134,21 @@ public class IntegrationTestUtils { public static <K, V> void produceKeyValuesSynchronously( String topic, Collection<KeyValue<K, V>> records, Properties producerConfig) throws ExecutionException, InterruptedException { + produceKeyValuesSynchronouslyWithTimestamp(topic, + records, + producerConfig, + null); + } + + public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic, + Collection<KeyValue<K, V>> records, + Properties producerConfig, + Long timestamp) + throws ExecutionException, InterruptedException { Producer<K, V> producer = new KafkaProducer<>(producerConfig); for (KeyValue<K, V> record : records) { Future<RecordMetadata> f = producer.send( - new ProducerRecord<>(topic, record.key, record.value)); + new ProducerRecord<>(topic, null, timestamp, record.key, record.value)); f.get(); } producer.flush(); @@ -226,4 +237,5 @@ public class IntegrationTestUtils { Thread.sleep(Math.min(waitTime, 100L)); } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 65a4b54..1a608a7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -78,7 +78,8 @@ public class KStreamKStreamLeftJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test") + .within(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); @@ -156,7 +157,8 @@ public class KStreamKStreamLeftJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde); + joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test") + .within(100), intSerde, stringSerde, stringSerde); joined.process(processor); Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java index 2c6108b..8bc9a77 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java @@ -17,14 +17,12 @@ package 
org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; @@ -144,19 +142,5 @@ public class KStreamKTableLeftJoinTest { processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); } - @Test(expected = KafkaException.class) - public void testNotJoinable() { - KStreamBuilder builder = new KStreamBuilder(); - - KStream<Integer, String> stream; - KTable<Integer, String> table; - MockProcessorSupplier<Integer, String> processor; - - processor = new MockProcessorSupplier<>(); - stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper()); - table = builder.table(intSerde, stringSerde, topic2); - - stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index f4fe3a6..db533e4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -71,10 +71,12 @@ public class 
KStreamWindowAggregateTest { KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); KTable<Windowed<String>, String> table2 = - stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic1-Canonized", 10).advanceBy(5), - strSerde, - strSerde); + stream1.groupByKey(strSerde, + strSerde) + .aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + TimeWindows.of("topic1-Canonized", 10).advanceBy(5), + strSerde); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); @@ -149,20 +151,22 @@ public class KStreamWindowAggregateTest { KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1); KTable<Windowed<String>, String> table1 = - stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic1-Canonized", 10).advanceBy(5), - strSerde, - strSerde); + stream1.groupByKey(strSerde, strSerde) + .aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + TimeWindows.of("topic1-Canonized", 10).advanceBy(5), + strSerde); MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); table1.toStream().process(proc1); KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2); KTable<Windowed<String>, String> table2 = - stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, - TimeWindows.of("topic2-Canonized", 10).advanceBy(5), - strSerde, - strSerde); + stream2.groupByKey(strSerde, strSerde) + .aggregate(MockInitializer.STRING_INIT, + MockAggregator.STRING_ADDER, + TimeWindows.of("topic2-Canonized", 10).advanceBy(5), + strSerde); MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); table2.toStream().process(proc2); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 28acf09..107d832 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -139,7 +139,7 @@ public class TopologyBuilderTest { @Test public void testSourceTopics() { final TopologyBuilder builder = new TopologyBuilder(); - + builder.setApplicationId("X"); builder.addSource("source-1", "topic-1"); builder.addSource("source-2", "topic-2"); builder.addSource("source-3", "topic-3"); @@ -150,7 +150,7 @@ public class TopologyBuilderTest { expected.add("topic-2"); expected.add("X-topic-3"); - assertEquals(expected, builder.sourceTopics("X")); + assertEquals(expected, builder.sourceTopics()); } @Test @@ -259,7 +259,7 @@ public class TopologyBuilderTest { builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); - Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X"); + Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet())); @@ -277,7 +277,7 @@ public class TopologyBuilderTest { @Test public void testTopicGroupsByStateStore() { final TopologyBuilder builder = new TopologyBuilder(); - + builder.setApplicationId("X"); builder.addSource("source-1", "topic-1", "topic-1x"); builder.addSource("source-2", "topic-2"); builder.addSource("source-3", "topic-3"); @@ -297,7 +297,7 @@ public class TopologyBuilderTest { builder.addStateStore(supplier); builder.connectProcessorAndStateStores("processor-5", "store-3"); - 
Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X"); + Map<Integer, TopicsInfo> topicGroups = builder.topicGroups(); Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>(); expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1")))); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 17bda54..4f7037c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -269,9 +269,9 @@ public class StreamPartitionAssignorTest { @Test public void testAssignWithStates() throws Exception { StreamsConfig config = new StreamsConfig(configProps()); - + String applicationId = "test"; TopologyBuilder builder = new TopologyBuilder(); - + builder.setApplicationId(applicationId); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); @@ -295,10 +295,11 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime()); StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - 
partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); subscriptions.put("consumer10", @@ -474,8 +475,9 @@ public class StreamPartitionAssignorTest { @Test public void testAssignWithInternalTopics() throws Exception { StreamsConfig config = new StreamsConfig(configProps()); - + String applicationId = "test"; TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); builder.addSource("source1", "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); @@ -489,10 +491,11 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; MockClientSupplier clientSupplier = new MockClientSupplier(); - StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime()); StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(internalTopicManager); @@ -501,13 +504,55 @@ public class StreamPartitionAssignorTest { subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode())); - Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + partitionAssignor.assign(metadata, subscriptions); // check prepared 
internal topics assertEquals(1, internalTopicManager.readyTopics.size()); assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX")); } + @Test + public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + String applicationId = "test"; + TopologyBuilder builder = new TopologyBuilder(); + builder.setApplicationId(applicationId); + builder.addInternalTopic("topicX"); + builder.addSource("source1", "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addSink("sink1", "topicX", "processor1"); + builder.addSource("source2", "topicX"); + builder.addInternalTopic("topicZ"); + builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); + builder.addSink("sink2", "topicZ", "processor2"); + builder.addSource("source3", "topicZ"); + List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + + UUID uuid1 = UUID.randomUUID(); + String client1 = "client1"; + + MockClientSupplier clientSupplier = new MockClientSupplier(); + + StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); + MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer); + partitionAssignor.setInternalTopicManager(internalTopicManager); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + Set<TaskId> emptyTasks = Collections.<TaskId>emptySet(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode())); + + Map<String, 
PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check prepared internal topics + assertEquals(2, internalTopicManager.readyTopics.size()); + assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ")); + } + private class MockInternalTopicManager extends InternalTopicManager { public Map<String, Integer> readyTopics = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java index fbe7754..1e1e3f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -106,7 +107,11 @@ public class SmokeTestClient extends SmokeTestUtil { data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data")); // min - data.aggregateByKey( + KGroupedStream<String, Integer> + groupedData = + data.groupByKey(stringSerde, intSerde); + + groupedData.aggregate( new Initializer<Integer>() { public Integer apply() { return Integer.MAX_VALUE; @@ -119,7 +124,6 @@ public class SmokeTestClient extends SmokeTestUtil { } }, UnlimitedWindows.of("uwin-min"), - stringSerde, intSerde ).toStream().map( new Unwindow<String, Integer>() @@ -129,7 +133,7 @@ 
public class SmokeTestClient extends SmokeTestUtil { minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min")); // max - data.aggregateByKey( + groupedData.aggregate( new Initializer<Integer>() { public Integer apply() { return Integer.MIN_VALUE; @@ -142,7 +146,6 @@ public class SmokeTestClient extends SmokeTestUtil { } }, UnlimitedWindows.of("uwin-max"), - stringSerde, intSerde ).toStream().map( new Unwindow<String, Integer>() @@ -152,7 +155,7 @@ public class SmokeTestClient extends SmokeTestUtil { maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max")); // sum - data.aggregateByKey( + groupedData.aggregate( new Initializer<Long>() { public Long apply() { return 0L; @@ -165,7 +168,6 @@ public class SmokeTestClient extends SmokeTestUtil { } }, UnlimitedWindows.of("win-sum"), - stringSerde, longSerde ).toStream().map( new Unwindow<String, Long>() @@ -176,10 +178,8 @@ public class SmokeTestClient extends SmokeTestUtil { sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum")); // cnt - data.countByKey( - UnlimitedWindows.of("uwin-cnt"), - stringSerde - ).toStream().map( + groupedData.count(UnlimitedWindows.of("uwin-cnt")) + .toStream().map( new Unwindow<String, Long>() ).to(stringSerde, longSerde, "cnt"); @@ -206,10 +206,8 @@ public class SmokeTestClient extends SmokeTestUtil { ).to(stringSerde, doubleSerde, "avg"); // windowed count - data.countByKey( - TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE), - stringSerde - ).toStream().map( + groupedData.count(TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE)) + .toStream().map( new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() { @Override public KeyValue<String, Long> apply(Windowed<String> key, Long value) {
