This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13a7544 MINOR: Fixed log in Topology Builder. (#5477)
13a7544 is described below
commit 13a7544418a139d3b2f3891da796cfa33065cbec
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Thu Aug 9 09:52:36 2018 +0530
MINOR: Fixed log in Topology Builder. (#5477)
- fix log statement in Topology Builder.
- addressed some warnings shown by IntelliJ
Reviewers: Viktor Somogyi <[email protected]>, Satish Duggana
<[email protected]>, Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/processor/TaskId.java | 22 +++++++++-------------
.../internals/InternalTopologyBuilder.java | 10 +++++-----
2 files changed, 14 insertions(+), 18 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
index 44550ae..79919a6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
@@ -43,19 +43,19 @@ public class TaskId implements Comparable<TaskId> {
}
/**
- * @throws TaskIdFormatException if the string is not a valid {@link
TaskId}
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link
TaskId}
*/
- public static TaskId parse(final String string) {
- final int index = string.indexOf('_');
- if (index <= 0 || index + 1 >= string.length()) throw new
TaskIdFormatException(string);
+ public static TaskId parse(final String taskIdStr) {
+ final int index = taskIdStr.indexOf('_');
+ if (index <= 0 || index + 1 >= taskIdStr.length()) throw new
TaskIdFormatException(taskIdStr);
try {
- final int topicGroupId = Integer.parseInt(string.substring(0,
index));
- final int partition = Integer.parseInt(string.substring(index +
1));
+ final int topicGroupId = Integer.parseInt(taskIdStr.substring(0,
index));
+ final int partition = Integer.parseInt(taskIdStr.substring(index +
1));
return new TaskId(topicGroupId, partition);
} catch (final Exception e) {
- throw new TaskIdFormatException(string);
+ throw new TaskIdFormatException(taskIdStr);
}
}
@@ -104,11 +104,7 @@ public class TaskId implements Comparable<TaskId> {
@Override
public int compareTo(final TaskId other) {
- return
- this.topicGroupId < other.topicGroupId ? -1 :
- (this.topicGroupId > other.topicGroupId ? 1 :
- (this.partition < other.partition ? -1 :
- (this.partition > other.partition ? 1 :
- 0)));
+ final int compare = Integer.compare(this.topicGroupId,
other.topicGroupId);
+ return compare != 0 ? compare : Integer.compare(this.partition,
other.partition);
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 250105a..b7e4303 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -998,7 +998,7 @@ public class InternalTopologyBuilder {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the
application id
final String internalTopic = decorateTopic(topic);
- repartitionTopics.put(internalTopic, new
RepartitionTopicConfig(internalTopic, Collections.<String, String>emptyMap()));
+ repartitionTopics.put(internalTopic, new
RepartitionTopicConfig(internalTopic, Collections.emptyMap()));
sourceTopics.add(internalTopic);
} else {
sourceTopics.add(topic);
@@ -1324,7 +1324,7 @@ public class InternalTopologyBuilder {
description.addSubtopology(new Subtopology(
subtopologyId,
- new HashSet<TopologyDescription.Node>(nodesByName.values())));
+ new HashSet<>(nodesByName.values())));
}
public final static class GlobalStore implements
TopologyDescription.GlobalStore {
@@ -1716,9 +1716,9 @@ public class InternalTopologyBuilder {
final StringBuilder sb = new StringBuilder();
sb.append("Topologies:\n ");
final TopologyDescription.Subtopology[] sortedSubtopologies =
- subtopologies.descendingSet().toArray(new
TopologyDescription.Subtopology[subtopologies.size()]);
+ subtopologies.descendingSet().toArray(new Subtopology[0]);
final TopologyDescription.GlobalStore[] sortedGlobalStores =
- globalStores.descendingSet().toArray(new
TopologyDescription.GlobalStore[globalStores.size()]);
+ globalStores.descendingSet().toArray(new GlobalStore[0]);
int expectedId = 0;
int subtopologiesIndex = sortedSubtopologies.length - 1;
int globalStoresIndex = sortedGlobalStores.length - 1;
@@ -1819,7 +1819,7 @@ public class InternalTopologyBuilder {
public void updateSubscribedTopics(final Set<String> topics, final String
logPrefix) {
final SubscriptionUpdates subscriptionUpdates = new
SubscriptionUpdates();
- log.debug("{}found {} topics possibly matching regex", topics,
logPrefix);
+ log.debug("{}found {} topics possibly matching regex", logPrefix,
topics);
// update the topic groups with the returned subscription set for
regex pattern subscriptions
subscriptionUpdates.updateTopics(topics);
updateSubscriptions(subscriptionUpdates, logPrefix);