This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 0cb3393 MINOR: Fixed log in Topology Builder. (#5477)
0cb3393 is described below
commit 0cb3393043688058bccf1b979701b1469661d395
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Aug 9 09:52:36 2018 +0530
MINOR: Fixed log in Topology Builder. (#5477)
- fix log statement in Topology Builder.
- addressed some warnings shown by Intellij
Reviewers: Viktor Somogyi <[email protected]>, Satish Duggana <[email protected]>, Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/processor/TaskId.java | 26 +++++++++-------------
.../internals/InternalTopologyBuilder.java | 14 ++++++------
2 files changed, 18 insertions(+), 22 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index f4c9ce0..01feb77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -43,19 +43,19 @@ public class TaskId implements Comparable<TaskId> {
}
/**
- * @throws TaskIdFormatException if the string is not a valid {@link TaskId}
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId}
*/
- public static TaskId parse(String string) {
- int index = string.indexOf('_');
- if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(string);
+ public static TaskId parse(final String taskIdStr) {
+ final int index = taskIdStr.indexOf('_');
+ if (index <= 0 || index + 1 >= taskIdStr.length()) throw new TaskIdFormatException(taskIdStr);
try {
- int topicGroupId = Integer.parseInt(string.substring(0, index));
- int partition = Integer.parseInt(string.substring(index + 1));
+ final int topicGroupId = Integer.parseInt(taskIdStr.substring(0, index));
+ final int partition = Integer.parseInt(taskIdStr.substring(index + 1));
return new TaskId(topicGroupId, partition);
- } catch (Exception e) {
- throw new TaskIdFormatException(string);
+ } catch (final Exception e) {
+ throw new TaskIdFormatException(taskIdStr);
}
}
@@ -103,12 +103,8 @@ public class TaskId implements Comparable<TaskId> {
}
@Override
- public int compareTo(TaskId other) {
- return
- this.topicGroupId < other.topicGroupId ? -1 :
- (this.topicGroupId > other.topicGroupId ? 1 :
- (this.partition < other.partition ? -1 :
- (this.partition > other.partition ? 1 :
- 0)));
+ public int compareTo(final TaskId other) {
+ final int compare = Integer.compare(this.topicGroupId, other.topicGroupId);
+ return compare != 0 ? compare : Integer.compare(this.partition, other.partition);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 5b4b4d73..73db937 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1027,7 +1027,7 @@ public class InternalTopologyBuilder {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
final String internalTopic = decorateTopic(topic);
- repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.<String, String>emptyMap()));
+ repartitionTopics.put(internalTopic, new RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
sourceTopics.add(internalTopic);
} else {
sourceTopics.add(topic);
@@ -1353,7 +1353,7 @@ public class InternalTopologyBuilder {
description.addSubtopology(new Subtopology(
subtopologyId,
- new HashSet<TopologyDescription.Node>(nodesByName.values())));
+ new HashSet<>(nodesByName.values())));
}
public final static class GlobalStore implements TopologyDescription.GlobalStore {
@@ -1744,10 +1744,10 @@ public class InternalTopologyBuilder {
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("Topologies:\n ");
- final TopologyDescription.Subtopology[] sortedSubtopologies =
- subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[subtopologies.size()]);
- final TopologyDescription.GlobalStore[] sortedGlobalStores =
- globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[globalStores.size()]);
+ final TopologyDescription.Subtopology[] sortedSubtopologies =
+ subtopologies.descendingSet().toArray(new Subtopology[0]);
+ final TopologyDescription.GlobalStore[] sortedGlobalStores =
+ globalStores.descendingSet().toArray(new GlobalStore[0]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
@@ -1848,7 +1848,7 @@ public class InternalTopologyBuilder {
public void updateSubscribedTopics(final Set<String> topics, final String logPrefix) {
final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
- log.debug("{}found {} topics possibly matching regex", topics, logPrefix);
+ log.debug("{}found {} topics possibly matching regex", logPrefix, topics);
// update the topic groups with the returned subscription set for regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
updateSubscriptions(subscriptionUpdates, logPrefix);