This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 487b954 MINOR: internal config objects should not be logged (#5389)
487b954 is described below
commit 487b95454233e981a65c184c7f3b86adf34058ef
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jul 24 19:33:55 2018 -0700
MINOR: internal config objects should not be logged (#5389)
Reviewers: Guozhang Wang <[email protected]>, Bill Bejeck
<[email protected]>
---
.../kafka/clients/admin/AdminClientConfig.java | 6 ++++-
.../org/apache/kafka/streams/StreamsConfig.java | 7 +++++-
.../processor/internals/InternalTopicManager.java | 18 +++++++-------
.../internals/StreamsPartitionAssignor.java | 28 ++++++++++++----------
4 files changed, 35 insertions(+), 24 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index b5ca15a..058c491 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -174,7 +174,11 @@ public class AdminClientConfig extends AbstractConfig {
}
public AdminClientConfig(Map<?, ?> props) {
- super(CONFIG, props);
+ this(props, false);
+ }
+
+ protected AdminClientConfig(Map<?, ?> props, boolean doLog) {
+ super(CONFIG, props, doLog);
}
public static Set<String> configNames() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d1f7d93..54fcbc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -830,7 +830,12 @@ public class StreamsConfig extends AbstractConfig {
* @param props properties that specify Kafka Streams and internal
consumer/producer configuration
*/
public StreamsConfig(final Map<?, ?> props) {
- super(CONFIG, props);
+ this(props, true);
+ }
+
+ protected StreamsConfig(final Map<?, ?> props,
+ final boolean doLog) {
+ super(CONFIG, props, doLog);
eosEnabled =
EXACTLY_ONCE.equals(getString(PROCESSING_GUARANTEE_CONFIG));
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index 2c2df04..6159ee2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -35,7 +35,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -44,6 +43,12 @@ public class InternalTopicManager {
private final static String INTERRUPTED_ERROR_MESSAGE = "Thread got
interrupted. This indicates a bug. " +
"Please report at https://issues.apache.org/jira/projects/KAFKA or
dev-mailing list (https://kafka.apache.org/contact).";
+ private static final class InternalAdminClientConfig extends
AdminClientConfig {
+ private InternalAdminClientConfig(final Map<?, ?> props) {
+ super(props, false);
+ }
+ }
+
private final Logger log;
private final long windowChangeLogAdditionalRetention;
private final Map<String, String> defaultTopicConfigs = new HashMap<>();
@@ -57,12 +62,12 @@ public class InternalTopicManager {
final StreamsConfig streamsConfig) {
this.adminClient = adminClient;
- LogContext logContext = new LogContext(String.format("stream-thread
[%s] ", Thread.currentThread().getName()));
+ final LogContext logContext = new
LogContext(String.format("stream-thread [%s] ",
Thread.currentThread().getName()));
log = logContext.logger(getClass());
replicationFactor =
streamsConfig.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG).shortValue();
windowChangeLogAdditionalRetention =
streamsConfig.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG);
- retries = new
AdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
+ retries = new
InternalAdminClientConfig(streamsConfig.getAdminConfigs("dummy")).getInt(AdminClientConfig.RETRIES_CONFIG);
log.debug("Configs:" + Utils.NL,
"\t{} = {}" + Utils.NL,
@@ -146,12 +151,7 @@ public class InternalTopicManager {
}
if (retry) {
- final Iterator<NewTopic> it = newTopics.iterator();
- while (it.hasNext()) {
- if (createTopicNames.contains(it.next().name())) {
- it.remove();
- }
- }
+ newTopics.removeIf(newTopic ->
createTopicNames.contains(newTopic.name()));
continue;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index d81d4f1..a56fb28 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -64,7 +64,6 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
private final static int VERSION_THREE = 3;
private final static int VERSION_FOUR = 4;
private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE;
- private int minReceivedMetadataVersion = UNKNOWN;
protected Set<Integer> supportedVersions = new HashSet<>();
private Logger log;
@@ -194,17 +193,19 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
}
}
- protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
new Comparator<TopicPartition>() {
- @Override
- public int compare(final TopicPartition p1,
- final TopicPartition p2) {
- final int result = p1.topic().compareTo(p2.topic());
+ private static final class InternalStreamsConfig extends StreamsConfig {
+ private InternalStreamsConfig(final Map<?, ?> props) {
+ super(props, false);
+ }
+ }
- if (result != 0) {
- return result;
- } else {
- return Integer.compare(p1.partition(), p2.partition());
- }
+ protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
(p1, p2) -> {
+ final int result = p1.topic().compareTo(p2.topic());
+
+ if (result != 0) {
+ return result;
+ } else {
+ return Integer.compare(p1.partition(), p2.partition());
}
};
@@ -236,7 +237,7 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
*/
@Override
public void configure(final Map<String, ?> configs) {
- final StreamsConfig streamsConfig = new StreamsConfig(configs);
+ final StreamsConfig streamsConfig = new InternalStreamsConfig(configs);
// Setting the logger with the passed in client thread name
logPrefix = String.format("stream-thread [%s] ",
streamsConfig.getString(CommonClientConfigs.CLIENT_ID_CONFIG));
@@ -394,7 +395,8 @@ public class StreamsPartitionAssignor implements
PartitionAssignor, Configurable
final Map<UUID, ClientMetadata> clientsMetadata = new HashMap<>();
final Set<String> futureConsumers = new HashSet<>();
- minReceivedMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+ int minReceivedMetadataVersion =
SubscriptionInfo.LATEST_SUPPORTED_VERSION;
+
supportedVersions.clear();
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry :
subscriptions.entrySet()) {