This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 3a793b094cd MINOR: only log error when rack aware assignment is enabled (#14415)
3a793b094cd is described below
commit 3a793b094cd612b932d2c2b5d38c9f3204583abc
Author: Hao Li <[email protected]>
AuthorDate: Fri Sep 29 10:16:29 2023 -0700
MINOR: only log error when rack aware assignment is enabled (#14415)
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax <[email protected]>
---
.../processor/internals/assignment/RackAwareTaskAssignor.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java
index 0c8f9492a3e..18e3d78d4d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java
@@ -212,8 +212,11 @@ public class RackAwareTaskAssignor {
KeyValue<String, String> previousRackInfo = null;
for (final Map.Entry<String, Optional<String>> rackEntry :
entry.getValue().entrySet()) {
if (!rackEntry.getValue().isPresent()) {
- log.error(String.format("RackId doesn't exist for process %s and consumer %s",
- processId, rackEntry.getKey()));
+ if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+ log.error(
+ String.format("RackId doesn't exist for process %s and consumer %s",
+ processId, rackEntry.getKey()));
+ }
return false;
}
if (previousRackInfo == null) {
@@ -232,7 +235,9 @@ public class RackAwareTaskAssignor {
}
}
if (previousRackInfo == null) {
- log.error(String.format("RackId doesn't exist for process %s", processId));
+ if (!StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+ log.error(String.format("RackId doesn't exist for process %s", processId));
+ }
return false;
}
racksForProcess.put(entry.getKey(), previousRackInfo.value);