This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
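For orientation before the full diff: the core change wires a Streams-specific heartbeat request manager into the consumer's RequestManagers. A minimal sketch of that wiring, reconstructed from the RequestManagers.java hunk below (the enclosing presence check on streamsInstanceMetadata is an assumption inferred from the "} else {" branch visible in the hunk; all other names and constructor arguments appear in the diff as committed):

    // When Kafka Streams hands the consumer its assignment metadata
    // (streamsInstanceMetadata), build the streams-specific request managers
    // instead of the plain consumer HeartbeatRequestManager.
    if (streamsInstanceMetadata.isPresent()) {
        streamsInstallAssignmentRequestManager =
            new StreamsInstallAssignmentRequestManager(streamsInstanceMetadata.get());
        streamsHeartbeatRequestManager = new StreamsHeartbeatRequestManager(
            logContext,
            time,
            config,
            coordinator,
            streamsInitializeRequestManager,
            streamsPrepareAssignmentRequestManager,
            membershipManager,
            backgroundEventHandler,
            metrics,
            streamsInstanceMetadata.get(),
            metadata);
    } else {
        heartbeatRequestManager = new HeartbeatRequestManager(
            logContext,
            /* ... unchanged consumer wiring ... */);
    }
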
commit b2e6b80df6e78287cf0dab58b16ee7f77bf836a3 Author: Lucas Brutschy <[email protected]> AuthorDate: Mon May 27 23:01:53 2024 +0200 Resolve conflict from 11/25 trunk rebase - Create and inject StreamsHeartbeatRequestManager Find a way to inject the StreamsHeartbeatRequestManager into AsyncKafkaConsumer from Kafka Streams Also trigger assignment logic in Kafka Streams See https://github.com/lucasbru/kafka/pull/12 --- .../consumer/internals/RequestManagers.java | 11 +- .../internals/StreamsAssignmentInterface.java | 138 +++-- .../internals/StreamsHeartbeatRequestManager.java | 579 ++++++++++++++++++++- .../StreamsInstallAssignmentRequestManager.java | 2 +- .../StreamsPrepareAssignmentRequestManager.java | 2 +- ...ava => StreamsGroupUninitializedException.java} | 4 +- ...java => StreamsInvalidAssignmentException.java} | 4 +- .../org/apache/kafka/common/protocol/Errors.java | 10 +- .../common/requests/StreamsHeartbeatResponse.java | 14 +- .../common/requests/StreamsInitializeRequest.java | 17 +- .../common/requests/StreamsInitializeResponse.java | 16 + .../requests/StreamsInstallAssignmentRequest.java | 18 +- .../requests/StreamsInstallAssignmentResponse.java | 19 +- .../requests/StreamsPrepareAssignmentRequest.java | 18 +- .../requests/StreamsPrepareAssignmentResponse.java | 19 +- .../common/message/StreamsHeartbeatRequest.json | 6 +- .../common/message/StreamsHeartbeatResponse.json | 14 +- .../StreamsHeartbeatRequestManagerTest.java | 408 +++++++++++++++ .../internals/DefaultKafkaClientSupplier.java | 6 +- .../streams/processor/internals/StreamThread.java | 150 +++++- .../internals/StreamsRebalanceListener.java | 3 + .../processor/internals/StreamThreadTest.java | 50 +- 22 files changed, 1390 insertions(+), 118 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java index 65dec12140b..f370cf15c67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java @@ -262,9 +262,18 @@ public class RequestManagers implements Closeable { streamsInstallAssignmentRequestManager = new StreamsInstallAssignmentRequestManager( streamsInstanceMetadata.get()); streamsHeartbeatRequestManager = new StreamsHeartbeatRequestManager( + logContext, + time, + config, + coordinator, streamsInitializeRequestManager, streamsPrepareAssignmentRequestManager, - streamsInstanceMetadata.get()); + membershipManager, + backgroundEventHandler, + metrics, + streamsInstanceMetadata.get(), + metadata + ); } else { heartbeatRequestManager = new HeartbeatRequestManager( logContext, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java index acc7e314362..d09ffbda4f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java @@ -16,28 +16,33 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.common.TopicPartition; + import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * Defines a self-contained object to exchange assignment-related metadata with the Kafka Streams instance. - * + * <p> * It's used to exchange information between the streams module and the clients module, and should be mostly self-contained */ public class StreamsAssignmentInterface { private UUID processID; - private String userEndPointHost; + private HostInfo endPoint; - private int userEndPointPort; + private String assignor; - private Map<String, SubTopology> subtopologyMap; + private Map<String, Subtopology> subtopologyMap; private Map<String, Object> assignmentConfiguration; @@ -51,15 +56,15 @@ public class StreamsAssignmentInterface { return processID; } - public String userEndPointHost() { - return userEndPointHost; + public HostInfo endPoint() { + return endPoint; } - public int userEndPointPort() { - return userEndPointPort; + public String assignor() { + return assignor; } - public Map<String, SubTopology> subtopologyMap() { + public Map<String, Subtopology> subtopologyMap() { return subtopologyMap; } @@ -67,6 +72,7 @@ public class StreamsAssignmentInterface { return assignmentConfiguration; } + // TODO: This needs to be used somewhere public Map<TaskId, Long> taskLags() { return taskLags; } @@ -84,19 +90,45 @@ public class StreamsAssignmentInterface { shutdownRequested.set(true); } + // TODO: This needs to be checked somewhere. public boolean shutdownRequested() { return shutdownRequested.get(); } + // TODO: This needs to be called somewhere public void setTaskLags(Map<TaskId, Long> taskLags) { this.taskLags = taskLags; } - // TODO: Reconciled assignment updated by the stream thread - public final Assignment reconciledAssignment = new Assignment(); + public final AtomicReference<Assignment> reconciledAssignment = new AtomicReference<>(); + + public final AtomicReference<Assignment> targetAssignment = new AtomicReference<>(); + + /** + * List of partitions available on each host. Updated by the streams protocol client. 
+ */ + public final AtomicReference<Map<HostInfo, List<TopicPartition>>> partitionsByHost = new AtomicReference<>(); + + public static class HostInfo { + + public final String host; + + public final int port; + + public HostInfo(final String host, final int port) { + this.host = host; + this.port = port; + } + + @Override + public String toString() { + return "HostInfo{" + + "host='" + host + '\'' + + ", port=" + port + + '}'; + } - // TODO: Target assignment read by the stream thread - public final Assignment targetAssignment = new Assignment(); + } public static class Assignment { @@ -106,6 +138,37 @@ public class StreamsAssignmentInterface { public final Set<TaskId> warmupTasks = new HashSet<>(); + public Assignment() { + } + + public Assignment(final Set<TaskId> activeTasks, final Set<TaskId> standbyTasks, final Set<TaskId> warmupTasks) { + this.activeTasks.addAll(activeTasks); + this.standbyTasks.addAll(standbyTasks); + this.warmupTasks.addAll(warmupTasks); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Assignment that = (Assignment) o; + return Objects.equals(activeTasks, that.activeTasks) + && Objects.equals(standbyTasks, that.standbyTasks) + && Objects.equals(warmupTasks, that.warmupTasks); + } + + @Override + public int hashCode() { + return Objects.hash(activeTasks, standbyTasks, warmupTasks); + } + + public Assignment copy() { + return new Assignment(activeTasks, standbyTasks, warmupTasks); + } } public static class TopicInfo { @@ -130,53 +193,54 @@ public class StreamsAssignmentInterface { } public static class TaskId { + public final String subtopologyId; - public final int taskId; + public final int partitionId; - public int taskId() { - return taskId; + public int partitionId() { + return partitionId; } public String subtopologyId() { return subtopologyId; } - public TaskId(final String subtopologyId, final int taskId) { + public TaskId(final String subtopologyId, final int partitionId) { this.subtopologyId = subtopologyId; - this.taskId = taskId; + this.partitionId = partitionId; } @Override public String toString() { return "TaskId{" + "subtopologyId=" + subtopologyId + - ", taskId=" + taskId + + ", partitionId=" + partitionId + '}'; } } - public static class SubTopology { + public static class Subtopology { - public final Set<String> sinkTopics; public final Set<String> sourceTopics; + public final Set<String> sinkTopics; public final Map<String, TopicInfo> stateChangelogTopics; public final Map<String, TopicInfo> repartitionSourceTopics; - public SubTopology(final Set<String> sinkTopics, - final Set<String> sourceTopics, - final Map<String, TopicInfo> repartitionSourceTopics, - final Map<String, TopicInfo> stateChangelogTopics) { - this.sinkTopics = sinkTopics; + public Subtopology(final Set<String> sourceTopics, + final Set<String> sinkTopics, + final Map<String, TopicInfo> repartitionSourceTopics, + final Map<String, TopicInfo> stateChangelogTopics) { this.sourceTopics = sourceTopics; + this.sinkTopics = sinkTopics; this.stateChangelogTopics = stateChangelogTopics; this.repartitionSourceTopics = repartitionSourceTopics; } @Override public String toString() { - return "SubTopology{" + - "sinkTopics=" + sinkTopics + - ", sourceTopics=" + sourceTopics + + return "Subtopology{" + + "sourceTopics=" + sourceTopics + + ", sinkTopics=" + sinkTopics + ", stateChangelogTopics=" + stateChangelogTopics + ", repartitionSourceTopics=" + 
repartitionSourceTopics + '}'; @@ -184,15 +248,15 @@ public class StreamsAssignmentInterface { } public StreamsAssignmentInterface(UUID processID, - String userEndPointHost, - int userEndPointPort, - Map<String, SubTopology> subtopologyMap, + HostInfo endPoint, + String assignor, + Map<String, Subtopology> subtopologyMap, Map<String, Object> assignmentConfiguration, Map<String, String> clientTags - ) { + ) { this.processID = processID; - this.userEndPointHost = userEndPointHost; - this.userEndPointPort = userEndPointPort; + this.endPoint = endPoint; + this.assignor = assignor; this.subtopologyMap = subtopologyMap; this.assignmentConfiguration = assignmentConfiguration; this.taskLags = new HashMap<>(); @@ -204,8 +268,8 @@ public class StreamsAssignmentInterface { public String toString() { return "StreamsAssignmentMetadata{" + "processID=" + processID + - ", userEndPointHost='" + userEndPointHost + '\'' + - ", userEndPointPort=" + userEndPointPort + + ", endPoint='" + endPoint + '\'' + + ", assignor='" + assignor + '\'' + ", subtopologyMap=" + subtopologyMap + ", assignmentConfiguration=" + assignmentConfiguration + ", taskLags=" + taskLags + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java index 685ab28da69..ffa6bb2657f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManager.java @@ -18,15 +18,16 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment; import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; import org.apache.kafka.clients.consumer.internals.events.ErrorEvent; import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; import org.apache.kafka.common.message.StreamsHeartbeatRequestData; import org.apache.kafka.common.message.StreamsHeartbeatRequestData.TaskId; import org.apache.kafka.common.message.StreamsHeartbeatRequestData.HostInfo; @@ -42,27 +43,599 @@ import org.slf4j.Logger; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; public class StreamsHeartbeatRequestManager implements RequestManager { + private final Logger logger; + + private final CoordinatorRequestManager coordinatorRequestManager; + + private final StreamsHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; + + private final StreamsHeartbeatRequestManager.HeartbeatState heartbeatState; + + private final MembershipManager membershipManager; + + private final 
StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager; + + private final StreamsInitializeRequestManager streamsInitializeRequestManager; + + private final BackgroundEventHandler backgroundEventHandler; + + private final Timer pollTimer; + + private final HeartbeatMetricsManager metricsManager; + private StreamsAssignmentInterface streamsInterface; + private final Map<String, Uuid> assignedTopicIdCache; + + private final ConsumerMetadata metadata; + public StreamsHeartbeatRequestManager( + final LogContext logContext, + final Time time, + final ConsumerConfig config, + final CoordinatorRequestManager coordinatorRequestManager, final StreamsInitializeRequestManager streamsInitializeRequestManager, final StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager, - final StreamsAssignmentInterface streamsAssignmentInterface + final MembershipManager membershipManager, + final BackgroundEventHandler backgroundEventHandler, + final Metrics metrics, + final StreamsAssignmentInterface streamsAssignmentInterface, + final ConsumerMetadata metadata ) { + this.coordinatorRequestManager = coordinatorRequestManager; + this.logger = logContext.logger(getClass()); + this.membershipManager = membershipManager; + this.streamsInitializeRequestManager = streamsInitializeRequestManager; + this.streamsPrepareAssignmentRequestManager = streamsPrepareAssignmentRequestManager; + this.backgroundEventHandler = backgroundEventHandler; + int maxPollIntervalMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG); + long retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); + long retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG); + this.heartbeatState = new StreamsHeartbeatRequestManager.HeartbeatState(streamsAssignmentInterface, membershipManager, + maxPollIntervalMs); + this.heartbeatRequestState = new StreamsHeartbeatRequestManager.HeartbeatRequestState(logContext, time, 0, retryBackoffMs, + retryBackoffMaxMs, maxPollIntervalMs); + this.pollTimer = time.timer(maxPollIntervalMs); + this.metricsManager = new HeartbeatMetricsManager(metrics); this.streamsInterface = streamsAssignmentInterface; + this.assignedTopicIdCache = new HashMap<>(); + this.metadata = metadata; } @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { - return null; + if (!coordinatorRequestManager.coordinator().isPresent() || + membershipManager.shouldSkipHeartbeat()) { + membershipManager.onHeartbeatRequestSkipped(); + return NetworkClientDelegate.PollResult.EMPTY; + } + pollTimer.update(currentTimeMs); + if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { + logger.warn("Consumer poll timeout has expired. This means the time between " + + "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " + + "which typically implies that the poll loop is spending too much time processing " + + "messages. You can address this either by increasing max.poll.interval.ms or by " + + "reducing the maximum size of batches returned in poll() with max.poll.records."); + + membershipManager.transitionToSendingLeaveGroup(true); + NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true); + + // We can ignore the leave response because we can join before or after receiving the response. 
+ heartbeatRequestState.reset(); + heartbeatState.reset(); + return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, + Collections.singletonList(leaveHeartbeat)); + } + + boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); + if (!heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) { + return new NetworkClientDelegate.PollResult(heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); + } + + NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs, false); + return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); + } + + @Override + public long maximumTimeToWait(long currentTimeMs) { + pollTimer.update(currentTimeMs); + if ( + pollTimer.isExpired() || + (membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight()) + ) { + return 0L; + } + return Math.min(pollTimer.remainingMs() / 2, heartbeatRequestState.nextHeartbeatMs(currentTimeMs)); + } + + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final long currentTimeMs, + final boolean ignoreResponse) { + NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse); + heartbeatRequestState.onSendAttempt(currentTimeMs); + membershipManager.onHeartbeatRequestSent(); + metricsManager.recordHeartbeatSentMs(currentTimeMs); + return request; + } + + private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(final boolean ignoreResponse) { + NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( + new StreamsHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), + coordinatorRequestManager.coordinator()); + if (ignoreResponse) { + return logResponse(request); + } else { + return request.whenComplete((response, exception) -> { + long completionTimeMs = request.handler().completionTimeMs(); + if (response != null) { + metricsManager.recordRequestLatency(response.requestLatencyMs()); + onResponse((StreamsHeartbeatResponse) response.responseBody(), completionTimeMs); + } else { + onFailure(exception, completionTimeMs); + } + }); + } + } + + private NetworkClientDelegate.UnsentRequest logResponse(final NetworkClientDelegate.UnsentRequest request) { + return request.whenComplete((response, exception) -> { + if (response != null) { + metricsManager.recordRequestLatency(response.requestLatencyMs()); + Errors error = + Errors.forCode(((StreamsHeartbeatResponse) response.responseBody()).data().errorCode()); + if (error == Errors.NONE) { + logger.debug("StreamsHeartbeat responded successfully: {}", response); + } else { + logger.error("StreamsHeartbeat failed because of {}: {}", error, response); + } + } else { + logger.error("StreamsHeartbeat failed because of unexpected exception.", exception); + } + }); + } + + private void onFailure(final Throwable exception, final long responseTimeMs) { + this.heartbeatRequestState.onFailedAttempt(responseTimeMs); + this.heartbeatState.reset(); + if (exception instanceof RetriableException) { + String message = String.format("StreamsHeartbeatRequest failed because of the retriable exception. 
" + + "Will retry in %s ms: %s", + heartbeatRequestState.remainingBackoffMs(responseTimeMs), + exception.getMessage()); + logger.debug(message); + } else { + logger.error("StreamsHeartbeatRequest failed due to fatal error: {}", exception.getMessage()); + handleFatalFailure(exception); + } + } + + private void onResponse(final StreamsHeartbeatResponse response, long currentTimeMs) { + if (Errors.forCode(response.data().errorCode()) == Errors.NONE) { + onSuccessResponse(response, currentTimeMs); + } else { + onErrorResponse(response, currentTimeMs); + } + } + + private void onSuccessResponse(final StreamsHeartbeatResponse response, final long currentTimeMs) { + final StreamsHeartbeatResponseData data = response.data(); + + heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs()); + heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); + heartbeatRequestState.resetTimer(); + + if (data.shouldInitializeTopology()) { + streamsInitializeRequestManager.initialize(); + } + if (data.shouldComputeAssignment()) { + streamsPrepareAssignmentRequestManager.prepareAssignment(); + } + if (data.partitionsByHost() != null) { + streamsInterface.partitionsByHost.set(convertHostInfoMap(data)); + } + + ConsumerGroupHeartbeatResponseData cgData = new ConsumerGroupHeartbeatResponseData(); + cgData.setMemberId(data.memberId()); + cgData.setMemberEpoch(data.memberEpoch()); + cgData.setErrorCode(data.errorCode()); + cgData.setErrorMessage(data.errorMessage()); + cgData.setThrottleTimeMs(data.throttleTimeMs()); + cgData.setHeartbeatIntervalMs(data.heartbeatIntervalMs()); + + Errors assignmentError = response.assignmentError(); + String assignmentErrorMessage = response.data().errorMessage(); + if (assignmentError != Errors.NONE) { + logger.warn("Assignment incomplete. {}. 
{}", assignmentError.message(), assignmentErrorMessage); + switch (assignmentError) { + case STREAMS_SHUTDOWN_APPLICATION: + streamsInterface.requestShutdown(); + break; + case STREAMS_INCONSISTENT_TOPOLOGY: + case STREAMS_MISSING_SOURCE_TOPICS: + case STREAMS_GROUP_UNINITIALIZED: + break; + } + } + + if (data.activeTasks() != null && data.standbyTasks() != null && data.warmupTasks() != null) { + + setTargetAssignment(data); + setTargetAssignmentForConsumerGroup(data, cgData); + + } else { + if (data.activeTasks() != null || data.standbyTasks() != null || data.warmupTasks() != null) { + throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: " + data); + } + } + + membershipManager.onHeartbeatSuccess(cgData); + } + + private void setTargetAssignmentForConsumerGroup(final StreamsHeartbeatResponseData data, final ConsumerGroupHeartbeatResponseData cgData) { + Map<String, TopicPartitions> tps = new HashMap<>(); + data.activeTasks().forEach(taskId -> Stream.concat( + streamsInterface.subtopologyMap().get(taskId.subtopology()).sourceTopics.stream(), + streamsInterface.subtopologyMap().get(taskId.subtopology()).repartitionSourceTopics.keySet().stream() + ) + .forEach(topic -> { + final TopicPartitions toInsert = tps.computeIfAbsent(topic, k -> { + final Optional<Uuid> uuid = findTopicIdInGlobalOrLocalCache(topic); + if (uuid.isPresent()) { + TopicPartitions t = + new TopicPartitions(); + t.setTopicId(uuid.get()); + return t; + } else { + return null; + } + }); + if (toInsert != null) { + toInsert.partitions().addAll(taskId.partitions()); + } + })); + ConsumerGroupHeartbeatResponseData.Assignment cgAssignment = new ConsumerGroupHeartbeatResponseData.Assignment(); + cgAssignment.setTopicPartitions(new ArrayList<>(tps.values())); + cgData.setAssignment(cgAssignment); + } + + private void setTargetAssignment(final StreamsHeartbeatResponseData data) { + Assignment targetAssignment = new Assignment(); + updateTaskIdCollection(data.activeTasks(), targetAssignment.activeTasks); + updateTaskIdCollection(data.standbyTasks(), targetAssignment.standbyTasks); + updateTaskIdCollection(data.warmupTasks(), targetAssignment.warmupTasks); + streamsInterface.targetAssignment.set(targetAssignment); + } + + private static Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> convertHostInfoMap( + final StreamsHeartbeatResponseData data) { + Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> partitionsByHost = new HashMap<>(); + data.partitionsByHost().forEach(hostInfo -> { + List<TopicPartition> topicPartitions = hostInfo.partitions().stream() + .flatMap(partition -> + partition.partitions().stream().map(partitionId -> new TopicPartition(partition.topic(), partitionId))) + .collect(Collectors.toList()); + partitionsByHost.put(new StreamsAssignmentInterface.HostInfo(hostInfo.host(), hostInfo.port()), topicPartitions); + }); + return partitionsByHost; + } + + private void updateTaskIdCollection( + final List<StreamsHeartbeatResponseData.TaskId> source, + final Set<StreamsAssignmentInterface.TaskId> target + ) { + target.clear(); + source.forEach(taskId -> { + taskId.partitions().forEach(partition -> { + target.add(new StreamsAssignmentInterface.TaskId(taskId.subtopology(), partition)); + }); + }); + } + + private void onErrorResponse(final StreamsHeartbeatResponse response, + final long currentTimeMs) { + Errors error = Errors.forCode(response.data().errorCode()); + String errorMessage = response.data().errorMessage(); + String message; + + 
this.heartbeatState.reset(); + this.heartbeatRequestState.onFailedAttempt(currentTimeMs); + membershipManager.onHeartbeatFailure(); + + switch (error) { + case NOT_COORDINATOR: + // the manager should retry immediately when the coordinator node becomes available again + message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is incorrect. " + + "Will attempt to find the coordinator again and retry", + coordinatorRequestManager.coordinator()); + logInfo(message, response, currentTimeMs); + coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs); + // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered + heartbeatRequestState.reset(); + break; + + case COORDINATOR_NOT_AVAILABLE: + message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is not available. " + + "Will attempt to find the coordinator again and retry", + coordinatorRequestManager.coordinator()); + logInfo(message, response, currentTimeMs); + coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs); + // Skip backoff so that the next HB is sent as soon as the new coordinator is discovered + heartbeatRequestState.reset(); + break; + + case COORDINATOR_LOAD_IN_PROGRESS: + // the manager will backoff and retry + message = String.format("StreamsHeartbeatRequest failed because the group coordinator %s is still loading." + + "Will retry", + coordinatorRequestManager.coordinator()); + logInfo(message, response, currentTimeMs); + break; + + case GROUP_AUTHORIZATION_FAILED: + GroupAuthorizationException exception = + GroupAuthorizationException.forGroupId(membershipManager.groupId()); + logger.error("StreamsHeartbeatRequest failed due to group authorization failure: {}", exception.getMessage()); + handleFatalFailure(error.exception(exception.getMessage())); + break; + + case UNRELEASED_INSTANCE_ID: + logger.error("StreamsHeartbeatRequest failed due to the instance id {} was not released: {}", + membershipManager.groupInstanceId().orElse("null"), errorMessage); + handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage)); + break; + + case INVALID_REQUEST: + case GROUP_MAX_SIZE_REACHED: + case UNSUPPORTED_ASSIGNOR: + case UNSUPPORTED_VERSION: + logger.error("StreamsHeartbeatRequest failed due to error: {}", error); + handleFatalFailure(error.exception(errorMessage)); + break; + + case FENCED_MEMBER_EPOCH: + message = String.format("StreamsHeartbeatRequest failed for member %s because epoch %s is fenced.", + membershipManager.memberId(), membershipManager.memberEpoch()); + logInfo(message, response, currentTimeMs); + membershipManager.transitionToFenced(); + // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment + heartbeatRequestState.reset(); + break; + + case UNKNOWN_MEMBER_ID: + message = String.format("StreamsHeartbeatRequest failed because member %s is unknown.", + membershipManager.memberId()); + logInfo(message, response, currentTimeMs); + membershipManager.transitionToFenced(); + // Skip backoff so that a next HB to rejoin is sent as soon as the fenced member releases its assignment + heartbeatRequestState.reset(); + break; + + default: + // If the manager receives an unknown error - there could be a bug in the code or a new error code + logger.error("StreamsHeartbeatRequest failed due to unexpected error: {}", error); + handleFatalFailure(error.exception(errorMessage)); + break; + } + } + + private void logInfo(final String 
message, + final StreamsHeartbeatResponse response, + final long currentTimeMs) { + logger.info("{} in {}ms: {}", + message, + heartbeatRequestState.remainingBackoffMs(currentTimeMs), + response.data().errorMessage()); + } + + private void handleFatalFailure(Throwable error) { + backgroundEventHandler.add(new ErrorEvent(error)); + membershipManager.transitionToFatal(); + } + + /** + * Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends + * {@link RequestState} to enable exponential backoff and duplicated request handling. The two fields that it holds are: + */ + static class HeartbeatRequestState extends RequestState { + + /** + * heartbeatTimer tracks the time since the last heartbeat was sent + */ + private final Timer heartbeatTimer; + + /** + * The heartbeat interval which is acquired/updated through the heartbeat request + */ + private long heartbeatIntervalMs; + + public HeartbeatRequestState( + final LogContext logContext, + final Time time, + final long heartbeatIntervalMs, + final long retryBackoffMs, + final long retryBackoffMaxMs, + final double jitter) { + super(logContext, StreamsHeartbeatRequestManager.HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, + jitter); + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.heartbeatTimer = time.timer(heartbeatIntervalMs); + } + + private void update(final long currentTimeMs) { + this.heartbeatTimer.update(currentTimeMs); + } + + public void resetTimer() { + this.heartbeatTimer.reset(heartbeatIntervalMs); + } + + @Override + public boolean canSendRequest(final long currentTimeMs) { + update(currentTimeMs); + return heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs); + } + + public long nextHeartbeatMs(final long currentTimeMs) { + if (heartbeatTimer.remainingMs() == 0) { + return this.remainingBackoffMs(currentTimeMs); + } + return heartbeatTimer.remainingMs(); + } + + private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) { + if (this.heartbeatIntervalMs == heartbeatIntervalMs) { + // no need to update the timer if the interval hasn't changed + return; + } + this.heartbeatIntervalMs = heartbeatIntervalMs; + this.heartbeatTimer.updateAndReset(heartbeatIntervalMs); + } + } + + private Optional<Uuid> findTopicIdInGlobalOrLocalCache(String topicName) { + Uuid idFromMetadataCache = metadata.topicIds().getOrDefault(topicName, null); + if (idFromMetadataCache != null) { + // Add topic name to local cache, so it can be reused if included in a next target + // assignment if metadata cache not available. 
+ assignedTopicIdCache.put(topicName, idFromMetadataCache); + return Optional.of(idFromMetadataCache); + } else { + Uuid idFromLocalCache = assignedTopicIdCache.getOrDefault(topicName, null); + return Optional.ofNullable(idFromLocalCache); + } + } + + static class HeartbeatState { + + private final MembershipManager membershipManager; + private final int rebalanceTimeoutMs; + private final StreamsHeartbeatRequestManager.HeartbeatState.SentFields sentFields; + + /* + * StreamsGroupMetadata holds the metadata for the streams group + */ + private final StreamsAssignmentInterface streamsInterface; + + public HeartbeatState( + final StreamsAssignmentInterface streamsInterface, + final MembershipManager membershipManager, + final int rebalanceTimeoutMs) { + this.membershipManager = membershipManager; + this.rebalanceTimeoutMs = rebalanceTimeoutMs; + this.sentFields = new StreamsHeartbeatRequestManager.HeartbeatState.SentFields(); + this.streamsInterface = streamsInterface; + } + + public void reset() { + sentFields.reset(); + } + + public StreamsHeartbeatRequestData buildRequestData() { + StreamsHeartbeatRequestData data = new StreamsHeartbeatRequestData(); + + // GroupId - always sent + data.setGroupId(membershipManager.groupId()); + + // MemberId - always sent, empty until it has been received from the coordinator + data.setMemberId(membershipManager.memberId()); + + // MemberEpoch - always sent + data.setMemberEpoch(membershipManager.memberEpoch()); + + // InstanceId - set if present + membershipManager.groupInstanceId().ifPresent(data::setInstanceId); + + boolean joining = membershipManager.state() == MemberState.JOINING; + + // RebalanceTimeoutMs - only sent when joining or if it has changed since the last heartbeat + if (joining || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) { + data.setRebalanceTimeoutMs(rebalanceTimeoutMs); + sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs; + } + + // Immutable -- only sent when joining + if (joining) { + data.setProcessId(streamsInterface.processID().toString()); + HostInfo hostInfo = new HostInfo(); + hostInfo.setHost(streamsInterface.endPoint().host); + hostInfo.setPort(streamsInterface.endPoint().port); + data.setHostInfo(hostInfo); + data.setAssignor(streamsInterface.assignor()); + data.setClientTags(streamsInterface.clientTags().entrySet().stream().map(entry -> { + StreamsHeartbeatRequestData.KeyValue tag = new StreamsHeartbeatRequestData.KeyValue(); + tag.setKey(entry.getKey()); + tag.setValue(entry.getValue()); + return tag; + }).collect(Collectors.toList())); + data.setAssignmentConfigs(streamsInterface.assignmentConfiguration().entrySet().stream().map(entry -> { + StreamsHeartbeatRequestData.KeyValue config = new StreamsHeartbeatRequestData.KeyValue(); + config.setKey(entry.getKey()); + config.setValue(entry.getValue().toString()); + return config; + }).collect(Collectors.toList())); + } else { + // FIXME: This seems to be a bug in the RPC serialization code. The default should be null, but it's empty. 
+ data.setAssignor(null); + } + + if (streamsInterface.shutdownRequested()) { + data.setShutdownApplication(true); + } + + Assignment reconciledAssignment = streamsInterface.reconciledAssignment.get(); + + if (reconciledAssignment != null) { + if (!reconciledAssignment.equals(sentFields.assignment)) { + data.setActiveTasks(convertTaskIdCollection(reconciledAssignment.activeTasks)); + data.setStandbyTasks(convertTaskIdCollection(reconciledAssignment.standbyTasks)); + data.setWarmupTasks(convertTaskIdCollection(reconciledAssignment.warmupTasks)); + sentFields.assignment = reconciledAssignment; + } + } + + return data; + } + + private List<TaskId> convertTaskIdCollection(final Set<StreamsAssignmentInterface.TaskId> tasks) { + return tasks.stream() + .collect( + Collectors.groupingBy(StreamsAssignmentInterface.TaskId::subtopologyId, + Collectors.mapping(StreamsAssignmentInterface.TaskId::partitionId, Collectors.toList())) + ) + .entrySet() + .stream() + .map(entry -> { + TaskId id = new TaskId(); + id.setSubtopology(entry.getKey()); + id.setPartitions(entry.getValue()); + return id; + }) + .collect(Collectors.toList()); + } + + // Fields of StreamsHeartbeatRequest sent in the most recent request + static class SentFields { + + private int rebalanceTimeoutMs = -1; + private Assignment assignment = null; + + SentFields() { + } + + void reset() { + rebalanceTimeoutMs = -1; + assignment = null; + } + } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java index 7936e885b56..b47572637bb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsInstallAssignmentRequestManager.java @@ -28,7 +28,7 @@ public class StreamsInstallAssignmentRequestManager implements RequestManager { @Override public PollResult poll(final long currentTimeMs) { - return null; + return PollResult.EMPTY; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java index 069661f4893..fb4f59cdf96 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsPrepareAssignmentRequestManager.java @@ -28,7 +28,7 @@ public class StreamsPrepareAssignmentRequestManager implements RequestManager { @Override public PollResult poll(final long currentTimeMs) { - return null; + return PollResult.EMPTY; } public void prepareAssignment() { diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java similarity index 86% rename from clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java rename to clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java index ffd0b63d8e3..204ba1e2186 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/StreamsMissingInternalTopicsException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsGroupUninitializedException.java @@ 
-16,8 +16,8 @@ */ package org.apache.kafka.common.errors; -public class StreamsMissingInternalTopicsException extends ApiException { - public StreamsMissingInternalTopicsException(String message) { +public class StreamsGroupUninitializedException extends ApiException { + public StreamsGroupUninitializedException(String message) { super(message); } } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java similarity index 86% rename from clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java rename to clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java index dd6509f90f0..8fae67b6b4b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignment.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/StreamsInvalidAssignmentException.java @@ -16,8 +16,8 @@ */ package org.apache.kafka.common.errors; -public class StreamsInvalidAssignment extends ApiException { - public StreamsInvalidAssignment(String message) { +public class StreamsInvalidAssignmentException extends ApiException { + public StreamsInvalidAssignmentException(String message) { super(message); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 9790d87863a..f9f063051b7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -21,9 +21,9 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerIdNotRegisteredException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.StreamsInconsistentTopologyException; -import org.apache.kafka.common.errors.StreamsInvalidAssignment; +import org.apache.kafka.common.errors.StreamsInvalidAssignmentException; import org.apache.kafka.common.errors.StreamsInvalidTopologyException; -import org.apache.kafka.common.errors.StreamsMissingInternalTopicsException; +import org.apache.kafka.common.errors.StreamsGroupUninitializedException; import org.apache.kafka.common.errors.StreamsMissingSourceTopicsException; import org.apache.kafka.common.errors.StreamsShutdownApplicationException; import org.apache.kafka.common.errors.ClusterAuthorizationException; @@ -428,8 +428,10 @@ public enum Errors { StreamsMissingSourceTopicsException::new), STREAMS_MISSING_INTERNAL_TOPICS(133, "One or more internal topics are missing.", StreamsMissingInternalTopicsException::new), - STREAMS_SHUTDOWN_APPLICATION(134, "A client requested the shutdown of the whole application.", - StreamsShutdownApplicationException::new); + STREAMS_GROUP_UNINITIALIZED(134, "The group is not (fully) initialized, broker-side topology information or internal topics are missing.", + StreamsGroupUninitializedException::new), + STREAMS_SHUTDOWN_APPLICATION(135, "A client requested the shutdown of the whole application.", + StreamsShutdownApplicationException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java index 1e8c7a5b7e9..835fec5c8ff 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsHeartbeatResponse.java @@ -41,7 +41,7 @@ import java.util.Map; * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION} * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY} * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS} - * - {@link Errors#STREAMS_MISSING_INTERNAL_TOPICS} + * - {@link Errors#STREAMS_GROUP_UNINITIALIZED} */ public class StreamsHeartbeatResponse extends AbstractResponse { private final StreamsHeartbeatResponseData data; @@ -71,6 +71,18 @@ public class StreamsHeartbeatResponse extends AbstractResponse { data.setThrottleTimeMs(throttleTimeMs); } + /** + * Possible error codes. + * + * - {@link Errors#STREAMS_SHUTDOWN_APPLICATION} + * - {@link Errors#STREAMS_INCONSISTENT_TOPOLOGY} + * - {@link Errors#STREAMS_MISSING_SOURCE_TOPICS} + * - {@link Errors#STREAMS_GROUP_UNINITIALIZED} + */ + public Errors assignmentError() { + return Errors.forCode(data.assignmentErrorCode()); + } + public static StreamsHeartbeatResponse parse(ByteBuffer buffer, short version) { return new StreamsHeartbeatResponse(new StreamsHeartbeatResponseData( new ByteBufferAccessor(buffer), version)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java index 04072dd8565..83761d805de 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeRequest.java @@ -1,9 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.StreamsInitializeRequestData; import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java index 7b7d19961de..81c281aba98 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInitializeResponse.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.StreamsInitializeResponseData; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java index d793101c537..7063ec61676 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentRequest.java @@ -1,10 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; -import org.apache.kafka.common.message.StreamsInstallAssignmentRequestData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java index 3a09420f796..cc389760577 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsInstallAssignmentResponse.java @@ -1,10 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; -import org.apache.kafka.common.message.StreamsInitializeResponseData; import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java index 3c83273bf4f..adeef79393b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentRequest.java @@ -1,10 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; -import org.apache.kafka.common.message.StreamsPrepareAssignmentRequestData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java index 99997022783..f0c478ec81b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StreamsPrepareAssignmentResponse.java @@ -1,10 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; -import org.apache.kafka.common.message.StreamsInstallAssignmentResponseData; import org.apache.kafka.common.message.StreamsPrepareAssignmentResponseData; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json index c97d0b912ff..cd8df8c477e 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatRequest.json @@ -34,11 +34,11 @@ { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1, "about": "-1 if it didn't change since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." }, - // If the topology hash does not match the server's topology hash, return INVALID_TOPOLOGY and send ShouldInitializeTopology to one consumer { "name": "TopologyHash", "type": "bytes", "versions": "0+", "nullableVersions": "0+", "default": null, "about": "The hash of the topology. Only sent when memberEpoch = 0. Null otherwise. Used to check if topology corresponds to server-topology. " }, + { "name": "Assignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": null, + "about": "The desired assignor. Picked by majority vote among clients. If null, the client votes for the broker to pick the assignor." }, - // The coordinator knows which partitions to fetch from given this information { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, { "name": "StandbyTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", @@ -80,8 +80,6 @@ ], "commonStructs": [ { "name": "TaskId", "versions": "0+", "fields": [ - // We are using strings here, to avoid fixing a structure on task IDs which will - // allow introducing stable naming for subtopologies in the future. 
{ "name": "Subtopology", "type": "string", "versions": "0+", "about": "subtopology ID" }, { "name": "Partitions", "type": "[]int32", "versions": "0+", diff --git a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json index 272a5a4cd51..513e1d78911 100644 --- a/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json +++ b/clients/src/main/resources/common/message/StreamsHeartbeatResponse.json @@ -30,10 +30,6 @@ // - UNSUPPORTED_ASSIGNOR (version 0+) // - UNRELEASED_INSTANCE_ID (version 0+) // - GROUP_MAX_SIZE_REACHED (version 0+) - // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+) - // - STREAMS_MISSING_SOURCE_TOPICS (version 0+) - // - STREAMS_MISSING_INTERNAL_TOPICS (version 0+) - // - STREAMS_SHUTDOWN_APPLICATION (version 0+) "fields": [ // Same as consumer group heart beat { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", @@ -55,6 +51,16 @@ { "name": "ShouldInitializeTopology", "type": "bool", "versions": "0+", "default": false, "about": "true if this streams application should initialize the topology on the broker" }, + // Possible errors: + // - STREAMS_INCONSISTENT_TOPOLOGY (version 0+) + // - STREAMS_MISSING_SOURCE_TOPICS (version 0+) + // - STREAMS_SHUTDOWN_APPLICATION (version 0+) + // - STREAMS_GROUP_UNITIALIZED (version 0+) + { "name": "AssignmentErrorCode", "type": "int16", "versions": "0+", "default": 0, + "about": "An error code relating to the current state of the assignment to this member." }, + { "name": "AssignmentErrorMessage", "type": "string", "versions": "0+", "default": "", + "about": "An error message relating to the current state of the assignment to this member." }, + // The streams app knows which partitions to fetch from given this information { "name": "ActiveTasks", "type": "[]TaskId", "versions": "0+", "nullableVersions": "0+", "default": "null", "about": "Local assignment for this consumer. Null if unchanged since last heartbeat." }, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java new file mode 100644 index 00000000000..17b99c9c72f --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsHeartbeatRequestManagerTest.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.HostInfo; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Subtopology; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.TaskId; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.TopicInfo; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.TopicPartitions; +import org.apache.kafka.common.message.StreamsHeartbeatResponseData; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.StreamsHeartbeatRequest; +import org.apache.kafka.common.requests.StreamsHeartbeatResponse; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class StreamsHeartbeatRequestManagerTest { + + public static final String TEST_GROUP_ID = "testGroupId"; + public static final String TEST_MEMBER_ID = "testMemberId"; + public static final int TEST_MEMBER_EPOCH = 5; + public static final String TEST_INSTANCE_ID = "instanceId"; + public static final int TEST_THROTTLE_TIME_MS = 5; + + private StreamsHeartbeatRequestManager heartbeatRequestManager; + + private Time time; + + private StreamsAssignmentInterface streamsAssignmentInterface; + + private ConsumerConfig config; + + @Mock + private CoordinatorRequestManager coordinatorRequestManager; + + @Mock + private StreamsInitializeRequestManager streamsInitializeRequestManager; + + @Mock + private StreamsPrepareAssignmentRequestManager streamsPrepareAssignmentRequestManager; + + @Mock + private MembershipManager
membershipManager; + + @Mock + private BackgroundEventHandler backgroundEventHandler; + + @Mock + private Metrics metrics; + + @Mock + private ConsumerMetadata metadata; + + // Static data for testing + private final UUID processID = new UUID(1, 1); + + private final HostInfo endPoint = new HostInfo("localhost", 9092); + + private final String assignor = "test"; + + private final Map<String, Subtopology> subtopologyMap = new HashMap<>(); + + private final Map<String, Object> assignmentConfiguration = new HashMap<>(); + + private final Map<String, String> clientTags = new HashMap<>(); + + private final Node coordinatorNode = new Node(1, "localhost", 9092); + + @BeforeEach + void setUp() { + config = config(); + + subtopologyMap.clear(); + assignmentConfiguration.clear(); + clientTags.clear(); + streamsAssignmentInterface = + new StreamsAssignmentInterface( + processID, + endPoint, + assignor, + subtopologyMap, + assignmentConfiguration, + clientTags + ); + LogContext logContext = new LogContext("test"); + time = new MockTime(); + + MockitoAnnotations.openMocks(this); + when(metrics.sensor(anyString())).thenReturn(mock(Sensor.class)); + heartbeatRequestManager = new StreamsHeartbeatRequestManager( + logContext, + time, + config, + coordinatorRequestManager, + streamsInitializeRequestManager, + streamsPrepareAssignmentRequestManager, + membershipManager, + backgroundEventHandler, + metrics, + streamsAssignmentInterface, + metadata + ); + + when(membershipManager.groupId()).thenReturn(TEST_GROUP_ID); + when(membershipManager.memberId()).thenReturn(TEST_MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(TEST_MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(TEST_INSTANCE_ID)); + } + + + @Test + void testNoHeartbeatIfCoordinatorUnknown() { + when(membershipManager.shouldHeartbeatNow()).thenReturn(true); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(0, result.unsentRequests.size()); + verify(membershipManager).onHeartbeatRequestSkipped(); + } + + @Test + void testNoHeartbeatIfHeartbeatSkipped() { + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(0, result.unsentRequests.size()); + verify(membershipManager).onHeartbeatRequestSkipped(); + } + + @Test + void testHeartbeatWhenCoordinatorKnown() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node()); + + StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build(); + + assertEquals(TEST_GROUP_ID, request.data().groupId()); + assertEquals(TEST_MEMBER_ID, request.data().memberId()); + assertEquals(TEST_MEMBER_EPOCH, request.data().memberEpoch()); + assertEquals(TEST_INSTANCE_ID, request.data().instanceId()); + + // Static information is null + assertNull(request.data().processId()); + assertNull(request.data().hostInfo()); + assertNull(request.data().assignor()); + assertNull(request.data().assignmentConfigs()); + 
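+        // Static data (process ID, host info, assignor, configs, client tags) is only sent on the joining heartbeat, i.e. when the member epoch is 0; see testFullStaticInformationWhenJoining below.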
assertNull(request.data().clientTags()); + } + + @Test + void testFullStaticInformationWhenJoining() { + mockJoiningState(); + assignmentConfiguration.put("config1", "value1"); + clientTags.put("clientTag1", "value2"); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node()); + + StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build(); + + assertEquals(processID.toString(), request.data().processId()); + assertEquals(endPoint.host, request.data().hostInfo().host()); + assertEquals(endPoint.port, request.data().hostInfo().port()); + assertEquals(assignor, request.data().assignor()); + assertEquals(1, request.data().assignmentConfigs().size()); + assertEquals("config1", request.data().assignmentConfigs().get(0).key()); + assertEquals("value1", request.data().assignmentConfigs().get(0).value()); + assertEquals(1, request.data().clientTags().size()); + assertEquals("clientTag1", request.data().clientTags().get(0).key()); + assertEquals("value2", request.data().clientTags().get(0).value()); + } + + @Test + void testShutdownRequested() { + mockJoiningState(); + streamsAssignmentInterface.requestShutdown(); + + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + assertEquals(Optional.of(coordinatorNode), result.unsentRequests.get(0).node()); + + StreamsHeartbeatRequest request = (StreamsHeartbeatRequest) result.unsentRequests.get(0).requestBuilder().build(); + + assertTrue(request.data().shutdownApplication()); + } + + @Test + void testSuccessfulResponse() { + mockJoiningState(); + + final Uuid uuid0 = Uuid.randomUuid(); + final Uuid uuid1 = Uuid.randomUuid(); + + final TopicInfo emptyTopicInfo = new TopicInfo(Optional.empty(), Collections.emptyMap()); + + when(metadata.topicIds()).thenReturn( + mkMap( + mkEntry("source0", uuid0), + mkEntry("repartition0", uuid1) + )); + + streamsAssignmentInterface.subtopologyMap().put("0", + new Subtopology( + Collections.singleton("source0"), + Collections.singleton("sink0"), + Collections.singletonMap("repartition0", emptyTopicInfo), + Collections.singletonMap("changelog0", emptyTopicInfo) + )); + streamsAssignmentInterface.subtopologyMap().put("1", + new Subtopology( + Collections.singleton("source1"), + Collections.singleton("sink1"), + Collections.singletonMap("repartition1", emptyTopicInfo), + Collections.singletonMap("changelog1", emptyTopicInfo) + )); + streamsAssignmentInterface.subtopologyMap().put("2", + new Subtopology( + Collections.singleton("source2"), + Collections.singleton("sink2"), + Collections.singletonMap("repartition2", emptyTopicInfo), + Collections.singletonMap("changelog2", emptyTopicInfo) + )); + + StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData() + .setErrorCode(Errors.NONE.code()) + .setMemberId(TEST_MEMBER_ID) + .setMemberEpoch(TEST_MEMBER_EPOCH) + .setThrottleTimeMs(TEST_THROTTLE_TIME_MS) + .setHeartbeatIntervalMs(1000) + .setActiveTasks(Collections.singletonList( + new StreamsHeartbeatResponseData.TaskId().setSubtopology("0").setPartitions(Collections.singletonList(0)))) + .setStandbyTasks(Collections.singletonList( + new StreamsHeartbeatResponseData.TaskId().setSubtopology("1").setPartitions(Collections.singletonList(1))))
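+            // Subtopologies "0", "1" and "2" come back as active, standby and warmup tasks, respectively.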
+ .setWarmupTasks(Collections.singletonList( + new StreamsHeartbeatResponseData.TaskId().setSubtopology("2").setPartitions(Collections.singletonList(2)))); + + mockResponse(data); + + ArgumentCaptor<ConsumerGroupHeartbeatResponseData> captor = ArgumentCaptor.forClass(ConsumerGroupHeartbeatResponseData.class); + verify(membershipManager, times(1)).onHeartbeatSuccess(captor.capture()); + ConsumerGroupHeartbeatResponseData response = captor.getValue(); + assertEquals(Errors.NONE.code(), response.errorCode()); + assertEquals(TEST_MEMBER_ID, response.memberId()); + assertEquals(TEST_MEMBER_EPOCH, response.memberEpoch()); + assertEquals(TEST_THROTTLE_TIME_MS, response.throttleTimeMs()); + assertEquals(1000, response.heartbeatIntervalMs()); + final List<TopicPartitions> tps = response.assignment().topicPartitions(); + assertEquals(2, tps.size()); + assertEquals(mkSet(uuid0, uuid1), tps.stream().map(TopicPartitions::topicId).collect(Collectors.toSet())); + assertEquals(Collections.singletonList(0), tps.get(0).partitions()); + assertEquals(Collections.singletonList(0), tps.get(1).partitions()); + + final Assignment targetAssignment = streamsAssignmentInterface.targetAssignment.get(); + assertEquals(1, targetAssignment.activeTasks.size()); + final TaskId activeTaskId = targetAssignment.activeTasks.stream().findFirst().get(); + assertEquals("0", activeTaskId.subtopologyId); + assertEquals(0, activeTaskId.partitionId); + + assertEquals(1, targetAssignment.standbyTasks.size()); + final TaskId standbyTaskId = targetAssignment.standbyTasks.stream().findFirst().get(); + assertEquals("1", standbyTaskId.subtopologyId); + assertEquals(1, standbyTaskId.partitionId); + + assertEquals(1, targetAssignment.warmupTasks.size()); + final TaskId warmupTaskId = targetAssignment.warmupTasks.stream().findFirst().get(); + assertEquals("2", warmupTaskId.subtopologyId); + assertEquals(2, warmupTaskId.partitionId); + } + + @Test + void testPrepareAssignment() { + mockJoiningState(); + + StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setMemberEpoch(TEST_MEMBER_EPOCH) + .setShouldComputeAssignment(true); + + mockResponse(data); + + verify(streamsPrepareAssignmentRequestManager).prepareAssignment(); + } + + @Test + void testInitializeTopology() { + mockJoiningState(); + + StreamsHeartbeatResponseData data = new StreamsHeartbeatResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0) + .setMemberEpoch(TEST_MEMBER_EPOCH) + .setShouldInitializeTopology(true); + + mockResponse(data); + + verify(streamsInitializeRequestManager).initialize(); + } + + private void mockResponse(final StreamsHeartbeatResponseData data) { + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + + assertEquals(1, result.unsentRequests.size()); + final UnsentRequest unsentRequest = result.unsentRequests.get(0); + assertEquals(Optional.of(coordinatorNode), unsentRequest.node()); + + ClientResponse response = createHeartbeatResponse(unsentRequest, data); + + unsentRequest.handler().onComplete(response); + } + + private void mockJoiningState() { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); + when(membershipManager.state()).thenReturn(MemberState.JOINING); + } + + private ClientResponse createHeartbeatResponse( + final NetworkClientDelegate.UnsentRequest request, + final StreamsHeartbeatResponseData data + ) { + StreamsHeartbeatResponse response = new
StreamsHeartbeatResponse(data); + return new ClientResponse( + new RequestHeader(ApiKeys.STREAMS_HEARTBEAT, ApiKeys.STREAMS_HEARTBEAT.latestVersion(), "client-id", 1), + request.handler(), + "0", + time.milliseconds(), + time.milliseconds(), + false, + null, + null, + response); + } + + private ConsumerConfig config() { + Properties prop = new Properties(); + prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS)); + return new ConsumerConfig(prop); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java index 71ac9982d67..1ec2053b3ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultKafkaClientSupplier.java @@ -49,9 +49,9 @@ public class DefaultKafkaClientSupplier implements KafkaClientSupplier { } @Override - public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) { - ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer(); - ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer(); + public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config, final StreamsAssignmentInterface assignmentInterface) { + final ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer(); + final ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer(); return new AsyncKafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(config, keyDeserializer, valueDeserializer)), keyDeserializer, valueDeserializer, Optional.of(assignmentInterface)); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 7433896fab8..f0a27b913f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -27,11 +27,14 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface; -import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.SubTopology; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Assignment; +import org.apache.kafka.clients.consumer.internals.StreamsAssignmentInterface.Subtopology; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; @@ -57,7 +60,6 @@ import 
org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.assignment.ProcessId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; -import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.assignment.AssignorError; import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -67,6 +69,7 @@ import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager.Def import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.internals.ThreadCache; +import java.util.List; import org.slf4j.Logger; import java.time.Duration; @@ -85,12 +88,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.streams.internals.StreamsConfigUtils.eosEnabled; import static org.apache.kafka.streams.processor.internals.ClientUtils.adminClientId; import static org.apache.kafka.streams.processor.internals.ClientUtils.consumerClientId; import static org.apache.kafka.streams.processor.internals.ClientUtils.restoreConsumerClientId; +@SuppressWarnings("ClassDataAbstractionCoupling") public class StreamThread extends Thread implements ProcessingThread { private static final String THREAD_ID_SUBSTRING = "-StreamThread-"; @@ -347,6 +352,10 @@ public class StreamThread extends Thread implements ProcessingThread { // handler for, eg MissingSourceTopicException with named topologies private final Queue<StreamsException> nonFatalExceptionsToHandle; + // These are used only with the Streams Rebalance Protocol client + private final StreamsAssignmentInterface streamsAssignmentInterface; + private final StreamsMetadataState streamsMetadataState; + // These are used to signal from outside the stream thread, but the variables themselves are internal to the thread private final AtomicLong cacheResizeSize = new AtomicLong(-1L); private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false); @@ -483,6 +492,7 @@ public class StreamThread extends Thread implements ProcessingThread { } final Consumer<byte[], byte[]> mainConsumer; + final StreamsAssignmentInterface streamsAssignmentInterface; if (consumerConfigs.containsKey(ConsumerConfig.GROUP_PROTOCOL_CONFIG) && consumerConfigs.get(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toString().equalsIgnoreCase(GroupProtocol.CONSUMER.name)) { if (topologyMetadata.hasNamedTopologies()) { @@ -490,7 +500,7 @@ public class StreamThread extends Thread implements ProcessingThread { } log.info("Streams rebalance protocol enabled"); - StreamsAssignmentInterface streamsAssignmentInterface = + streamsAssignmentInterface = initAssignmentInterface(processId, config, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)), @@ -499,6 +509,7 @@ public class StreamThread extends Thread implements ProcessingThread { mainConsumer = clientSupplier.getStreamsRebalanceProtocolConsumer(consumerConfigs, streamsAssignmentInterface); } else { mainConsumer = clientSupplier.getConsumer(consumerConfigs); + streamsAssignmentInterface = null; } taskManager.setMainConsumer(mainConsumer); @@ -526,8 +537,9 @@ public class StreamThread extends Thread implements ProcessingThread { 
referenceContainer.nonFatalExceptionsToHandle, shutdownErrorHook, streamsUncaughtExceptionHandler, - cache::resize - ); + cache::resize, + streamsAssignmentInterface, + referenceContainer.streamsMetadataState); return streamThread.updateThreadMetadata(adminClientId(clientId)); } @@ -541,26 +553,27 @@ public class StreamThread extends Thread implements ProcessingThread { } } - private static StreamsAssignmentInterface initAssignmentInterface(UUID processId, - StreamsConfig config, - HostInfo hostInfo, + private static StreamsAssignmentInterface initAssignmentInterface(final UUID processId, + final StreamsConfig config, + final HostInfo hostInfo, final TopologyMetadata topologyMetadata) { final InternalTopologyBuilder internalTopologyBuilder = topologyMetadata.lookupBuilderForNamedTopology(null); - Map<String, SubTopology> subtopologyMap = new HashMap<>(); - for (Map.Entry<Subtopology, TopicsInfo> topicsInfoEntry: internalTopologyBuilder.subtopologyToTopicsInfo().entrySet()) { + final Map<String, Subtopology> subtopologyMap = new HashMap<>(); + for (final Map.Entry<TopologyMetadata.Subtopology, TopicsInfo> topicsInfoEntry : internalTopologyBuilder.subtopologyToTopicsInfo() + .entrySet()) { subtopologyMap.put( String.valueOf(topicsInfoEntry.getKey().nodeGroupId), - new SubTopology( + new Subtopology( topicsInfoEntry.getValue().sourceTopics, topicsInfoEntry.getValue().sinkTopics, topicsInfoEntry.getValue().repartitionSourceTopics.entrySet() .stream() - .collect(Collectors.toMap(Map.Entry::getKey , e-> + .collect(Collectors.toMap(Map.Entry::getKey, e -> new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))), topicsInfoEntry.getValue().stateChangelogTopics.entrySet() .stream() - .collect(Collectors.toMap(Map.Entry::getKey , e-> + .collect(Collectors.toMap(Map.Entry::getKey, e -> new StreamsAssignmentInterface.TopicInfo(e.getValue().numberOfPartitions(), e.getValue().topicConfigs))) ) @@ -569,22 +582,28 @@ public class StreamThread extends Thread implements ProcessingThread { // TODO: Which of these are actually needed? 
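// Presumably all of these end up at the broker-side assignor through the AssignmentConfigs field of the heartbeat request.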
// TODO: Maybe we want to split this into assignment properties and internal topic configuration properties - HashMap<String, Object> assignmentProperties = new HashMap<>(); + final HashMap<String, Object> assignmentProperties = new HashMap<>(); assignmentProperties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, config.getInt(StreamsConfig.REPLICATION_FACTOR_CONFIG)); assignmentProperties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)); assignmentProperties.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, config.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG)); - assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG)); + assignmentProperties.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, + config.getLong(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG)); assignmentProperties.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, config.getInt(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG)); - assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); - assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); + assignmentProperties.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, + config.getLong(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, + config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + config.getString(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, + config.getList(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG)); + assignmentProperties.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + config.getInt(StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG)); return new StreamsAssignmentInterface( processId, - hostInfo.host(), - hostInfo.port(), + new StreamsAssignmentInterface.HostInfo(hostInfo.host(), hostInfo.port()), + null, subtopologyMap, assignmentProperties, config.getClientTags() @@ -662,8 +681,9 @@ public class StreamThread extends Thread implements ProcessingThread { final Queue<StreamsException> nonFatalExceptionsToHandle, final Runnable shutdownErrorHook, final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler, - final java.util.function.Consumer<Long> cacheResizer - ) { + final java.util.function.Consumer<Long> cacheResizer, + final StreamsAssignmentInterface streamsAssignmentInterface, + final StreamsMetadataState streamsMetadataState) { super(threadId); this.stateLock = new Object(); this.adminClient = adminClient; @@ -684,6 +704,8 @@ public class StreamThread extends Thread implements ProcessingThread { this.shutdownErrorHook = 
shutdownErrorHook; this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; this.cacheResizer = cacheResizer; + this.streamsAssignmentInterface = streamsAssignmentInterface; + this.streamsMetadataState = streamsMetadataState; // The following sensors are created here but their references are not stored in this object, since within // this object they are not recorded. The sensors are created here so that the stream threads starts with all @@ -1350,6 +1372,86 @@ public class StreamThread extends Thread implements ProcessingThread { return pollLatency; } + public void maybeHandleAssignmentFromStreamsRebalanceProtocol() { + if (streamsAssignmentInterface != null) { + + if (streamsAssignmentInterface.shutdownRequested()) { + assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code()); + } + + // Process metadata from Streams Rebalance Protocol + final Map<StreamsAssignmentInterface.HostInfo, List<TopicPartition>> hostInfoListMap = streamsAssignmentInterface.partitionsByHost.get(); + final Map<HostInfo, Set<TopicPartition>> convertedHostInfoMap = new HashMap<>(); + hostInfoListMap.forEach((hostInfo, topicPartitions) -> + convertedHostInfoMap.put(new HostInfo(hostInfo.host, hostInfo.port), new HashSet<>(topicPartitions))); + streamsMetadataState.onChange( + convertedHostInfoMap, + Collections.emptyMap(), // TODO: We cannot differentiate between standby and active tasks here?! + getTopicPartitionInfo(convertedHostInfoMap) + ); + + // Process assignment from Streams Rebalance Protocol + final Assignment newAssignment = streamsAssignmentInterface.targetAssignment.get(); + if (!newAssignment.equals(streamsAssignmentInterface.reconciledAssignment.get())) { + + final Map<TaskId, Set<TopicPartition>> activeTasksWithPartitions = + pairWithTopicPartitions(newAssignment.activeTasks.stream()); + final Map<TaskId, Set<TopicPartition>> standbyTasksWithPartitions = + pairWithTopicPartitions(Stream.concat(newAssignment.standbyTasks.stream(), newAssignment.warmupTasks.stream())); + + log.info("Processing new assignment {} from Streams Rebalance Protocol", newAssignment); + + taskManager.handleAssignment( + activeTasksWithPartitions, + standbyTasksWithPartitions + ); + } + } + } + + static Map<TopicPartition, PartitionInfo> getTopicPartitionInfo(final Map<HostInfo, Set<TopicPartition>> partitionsByHost) { + final Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>(); + for (final Set<TopicPartition> value : partitionsByHost.values()) { + for (final TopicPartition topicPartition : value) { + topicToPartitionInfo.put( + topicPartition, + new PartitionInfo( + topicPartition.topic(), + topicPartition.partition(), + null, + new Node[0], + new Node[0] + ) + ); + } + } + return topicToPartitionInfo; + } + + + private Map<TaskId, Set<TopicPartition>> pairWithTopicPartitions(final Stream<StreamsAssignmentInterface.TaskId> taskIdStream) { + return taskIdStream + .collect(Collectors.toMap( + this::toTaskId, + task -> toTopicPartitions(task, streamsAssignmentInterface.subtopologyMap().get(task.subtopologyId)) + )); + } + + private TaskId toTaskId(final StreamsAssignmentInterface.TaskId task) { + return new TaskId(Integer.parseInt(task.subtopologyId), task.partitionId); + } + + private Set<TopicPartition> toTopicPartitions(final StreamsAssignmentInterface.TaskId task, + final Subtopology subTopology) { + return + Stream.concat( + subTopology.sourceTopics.stream(), + subTopology.repartitionSourceTopics.keySet().stream() + ) + .map(t -> new TopicPartition(t, task.partitionId)) + 
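+                // i.e. a task with partition p consumes partition p of every source topic and repartition source topic of its subtopology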
.collect(Collectors.toSet()); + } + /** * Get the next batch of records by polling. * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index 9ee34d8398b..49428724b1a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -51,6 +51,9 @@ public class StreamsRebalanceListener implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { + + streamThread.maybeHandleAssignmentFromStreamsRebalanceProtocol(); + // NB: all task management is already handled by: // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 8a3e6c17d8a..9efb5780732 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1447,8 +1447,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ).updateThreadMetadata(adminClientId(CLIENT_ID)); + null, + null, + null).updateThreadMetadata(adminClientId(CLIENT_ID)); final StreamsException thrown = assertThrows(StreamsException.class, thread::run); @@ -2673,8 +2674,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(State.PENDING_SHUTDOWN); @@ -2731,8 +2733,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(State.PENDING_SHUTDOWN); @@ -2798,8 +2801,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(State.PENDING_SHUTDOWN); @@ -2861,8 +2865,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(State.PENDING_SHUTDOWN); @@ -2921,8 +2926,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(State.PENDING_SHUTDOWN); @@ -3154,8 +3160,9 @@ public class StreamThreadTest { new LinkedList<>(), null, HANDLER, - null - ); + null, + null, + null); final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<>()); final Metric testMetric = new KafkaMetric( new Object(), @@ -3210,8 +3217,9 @@ public class StreamThreadTest { new LinkedList<>(), null, (e, b) -> { }, - null - ) { + null, + null, + null) { @Override void runOnceWithProcessingThreads() { setState(StreamThread.State.PENDING_SHUTDOWN); @@ -3589,8 +3597,9 @@ public class StreamThreadTest { new LinkedList<>(), null, null, - null - ); + null, + null, + null); } private TaskManager mockTaskManager(final Task runningTask) { @@ -3710,8 +3719,9 @@ public class StreamThreadTest { new 
LinkedList<>(), null, HANDLER, - null - ); + null, + null, + null); } private void runOnce(final boolean processingThreadsEnabled) {
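
A note on the hand-off introduced in the StreamThread changes above: the StreamsHeartbeatRequestManager runs on the consumer's background thread and stores the broker-provided tasks in streamsAssignmentInterface.targetAssignment, while the stream thread picks them up in maybeHandleAssignmentFromStreamsRebalanceProtocol (invoked from StreamsRebalanceListener#onPartitionsAssigned) and reconciles only when the target differs from reconciledAssignment. The following is a minimal, self-contained sketch of that pattern; apart from the two field names targetAssignment and reconciledAssignment, which mirror the diff, every name here is hypothetical and only for illustration.

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicReference;

    // Sketch, not part of the patch: one thread publishes the latest target
    // assignment, another reconciles each new target exactly once.
    final class AssignmentHandoff<T> {
        private final AtomicReference<T> targetAssignment = new AtomicReference<>();     // written by the heartbeat thread
        private final AtomicReference<T> reconciledAssignment = new AtomicReference<>(); // written by the stream thread

        void publish(final T newTarget) {
            targetAssignment.set(newTarget);
        }

        // Mirrors the check in maybeHandleAssignmentFromStreamsRebalanceProtocol:
        // only act when the target differs from what was last reconciled.
        Optional<T> pollNewTarget() {
            final T target = targetAssignment.get();
            if (target != null && !target.equals(reconciledAssignment.get())) {
                reconciledAssignment.set(target);
                return Optional.of(target);
            }
            return Optional.empty();
        }
    }

In these terms, the heartbeat response handler plays the publish role and the call from onPartitionsAssigned plays the pollNewTarget role; the pair of atomic references lets the two threads exchange assignments without blocking each other.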
