This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 529dde9 KAFKA-12648: handle MissingSourceTopicException for named
topologies (#11600)
529dde9 is described below
commit 529dde904a892ba2f95e6150066758e7476f15e3
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jan 18 11:49:23 2022 -0800
KAFKA-12648: handle MissingSourceTopicException for named topologies
(#11600)
Avoid throwing a MissingSourceTopicException inside the #assign method when
named topologies are used, and just remove those topologies which are missing
any of their input topics from the assignment.
Reviewers: Guozhang Wang <[email protected]>, Walker Carlson
<[email protected]>, Bruno Cadonna <[email protected]>
---
.../processor/internals/RepartitionTopics.java | 131 +++++++++++++--------
.../streams/processor/internals/StreamThread.java | 7 +-
.../internals/StreamsPartitionAssignor.java | 37 ++++--
.../processor/internals/TopologyMetadata.java | 30 ++++-
.../integration/NamedTopologyIntegrationTest.java | 74 +++++++++---
.../processor/internals/RepartitionTopicsTest.java | 30 +++--
6 files changed, 218 insertions(+), 91 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 801e6c1..2951451 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -21,10 +21,8 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
-import
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
import
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.slf4j.Logger;
@@ -45,6 +43,7 @@ public class RepartitionTopics {
private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private final Logger log;
private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new
HashMap<>();
+ private final Map<String, Set<String>> missingUserInputTopicsPerTopology =
new HashMap<>();
public RepartitionTopics(final TopologyMetadata topologyMetadata,
final InternalTopicManager internalTopicManager,
@@ -59,50 +58,93 @@ public class RepartitionTopics {
log = logContext.logger(getClass());
}
- public void setup() {
- final Map<Subtopology, TopicsInfo> topicGroups =
topologyMetadata.topicGroups();
- final Map<String, InternalTopicConfig> repartitionTopicMetadata =
computeRepartitionTopicConfig(topicGroups, clusterMetadata);
-
- // ensure the co-partitioning topics within the group have the same
number of partitions,
- // and enforce the number of partitions for those repartition topics
to be the same if they
- // are co-partitioned as well.
- ensureCopartitioning(topologyMetadata.copartitionGroups(),
repartitionTopicMetadata, clusterMetadata);
-
- // make sure the repartition source topics exist with the right number
of partitions,
- // create these topics if necessary
- internalTopicManager.makeReady(repartitionTopicMetadata);
-
- // augment the metadata with the newly computed number of partitions
for all the
- // repartition source topics
- for (final Map.Entry<String, InternalTopicConfig> entry :
repartitionTopicMetadata.entrySet()) {
- final String topic = entry.getKey();
- final int numPartitions =
entry.getValue().numberOfPartitions().orElse(-1);
-
- for (int partition = 0; partition < numPartitions; partition++) {
- topicPartitionInfos.put(
- new TopicPartition(topic, partition),
- new PartitionInfo(topic, partition, null, new Node[0], new
Node[0])
- );
+ /**
+ * @return true iff setup was completed successfully and all user input
topics were verified to exist
+ */
+ public boolean setup() {
+ final Map<String, Collection<TopicsInfo>> topicGroups =
topologyMetadata.topicGroupsByTopology();
+ final Map<String, InternalTopicConfig> repartitionTopicMetadata
+ = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+
+ if (repartitionTopicMetadata.isEmpty()) {
+ if (missingUserInputTopicsPerTopology.isEmpty()) {
+ log.info("Skipping the repartition topic validation since
there are no repartition topics.");
+ } else {
+ log.info("Skipping the repartition topic validation since all
topologies containing repartition "
+ + "topics are missing external user source topics
and cannot be processed.");
+ }
+ } else {
+ // ensure the co-partitioning topics within the group have the
same number of partitions,
+ // and enforce the number of partitions for those repartition
topics to be the same if they
+ // are co-partitioned as well.
+ ensureCopartitioning(topologyMetadata.copartitionGroups(),
repartitionTopicMetadata, clusterMetadata);
+
+ // make sure the repartition source topics exist with the right
number of partitions,
+ // create these topics if necessary
+ internalTopicManager.makeReady(repartitionTopicMetadata);
+
+ // augment the metadata with the newly computed number of
partitions for all the
+ // repartition source topics
+ for (final Map.Entry<String, InternalTopicConfig> entry :
repartitionTopicMetadata.entrySet()) {
+ final String topic = entry.getKey();
+ final int numPartitions =
entry.getValue().numberOfPartitions().orElse(-1);
+
+ for (int partition = 0; partition < numPartitions;
partition++) {
+ topicPartitionInfos.put(
+ new TopicPartition(topic, partition),
+ new PartitionInfo(topic, partition, null, new Node[0],
new Node[0])
+ );
+ }
}
}
+
+ return missingUserInputTopicsPerTopology.isEmpty();
+ }
+
+ public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
+ return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
}
public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
return Collections.unmodifiableMap(topicPartitionInfos);
}
- private Map<String, InternalTopicConfig>
computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
+ /**
+ * @param topicGroups information about the
topic groups (subtopologies) in this application
+ * @param clusterMetadata cluster metadata, eg
which topics exist on the brokers
+ */
+ private Map<String, InternalTopicConfig>
computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>>
topicGroups,
final Cluster clusterMetadata) {
-
- final Map<String, InternalTopicConfig> repartitionTopicConfigs = new
HashMap<>();
- for (final TopicsInfo topicsInfo : topicGroups.values()) {
- checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-
repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
- .collect(Collectors.toMap(InternalTopicConfig::name,
topicConfig -> topicConfig)));
+ final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+ final Map<String, InternalTopicConfig> allRepartitionTopicConfigs =
new HashMap<>();
+ for (final Map.Entry<String, Collection<TopicsInfo>> topology :
topicGroups.entrySet()) {
+ final String topologyName = topology.getKey();
+ final Set<String> missingSourceTopics = new HashSet<>();
+ final Map<String, InternalTopicConfig>
repartitionTopicConfigsPerTopology = new HashMap<>();
+ for (final TopicsInfo topicsInfo : topology.getValue()) {
+
missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo,
clusterMetadata));
+ repartitionTopicConfigsPerTopology.putAll(
+ topicsInfo.repartitionSourceTopics
+ .values()
+ .stream()
+ .collect(Collectors.toMap(InternalTopicConfig::name,
topicConfig -> topicConfig)));
+ }
+ if (missingSourceTopics.isEmpty()) {
+
allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+ allTopicsInfo.addAll(topology.getValue());
+ } else {
+ missingUserInputTopicsPerTopology.put(topologyName,
missingSourceTopics);
+ log.error("Topology {} was missing source topics {} and will
be excluded from the current assignment, "
+ + "this can be due to the consumer client's
metadata being stale or because they have "
+ + "not been created yet. Please verify that you
have created all input topics; if they "
+ + "do exist, you just need to wait for the
metadata to be updated, at which time a new "
+ + "rebalance will be kicked off automatically
and the topology will be retried at that time.",
+ topologyName, missingSourceTopics);
+ }
}
- setRepartitionSourceTopicPartitionCount(repartitionTopicConfigs,
topicGroups, clusterMetadata);
+ setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs,
allTopicsInfo, clusterMetadata);
- return repartitionTopicConfigs;
+ return allRepartitionTopicConfigs;
}
private void ensureCopartitioning(final Collection<Set<String>>
copartitionGroups,
@@ -113,31 +155,26 @@ public class RepartitionTopics {
}
}
- private void checkIfExternalSourceTopicsExist(final TopicsInfo topicsInfo,
- final Cluster
clusterMetadata) {
+ private Set<String> computeMissingExternalSourceTopics(final TopicsInfo
topicsInfo,
+ final Cluster
clusterMetadata) {
final Set<String> missingExternalSourceTopics = new
HashSet<>(topicsInfo.sourceTopics);
missingExternalSourceTopics.removeAll(topicsInfo.repartitionSourceTopics.keySet());
missingExternalSourceTopics.removeAll(clusterMetadata.topics());
- if (!missingExternalSourceTopics.isEmpty()) {
- log.error("The following source topics are missing/unknown: {}.
Please make sure all source topics " +
- "have been pre-created before starting the Streams
application. ",
- missingExternalSourceTopics);
- throw new MissingSourceTopicException("Missing source topics.");
- }
+ return missingExternalSourceTopics;
}
/**
* Computes the number of partitions and sets it for each repartition
topic in repartitionTopicMetadata
*/
private void setRepartitionSourceTopicPartitionCount(final Map<String,
InternalTopicConfig> repartitionTopicMetadata,
- final
Map<Subtopology, TopicsInfo> topicGroups,
+ final
Collection<TopicsInfo> topicGroups,
final Cluster
clusterMetadata) {
boolean partitionCountNeeded;
do {
partitionCountNeeded = false;
boolean progressMadeThisIteration = false; // avoid infinitely
looping without making any progress on unknown repartitions
- for (final TopicsInfo topicsInfo : topicGroups.values()) {
+ for (final TopicsInfo topicsInfo : topicGroups) {
for (final String repartitionSourceTopic :
topicsInfo.repartitionSourceTopics.keySet()) {
final Optional<Integer>
repartitionSourceTopicPartitionCount =
repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions();
@@ -173,12 +210,12 @@ public class RepartitionTopics {
}
private Integer computePartitionCount(final Map<String,
InternalTopicConfig> repartitionTopicMetadata,
- final Map<Subtopology, TopicsInfo>
topicGroups,
+ final Collection<TopicsInfo>
topicGroups,
final Cluster clusterMetadata,
final String repartitionSourceTopic)
{
Integer partitionCount = null;
// try set the number of partitions for this repartition topic if it
is not set yet
- for (final TopicsInfo topicsInfo : topicGroups.values()) {
+ for (final TopicsInfo topicsInfo : topicGroups) {
final Set<String> sinkTopics = topicsInfo.sinkTopics;
if (sinkTopics.contains(repartitionSourceTopic)) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ef425ec..509fd44 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -917,8 +917,9 @@ public class StreamThread extends Thread {
topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+ // We don't need to manually trigger a rebalance to pick up tasks
from the new topology, as
+ // a rebalance will always occur when the metadata is updated
after a change in subscription
subscribeConsumer();
- mainConsumer.enforceRebalance();
}
}
@@ -1137,6 +1138,8 @@ public class StreamThread extends Thread {
log.info("Shutting down");
+ topologyMetadata.unregisterThread(threadMetadata.threadName());
+
try {
taskManager.shutdown(cleanRun);
} catch (final Throwable e) {
@@ -1165,8 +1168,6 @@ public class StreamThread extends Thread {
setState(State.DEAD);
- topologyMetadata.unregisterThread(threadMetadata.threadName());
-
log.info("Shutdown complete");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 083253c..4c8b146 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -317,7 +317,7 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
boolean shutdownRequested = false;
- boolean assignementErrorFound = false;
+ boolean assignmentErrorFound = false;
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry :
subscriptions.entrySet()) {
final String consumerId = entry.getKey();
@@ -355,12 +355,12 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
final int prevSize = allOwnedPartitions.size();
allOwnedPartitions.addAll(subscription.ownedPartitions());
if (allOwnedPartitions.size() < prevSize +
subscription.ownedPartitions().size()) {
- assignementErrorFound = true;
+ assignmentErrorFound = true;
}
clientMetadata.addPreviousTasksAndOffsetSums(consumerId,
info.taskOffsetSums());
}
- if (assignementErrorFound) {
+ if (assignmentErrorFound) {
log.warn("The previous assignment contains a partition more than
once. " +
"\t Mapping: {}", subscriptions);
}
@@ -380,7 +380,10 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// parse the topology to determine the repartition source topics,
// making sure they are created with the number of partitions as
// the maximum of the depending sub-topologies source topics'
number of partitions
- final Map<TopicPartition, PartitionInfo>
allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+ final RepartitionTopics repartitionTopics =
prepareRepartitionTopics(metadata);
+ final Map<TopicPartition, PartitionInfo>
allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();
+ final Map<String, Set<String>> missingUserInputTopicsPerTopology =
repartitionTopics.missingUserInputTopicsPerTopology();
+
final Cluster fullMetadata =
metadata.withPartitions(allRepartitionTopicPartitions);
log.debug("Created repartition topics {} from the parsed
topology.", allRepartitionTopicPartitions.values());
@@ -388,7 +391,8 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
// construct the assignment of tasks to clients
- final Map<Subtopology, TopicsInfo> topicGroups =
taskManager.topologyMetadata().topicGroups();
+ final Map<Subtopology, TopicsInfo> topicGroups =
+
taskManager.topologyMetadata().topicGroups(missingUserInputTopicsPerTopology.keySet());
final Set<String> allSourceTopics = new HashSet<>();
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new
HashMap<>();
@@ -488,12 +492,18 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
}
/**
- * Computes and assembles all repartition topic metadata then creates the
topics if necessary.
+ * Computes and assembles all repartition topic metadata then creates the
topics if necessary. Also verifies
+ * that all user input topics of each topology have been created ahead of
time. If any such source topics are
+ * missing from a NamedTopology, the assignor will skip distributing its
tasks until they have been created
+ * and invoke the exception handler (without killing the thread) once for
each topology to alert the user of
+ * the missing topics.
+ * <p>
+ * For regular applications without named topologies, the assignor will
instead send a shutdown signal to
+ * all clients so the user can identify and resolve the problem.
*
- * @return map from repartition topic to its partition info
+ * @return application metadata such as partition info of repartition
topics, missing external topics, etc
*/
- private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final
Cluster metadata) {
-
+ private RepartitionTopics prepareRepartitionTopics(final Cluster metadata)
{
final RepartitionTopics repartitionTopics = new RepartitionTopics(
taskManager.topologyMetadata(),
internalTopicManager,
@@ -501,8 +511,13 @@ public class StreamsPartitionAssignor implements
ConsumerPartitionAssignor, Conf
metadata,
logPrefix
);
- repartitionTopics.setup();
- return repartitionTopics.topicPartitionsInfo();
+ final boolean isMissingInputTopics = !repartitionTopics.setup();
+ if (isMissingInputTopics) {
+ if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+ throw new MissingSourceTopicException("Missing source
topics.");
+ }
+ }
+ return repartitionTopics;
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index e855962..fb161ca 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -59,7 +59,7 @@ public class TopologyMetadata {
// the "__" (double underscore) string is not allowed for topology names,
so it's safe to use to indicate
// that it's not a named topology
- private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+ public static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
private static final Pattern EMPTY_ZERO_LENGTH_PATTERN =
Pattern.compile("");
private final StreamsConfig config;
@@ -436,9 +436,33 @@ public class TopologyMetadata {
return sourceTopics;
}
- public Map<Subtopology, TopicsInfo> topicGroups() {
+ private String getTopologyNameOrElseUnnamed(final String topologyName) {
+ return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
+ }
+
+ /**
+ * @param topologiesToExclude the names of any topologies to exclude from
the returned topic groups,
+ * eg because they have missing source topics
and can't be processed yet
+ */
+ public Map<Subtopology, TopicsInfo> topicGroups(final Set<String>
topologiesToExclude) {
final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
- applyToEachBuilder(b -> topicGroups.putAll(b.topicGroups()));
+ for (final InternalTopologyBuilder builder : builders.values()) {
+ if (!topologiesToExclude.contains(builder.topologyName())) {
+ topicGroups.putAll(builder.topicGroups());
+ }
+ }
+ return topicGroups;
+ }
+
+ /**
+ * @return the topic groups (subtopology {@code TopicsInfo}s) of each
topology in this application,
+ * keyed by topology name, or by the {@code UNNAMED_TOPOLOGY}
placeholder for apps without named topologies
+ */
+ public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
+ final Map<String, Collection<TopicsInfo>> topicGroups = new
HashMap<>();
+ applyToEachBuilder(
+ b ->
topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()),
b.topicGroups().values())
+ );
return topicGroups;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 1517736..e58ec96 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -52,7 +52,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -84,11 +83,6 @@ public class NamedTopologyIntegrationTest {
private static final String TOPOLOGY_2 = "topology-2";
private static final String TOPOLOGY_3 = "topology-3";
- // TODO KAFKA-12648:
- // 1) full test coverage for add/removeNamedTopology, covering:
- // - the "last topology removed" case
- // - test using multiple clients, with standbys
-
// "standard" input topics which are pre-filled with the
STANDARD_INPUT_DATA
private final static String INPUT_STREAM_1 = "input-stream-1";
private final static String INPUT_STREAM_2 = "input-stream-2";
@@ -107,6 +101,12 @@ public class NamedTopologyIntegrationTest {
private final static String DELAYED_INPUT_STREAM_3 =
"delayed-input-stream-3";
private final static String DELAYED_INPUT_STREAM_4 =
"delayed-input-stream-4";
+ // topic that is not initially created during the test setup
+ private final static String NEW_STREAM = "new-stream";
+
+ // existing topic that is pre-filled but cleared between tests
+ private final static String EXISTING_STREAM = "existing-stream";
+
private final static Materialized<Object, Long, KeyValueStore<Bytes,
byte[]>> IN_MEMORY_STORE =
Materialized.as(Stores.inMemoryKeyValueStore("store"));
private final static Materialized<Object, Long, KeyValueStore<Bytes,
byte[]>> ROCKSDB_STORE =
Materialized.as(Stores.persistentKeyValueStore("store"));
@@ -375,7 +375,6 @@ public class NamedTopologyIntegrationTest {
}
@Test
- @Ignore
public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes()
throws Exception {
setupSecondKafkaStreams();
topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
@@ -388,18 +387,14 @@ public class NamedTopologyIntegrationTest {
streams2.start(topology1Builder2.build());
waitForApplicationState(asList(streams, streams2), State.RUNNING,
Duration.ofSeconds(30));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
final AddNamedTopologyResult result =
streams.addNamedTopology(topology2Builder.build());
final AddNamedTopologyResult result2 =
streams2.addNamedTopology(topology2Builder2.build());
result.all().get();
result2.all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-
- // TODO KAFKA-12648: need to make sure that both instances actually
did some of this processing of topology-2,
- // ie that both joined the group after the new topology was added and
then successfully processed records from it
- // Also: test where we wait for a rebalance between
streams.addNamedTopology and streams2.addNamedTopology,
- // and vice versa, to make sure we hit case where not all new tasks
are initially assigned, and when not all yet known
}
@Test
@@ -552,7 +547,6 @@ public class NamedTopologyIntegrationTest {
}
@Test
- @Ignore
public void
shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning()
throws Exception {
CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
// Build up named topology with two stateful subtopologies
@@ -589,6 +583,58 @@ public class NamedTopologyIntegrationTest {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
+ @Test
+ public void
shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics()
throws Exception {
+ try {
+ CLUSTER.createTopic(EXISTING_STREAM, 2, 1);
+ produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA);
+ setupSecondKafkaStreams();
+ topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+ topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+ streams.start(topology1Builder.build());
+ streams2.start(topology1Builder2.build());
+ waitForApplicationState(asList(streams, streams2), State.RUNNING,
Duration.ofSeconds(30));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+ topology2Builder.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+ topology2Builder2.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+ streams.addNamedTopology(topology2Builder.build());
+ streams2.addNamedTopology(topology2Builder2.build());
+
+ // make sure the original topology can continue processing while
waiting on the new source topics
+ produceToInputTopics(EXISTING_STREAM, singletonList(pair("A",
30L)));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L))));
+
+ CLUSTER.createTopic(NEW_STREAM, 2, 1);
+ produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+ } finally {
+ CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
+ }
+ }
+
+ @Test
+ public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
+ setupSecondKafkaStreams();
+ topology1Builder.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+ topology1Builder2.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+ streams.start(topology1Builder.build());
+ streams2.start(topology1Builder2.build());
+ waitForApplicationState(asList(streams, streams2), State.RUNNING,
Duration.ofSeconds(30));
+
+ try {
+ CLUSTER.createTopic(NEW_STREAM, 2, 1);
+ produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ } finally {
+ CLUSTER.deleteTopicsAndWait(NEW_STREAM);
+ }
+ }
+
private static void produceToInputTopics(final String topic, final
Collection<KeyValue<String, Long>> records) {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index ce94294..57bf60e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -21,12 +21,12 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
import
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import
org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -41,6 +41,7 @@ import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
@@ -51,6 +52,7 @@ import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.niceMock;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -100,9 +102,14 @@ public class RepartitionTopicsTest {
final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer =
mock(CopartitionedTopicsEnforcer.class);
final Cluster clusterMetadata = niceMock(Cluster.class);
+ @Before
+ public void setup() {
+
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
+ expect(internalTopologyBuilder.topologyName()).andStubReturn(null);
+ }
+
@Test
public void shouldSetupRepartitionTopics() {
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1,
SOURCE_TOPIC_NAME2);
@@ -141,8 +148,8 @@ public class RepartitionTopicsTest {
}
@Test
- public void shouldThrowMissingSourceTopicException() {
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
+ public void shouldReturnMissingSourceTopics() {
+ final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
@@ -152,7 +159,7 @@ public class RepartitionTopicsTest {
mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
))
).andReturn(Collections.emptySet());
- setupClusterWithMissingTopics(mkSet(SOURCE_TOPIC_NAME1));
+ setupClusterWithMissingTopics(missingSourceTopics);
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(
new TopologyMetadata(internalTopologyBuilder, config),
@@ -162,14 +169,17 @@ public class RepartitionTopicsTest {
"[test] "
);
- assertThrows(MissingSourceTopicException.class,
repartitionTopics::setup);
+ assertThat(repartitionTopics.setup(), equalTo(false));
+ assertThat(
+ repartitionTopics.missingUserInputTopicsPerTopology(),
+ equalTo(Collections.singletonMap(UNNAMED_TOPOLOGY,
missingSourceTopics))
+ );
}
@Test
public void
shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
{
final RepartitionTopicConfig
repartitionTopicConfigWithoutPartitionCount =
new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT,
TOPIC_CONFIG5);
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
@@ -208,7 +218,6 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -252,7 +261,6 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -307,7 +315,6 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -357,11 +364,8 @@ public class RepartitionTopicsTest {
Collections.emptyMap(),
Collections.emptyMap()
);
-
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
expect(internalTopologyBuilder.topicGroups())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
-
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptySet());
-
expect(internalTopicManager.makeReady(Collections.emptyMap())).andReturn(Collections.emptySet());
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
final RepartitionTopics repartitionTopics = new RepartitionTopics(