This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59d3a56 Minor: Replace InternalTopicMetadata with InternalTopicConfig
(#6886)
59d3a56 is described below
commit 59d3a56740a8c668ff8151b88d00c7adc371a50c
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jun 6 06:58:58 2019 -0700
Minor: Replace InternalTopicMetadata with InternalTopicConfig (#6886)
Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an
InternalTopicMetadata class which wraps an InternalTopicConfig object along
with the number of partitions. But InternalTopicConfig already has a
numPartitions field, so we should just use it directly instead.
Reviewers: Guozhang Wang <[email protected]>, Bruno Cadonna
<[email protected]>, Bill Bejeck <[email protected]>
---
.../processor/internals/InternalTopicConfig.java | 6 +-
.../internals/StreamsPartitionAssignor.java | 66 ++++++++--------------
.../CopartitionedTopicsValidatorTest.java | 47 ++++++++-------
3 files changed, 46 insertions(+), 73 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index aa565e4..9005311 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -26,10 +26,11 @@ import java.util.Objects;
* the internal topics we create for change-logs and repartitioning etc.
*/
public abstract class InternalTopicConfig {
+
final String name;
final Map<String, String> topicConfigs;
- private int numberOfPartitions = -1;
+ private int numberOfPartitions = StreamsPartitionAssignor.UNKNOWN;
InternalTopicConfig(final String name, final Map<String, String>
topicConfigs) {
Objects.requireNonNull(name, "name can't be null");
@@ -53,9 +54,6 @@ public abstract class InternalTopicConfig {
}
public int numberOfPartitions() {
- if (numberOfPartitions == -1) {
- throw new IllegalStateException("Number of partitions not
specified.");
- }
return numberOfPartitions;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 33cce13..714bb0e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -58,7 +58,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
public class StreamsPartitionAssignor implements PartitionAssignor,
Configurable {
- private final static int UNKNOWN = -1;
+ final static int UNKNOWN = -1;
private final static int VERSION_ONE = 1;
private final static int VERSION_TWO = 2;
private final static int VERSION_THREE = 3;
@@ -174,25 +174,6 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
- static class InternalTopicMetadata {
- public final InternalTopicConfig config;
-
- public int numPartitions;
-
- InternalTopicMetadata(final InternalTopicConfig config) {
- this.config = config;
- this.numPartitions = UNKNOWN;
- }
-
- @Override
- public String toString() {
- return "InternalTopicMetadata(" +
- "config=" + config +
- ", numPartitions=" + numPartitions +
- ")";
- }
- }
-
private static final class InternalStreamsConfig extends StreamsConfig {
private InternalStreamsConfig(final Map<?, ?> props) {
super(props, false);
@@ -458,7 +439,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
// the maximum of the depending sub-topologies source topics' number
of partitions
final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups =
taskManager.builder().topicGroups();
- final Map<String, InternalTopicMetadata> repartitionTopicMetadata =
new HashMap<>();
+ final Map<String, InternalTopicConfig> repartitionTopicMetadata = new
HashMap<>();
for (final InternalTopologyBuilder.TopicsInfo topicsInfo :
topicGroups.values()) {
for (final String topic : topicsInfo.sourceTopics) {
if
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
@@ -469,7 +450,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
for (final InternalTopicConfig topic:
topicsInfo.repartitionSourceTopics.values()) {
- repartitionTopicMetadata.put(topic.name(), new
InternalTopicMetadata(topic));
+ repartitionTopicMetadata.put(topic.name(), topic);
}
}
@@ -479,10 +460,10 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
for (final InternalTopologyBuilder.TopicsInfo topicsInfo :
topicGroups.values()) {
for (final String topicName :
topicsInfo.repartitionSourceTopics.keySet()) {
- int numPartitions =
repartitionTopicMetadata.get(topicName).numPartitions;
+ int numPartitions =
repartitionTopicMetadata.get(topicName).numberOfPartitions();
- // try set the number of partitions for this repartition
topic if it is not set yet
if (numPartitions == UNKNOWN) {
+ // try set the number of partitions for this
repartition topic if it is not set yet
for (final InternalTopologyBuilder.TopicsInfo
otherTopicsInfo : topicGroups.values()) {
final Set<String> otherSinkTopics =
otherTopicsInfo.sinkTopics;
@@ -494,7 +475,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
// It is possible the sourceTopic is
another internal topic, i.e,
// map().join().join(map())
if
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
- numPartitionsCandidate =
repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+ numPartitionsCandidate =
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions();
} else {
numPartitionsCandidate =
metadata.partitionCountForTopic(sourceTopicName);
}
@@ -510,7 +491,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
if (numPartitions == UNKNOWN) {
numPartitionsNeeded = true;
} else {
-
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
+
repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
}
}
}
@@ -530,9 +511,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
// augment the metadata with the newly computed number of partitions
for all the
// repartition source topics
final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions
= new HashMap<>();
- for (final Map.Entry<String, InternalTopicMetadata> entry :
repartitionTopicMetadata.entrySet()) {
+ for (final Map.Entry<String, InternalTopicConfig> entry :
repartitionTopicMetadata.entrySet()) {
final String topic = entry.getKey();
- final int numPartitions = entry.getValue().numPartitions;
+ final int numPartitions = entry.getValue().numberOfPartitions();
for (int partition = 0; partition < numPartitions; partition++) {
allRepartitionTopicPartitions.put(new TopicPartition(topic,
partition),
@@ -591,7 +572,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
// add tasks to state change log topic subscribers
- final Map<String, InternalTopicMetadata> changelogTopicMetadata = new
HashMap<>();
+ final Map<String, InternalTopicConfig> changelogTopicMetadata = new
HashMap<>();
for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo>
entry : topicGroups.entrySet()) {
final int topicGroupId = entry.getKey();
final Map<String, InternalTopicConfig> stateChangelogTopics =
entry.getValue().stateChangelogTopics;
@@ -605,10 +586,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
numPartitions = task.partition + 1;
}
}
- final InternalTopicMetadata topicMetadata = new
InternalTopicMetadata(topicConfig);
- topicMetadata.numPartitions = numPartitions;
+ topicConfig.setNumberOfPartitions(numPartitions);
- changelogTopicMetadata.put(topicConfig.name(),
topicMetadata);
+ changelogTopicMetadata.put(topicConfig.name(),
topicConfig);
} else {
log.debug("No tasks found for topic group {}",
topicGroupId);
}
@@ -949,17 +929,15 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
*
* @param topicPartitions Map that contains the topic names to be created
with the number of partitions
*/
- private void prepareTopic(final Map<String, InternalTopicMetadata>
topicPartitions) {
+ private void prepareTopic(final Map<String, InternalTopicConfig>
topicPartitions) {
log.debug("Starting to validate internal topics {} in partition
assignor.", topicPartitions);
// first construct the topics to make ready
final Map<String, InternalTopicConfig> topicsToMakeReady = new
HashMap<>();
- for (final InternalTopicMetadata metadata : topicPartitions.values()) {
- final InternalTopicConfig topic = metadata.config;
- final int numPartitions = metadata.numPartitions;
-
- if (numPartitions < 0) {
+ for (final InternalTopicConfig topic : topicPartitions.values()) {
+ final int numPartitions = topic.numberOfPartitions();
+ if (numPartitions == UNKNOWN) {
throw new StreamsException(String.format("%sTopic [%s] number
of partitions not defined", logPrefix, topic.name()));
}
@@ -975,7 +953,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
private void ensureCopartitioning(final Collection<Set<String>>
copartitionGroups,
- final Map<String, InternalTopicMetadata>
allRepartitionTopicsNumPartitions,
+ final Map<String, InternalTopicConfig>
allRepartitionTopicsNumPartitions,
final Cluster metadata) {
for (final Set<String> copartitionGroup : copartitionGroups) {
copartitionedTopicsValidator.validate(copartitionGroup,
allRepartitionTopicsNumPartitions, metadata);
@@ -993,7 +971,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
void validate(final Set<String> copartitionGroup,
- final Map<String, InternalTopicMetadata>
allRepartitionTopicsNumPartitions,
+ final Map<String, InternalTopicConfig>
allRepartitionTopicsNumPartitions,
final Cluster metadata) {
int numPartitions = UNKNOWN;
@@ -1019,9 +997,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
// if all topics for this co-partition group is repartition topics,
// then set the number of partitions to be the maximum of the
number of partitions.
if (numPartitions == UNKNOWN) {
- for (final Map.Entry<String, InternalTopicMetadata> entry:
allRepartitionTopicsNumPartitions.entrySet()) {
+ for (final Map.Entry<String, InternalTopicConfig> entry:
allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
- final int partitions = entry.getValue().numPartitions;
+ final int partitions =
entry.getValue().numberOfPartitions();
if (partitions > numPartitions) {
numPartitions = partitions;
}
@@ -1029,9 +1007,9 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
// enforce co-partitioning restrictions to repartition topics by
updating their number of partitions
- for (final Map.Entry<String, InternalTopicMetadata> entry :
allRepartitionTopicsNumPartitions.entrySet()) {
+ for (final Map.Entry<String, InternalTopicConfig> entry :
allRepartitionTopicsNumPartitions.entrySet()) {
if (copartitionGroup.contains(entry.getKey())) {
- entry.getValue().numPartitions = numPartitions;
+ entry.getValue().setNumberOfPartitions(numPartitions);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index d6119fc..a272aa5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -72,49 +72,46 @@ public class CopartitionedTopicsValidatorTest {
@Test
public void shouldEnforceCopartitioningOnRepartitionTopics() {
- final StreamsPartitionAssignor.InternalTopicMetadata metadata =
createTopicMetadata("repartitioned", 10);
+ final InternalTopicConfig config = createTopicConfig("repartitioned",
10);
- validator.validate(Utils.mkSet("first", "second",
metadata.config.name()),
- Collections.singletonMap(metadata.config.name(),
- metadata),
+ validator.validate(Utils.mkSet("first", "second", config.name()),
+ Collections.singletonMap(config.name(), config),
cluster.withPartitions(partitions));
- assertThat(metadata.numPartitions, equalTo(2));
+ assertThat(config.numberOfPartitions(), equalTo(2));
}
@Test
public void
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
- final StreamsPartitionAssignor.InternalTopicMetadata one =
createTopicMetadata("one", 1);
- final StreamsPartitionAssignor.InternalTopicMetadata two =
createTopicMetadata("two", 15);
- final StreamsPartitionAssignor.InternalTopicMetadata three =
createTopicMetadata("three", 5);
- final Map<String, StreamsPartitionAssignor.InternalTopicMetadata>
repartitionTopicConfig = new HashMap<>();
-
- repartitionTopicConfig.put(one.config.name(), one);
- repartitionTopicConfig.put(two.config.name(), two);
- repartitionTopicConfig.put(three.config.name(), three);
-
- validator.validate(Utils.mkSet(one.config.name(),
- two.config.name(),
- three.config.name()),
+ final InternalTopicConfig one = createTopicConfig("one", 1);
+ final InternalTopicConfig two = createTopicConfig("two", 15);
+ final InternalTopicConfig three = createTopicConfig("three", 5);
+ final Map<String, InternalTopicConfig> repartitionTopicConfig = new
HashMap<>();
+
+ repartitionTopicConfig.put(one.name(), one);
+ repartitionTopicConfig.put(two.name(), two);
+ repartitionTopicConfig.put(three.name(), three);
+
+ validator.validate(Utils.mkSet(one.name(),
+ two.name(),
+ three.name()),
repartitionTopicConfig,
cluster
);
- assertThat(one.numPartitions, equalTo(15));
- assertThat(two.numPartitions, equalTo(15));
- assertThat(three.numPartitions, equalTo(15));
+ assertThat(one.numberOfPartitions(), equalTo(15));
+ assertThat(two.numberOfPartitions(), equalTo(15));
+ assertThat(three.numberOfPartitions(), equalTo(15));
}
- private StreamsPartitionAssignor.InternalTopicMetadata
createTopicMetadata(final String repartitionTopic,
+ private InternalTopicConfig createTopicConfig(final String
repartitionTopic,
final int partitions) {
final InternalTopicConfig repartitionTopicConfig =
new RepartitionTopicConfig(repartitionTopic,
Collections.emptyMap());
- final StreamsPartitionAssignor.InternalTopicMetadata metadata =
- new
StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
- metadata.numPartitions = partitions;
- return metadata;
+ repartitionTopicConfig.setNumberOfPartitions(partitions);
+ return repartitionTopicConfig;
}
}
\ No newline at end of file