This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59d3a56  Minor: Replace InternalTopicMetadata with InternalTopicConfig 
(#6886)
59d3a56 is described below

commit 59d3a56740a8c668ff8151b88d00c7adc371a50c
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Thu Jun 6 06:58:58 2019 -0700

    Minor: Replace InternalTopicMetadata with InternalTopicConfig (#6886)
    
    Quick tech debt cleanup. For some reason StreamsPartitionAssignor uses an 
InternalTopicMetadata class which wraps an InternalTopicConfig object along 
with the number of partitions. But InternalTopicConfig already has a 
numberOfPartitions field, so we should just use it directly instead.
    
    Reviewers: Guozhang Wang <[email protected]>, Bruno Cadonna 
<[email protected]>, Bill Bejeck <[email protected]>
---
 .../processor/internals/InternalTopicConfig.java   |  6 +-
 .../internals/StreamsPartitionAssignor.java        | 66 ++++++++--------------
 .../CopartitionedTopicsValidatorTest.java          | 47 ++++++++-------
 3 files changed, 46 insertions(+), 73 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
index aa565e4..9005311 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicConfig.java
@@ -26,10 +26,11 @@ import java.util.Objects;
  * the internal topics we create for change-logs and repartitioning etc.
  */
 public abstract class InternalTopicConfig {
+
     final String name;
     final Map<String, String> topicConfigs;
 
-    private int numberOfPartitions = -1;
+    private int numberOfPartitions = StreamsPartitionAssignor.UNKNOWN;
 
     InternalTopicConfig(final String name, final Map<String, String> 
topicConfigs) {
         Objects.requireNonNull(name, "name can't be null");
@@ -53,9 +54,6 @@ public abstract class InternalTopicConfig {
     }
 
     public int numberOfPartitions() {
-        if (numberOfPartitions == -1) {
-            throw new IllegalStateException("Number of partitions not 
specified.");
-        }
         return numberOfPartitions;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 33cce13..714bb0e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -58,7 +58,7 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 
 public class StreamsPartitionAssignor implements PartitionAssignor, 
Configurable {
 
-    private final static int UNKNOWN = -1;
+    final static int UNKNOWN = -1;
     private final static int VERSION_ONE = 1;
     private final static int VERSION_TWO = 2;
     private final static int VERSION_THREE = 3;
@@ -174,25 +174,6 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         }
     }
 
-    static class InternalTopicMetadata {
-        public final InternalTopicConfig config;
-
-        public int numPartitions;
-
-        InternalTopicMetadata(final InternalTopicConfig config) {
-            this.config = config;
-            this.numPartitions = UNKNOWN;
-        }
-
-        @Override
-        public String toString() {
-            return "InternalTopicMetadata(" +
-                    "config=" + config +
-                    ", numPartitions=" + numPartitions +
-                    ")";
-        }
-    }
-
     private static final class InternalStreamsConfig extends StreamsConfig {
         private InternalStreamsConfig(final Map<?, ?> props) {
             super(props, false);
@@ -458,7 +439,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         // the maximum of the depending sub-topologies source topics' number 
of partitions
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = 
taskManager.builder().topicGroups();
 
-        final Map<String, InternalTopicMetadata> repartitionTopicMetadata = 
new HashMap<>();
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata = new 
HashMap<>();
         for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
             for (final String topic : topicsInfo.sourceTopics) {
                 if 
(!topicsInfo.repartitionSourceTopics.keySet().contains(topic) &&
@@ -469,7 +450,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 }
             }
             for (final InternalTopicConfig topic: 
topicsInfo.repartitionSourceTopics.values()) {
-                repartitionTopicMetadata.put(topic.name(), new 
InternalTopicMetadata(topic));
+                repartitionTopicMetadata.put(topic.name(), topic);
             }
         }
 
@@ -479,10 +460,10 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
 
             for (final InternalTopologyBuilder.TopicsInfo topicsInfo : 
topicGroups.values()) {
                 for (final String topicName : 
topicsInfo.repartitionSourceTopics.keySet()) {
-                    int numPartitions = 
repartitionTopicMetadata.get(topicName).numPartitions;
+                    int numPartitions = 
repartitionTopicMetadata.get(topicName).numberOfPartitions();
 
-                    // try set the number of partitions for this repartition 
topic if it is not set yet
                     if (numPartitions == UNKNOWN) {
+                        // try set the number of partitions for this 
repartition topic if it is not set yet
                         for (final InternalTopologyBuilder.TopicsInfo 
otherTopicsInfo : topicGroups.values()) {
                             final Set<String> otherSinkTopics = 
otherTopicsInfo.sinkTopics;
 
@@ -494,7 +475,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                                     // It is possible the sourceTopic is 
another internal topic, i.e,
                                     // map().join().join(map())
                                     if 
(repartitionTopicMetadata.containsKey(sourceTopicName)) {
-                                        numPartitionsCandidate = 
repartitionTopicMetadata.get(sourceTopicName).numPartitions;
+                                        numPartitionsCandidate = 
repartitionTopicMetadata.get(sourceTopicName).numberOfPartitions();
                                     } else {
                                         numPartitionsCandidate = 
metadata.partitionCountForTopic(sourceTopicName);
                                     }
@@ -510,7 +491,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                         if (numPartitions == UNKNOWN) {
                             numPartitionsNeeded = true;
                         } else {
-                            
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
+                            
repartitionTopicMetadata.get(topicName).setNumberOfPartitions(numPartitions);
                         }
                     }
                 }
@@ -530,9 +511,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         // augment the metadata with the newly computed number of partitions 
for all the
         // repartition source topics
         final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions 
= new HashMap<>();
-        for (final Map.Entry<String, InternalTopicMetadata> entry : 
repartitionTopicMetadata.entrySet()) {
+        for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
             final String topic = entry.getKey();
-            final int numPartitions = entry.getValue().numPartitions;
+            final int numPartitions = entry.getValue().numberOfPartitions();
 
             for (int partition = 0; partition < numPartitions; partition++) {
                 allRepartitionTopicPartitions.put(new TopicPartition(topic, 
partition),
@@ -591,7 +572,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         }
 
         // add tasks to state change log topic subscribers
-        final Map<String, InternalTopicMetadata> changelogTopicMetadata = new 
HashMap<>();
+        final Map<String, InternalTopicConfig> changelogTopicMetadata = new 
HashMap<>();
         for (final Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> 
entry : topicGroups.entrySet()) {
             final int topicGroupId = entry.getKey();
             final Map<String, InternalTopicConfig> stateChangelogTopics = 
entry.getValue().stateChangelogTopics;
@@ -605,10 +586,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                             numPartitions = task.partition + 1;
                         }
                     }
-                    final InternalTopicMetadata topicMetadata = new 
InternalTopicMetadata(topicConfig);
-                    topicMetadata.numPartitions = numPartitions;
+                    topicConfig.setNumberOfPartitions(numPartitions);
 
-                    changelogTopicMetadata.put(topicConfig.name(), 
topicMetadata);
+                    changelogTopicMetadata.put(topicConfig.name(), 
topicConfig);
                 } else {
                     log.debug("No tasks found for topic group {}", 
topicGroupId);
                 }
@@ -949,17 +929,15 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
      *
      * @param topicPartitions Map that contains the topic names to be created 
with the number of partitions
      */
-    private void prepareTopic(final Map<String, InternalTopicMetadata> 
topicPartitions) {
+    private void prepareTopic(final Map<String, InternalTopicConfig> 
topicPartitions) {
         log.debug("Starting to validate internal topics {} in partition 
assignor.", topicPartitions);
 
         // first construct the topics to make ready
         final Map<String, InternalTopicConfig> topicsToMakeReady = new 
HashMap<>();
 
-        for (final InternalTopicMetadata metadata : topicPartitions.values()) {
-            final InternalTopicConfig topic = metadata.config;
-            final int numPartitions = metadata.numPartitions;
-
-            if (numPartitions < 0) {
+        for (final InternalTopicConfig topic : topicPartitions.values()) {
+            final int numPartitions = topic.numberOfPartitions();
+            if (numPartitions == UNKNOWN) {
                 throw new StreamsException(String.format("%sTopic [%s] number 
of partitions not defined", logPrefix, topic.name()));
             }
 
@@ -975,7 +953,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
     }
 
     private void ensureCopartitioning(final Collection<Set<String>> 
copartitionGroups,
-                                      final Map<String, InternalTopicMetadata> 
allRepartitionTopicsNumPartitions,
+                                      final Map<String, InternalTopicConfig> 
allRepartitionTopicsNumPartitions,
                                       final Cluster metadata) {
         for (final Set<String> copartitionGroup : copartitionGroups) {
             copartitionedTopicsValidator.validate(copartitionGroup, 
allRepartitionTopicsNumPartitions, metadata);
@@ -993,7 +971,7 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
         }
 
         void validate(final Set<String> copartitionGroup,
-                      final Map<String, InternalTopicMetadata> 
allRepartitionTopicsNumPartitions,
+                      final Map<String, InternalTopicConfig> 
allRepartitionTopicsNumPartitions,
                       final Cluster metadata) {
             int numPartitions = UNKNOWN;
 
@@ -1019,9 +997,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
             // if all topics for this co-partition group is repartition topics,
             // then set the number of partitions to be the maximum of the 
number of partitions.
             if (numPartitions == UNKNOWN) {
-                for (final Map.Entry<String, InternalTopicMetadata> entry: 
allRepartitionTopicsNumPartitions.entrySet()) {
+                for (final Map.Entry<String, InternalTopicConfig> entry: 
allRepartitionTopicsNumPartitions.entrySet()) {
                     if (copartitionGroup.contains(entry.getKey())) {
-                        final int partitions = entry.getValue().numPartitions;
+                        final int partitions = 
entry.getValue().numberOfPartitions();
                         if (partitions > numPartitions) {
                             numPartitions = partitions;
                         }
@@ -1029,9 +1007,9 @@ public class StreamsPartitionAssignor implements 
PartitionAssignor, Configurable
                 }
             }
             // enforce co-partitioning restrictions to repartition topics by 
updating their number of partitions
-            for (final Map.Entry<String, InternalTopicMetadata> entry : 
allRepartitionTopicsNumPartitions.entrySet()) {
+            for (final Map.Entry<String, InternalTopicConfig> entry : 
allRepartitionTopicsNumPartitions.entrySet()) {
                 if (copartitionGroup.contains(entry.getKey())) {
-                    entry.getValue().numPartitions = numPartitions;
+                    entry.getValue().setNumberOfPartitions(numPartitions);
                 }
             }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index d6119fc..a272aa5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
@@ -72,49 +72,46 @@ public class CopartitionedTopicsValidatorTest {
 
     @Test
     public void shouldEnforceCopartitioningOnRepartitionTopics() {
-        final StreamsPartitionAssignor.InternalTopicMetadata metadata = 
createTopicMetadata("repartitioned", 10);
+        final InternalTopicConfig config = createTopicConfig("repartitioned", 
10);
 
-        validator.validate(Utils.mkSet("first", "second", 
metadata.config.name()),
-                           Collections.singletonMap(metadata.config.name(),
-                                                    metadata),
+        validator.validate(Utils.mkSet("first", "second", config.name()),
+                           Collections.singletonMap(config.name(), config),
                            cluster.withPartitions(partitions));
 
-        assertThat(metadata.numPartitions, equalTo(2));
+        assertThat(config.numberOfPartitions(), equalTo(2));
     }
 
 
     @Test
     public void 
shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartitionTopics() {
-        final StreamsPartitionAssignor.InternalTopicMetadata one = 
createTopicMetadata("one", 1);
-        final StreamsPartitionAssignor.InternalTopicMetadata two = 
createTopicMetadata("two", 15);
-        final StreamsPartitionAssignor.InternalTopicMetadata three = 
createTopicMetadata("three", 5);
-        final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> 
repartitionTopicConfig = new HashMap<>();
-
-        repartitionTopicConfig.put(one.config.name(), one);
-        repartitionTopicConfig.put(two.config.name(), two);
-        repartitionTopicConfig.put(three.config.name(), three);
-
-        validator.validate(Utils.mkSet(one.config.name(),
-                                       two.config.name(),
-                                       three.config.name()),
+        final InternalTopicConfig one = createTopicConfig("one", 1);
+        final InternalTopicConfig two = createTopicConfig("two", 15);
+        final InternalTopicConfig three = createTopicConfig("three", 5);
+        final Map<String, InternalTopicConfig> repartitionTopicConfig = new 
HashMap<>();
+
+        repartitionTopicConfig.put(one.name(), one);
+        repartitionTopicConfig.put(two.name(), two);
+        repartitionTopicConfig.put(three.name(), three);
+
+        validator.validate(Utils.mkSet(one.name(),
+                                       two.name(),
+                                       three.name()),
                            repartitionTopicConfig,
                            cluster
         );
 
-        assertThat(one.numPartitions, equalTo(15));
-        assertThat(two.numPartitions, equalTo(15));
-        assertThat(three.numPartitions, equalTo(15));
+        assertThat(one.numberOfPartitions(), equalTo(15));
+        assertThat(two.numberOfPartitions(), equalTo(15));
+        assertThat(three.numberOfPartitions(), equalTo(15));
     }
 
-    private StreamsPartitionAssignor.InternalTopicMetadata 
createTopicMetadata(final String repartitionTopic,
+    private InternalTopicConfig createTopicConfig(final String 
repartitionTopic,
                                                                                
final int partitions) {
         final InternalTopicConfig repartitionTopicConfig =
             new RepartitionTopicConfig(repartitionTopic, 
Collections.emptyMap());
 
-        final StreamsPartitionAssignor.InternalTopicMetadata metadata =
-            new 
StreamsPartitionAssignor.InternalTopicMetadata(repartitionTopicConfig);
-        metadata.numPartitions = partitions;
-        return metadata;
+        repartitionTopicConfig.setNumberOfPartitions(partitions);
+        return repartitionTopicConfig;
     }
 
 }
\ No newline at end of file

Reply via email to