Savonitar commented on code in PR #279:
URL:
https://github.com/apache/flink-connector-kafka/pull/279#discussion_r3505979200
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -796,26 +857,303 @@ private void retainRemovedClusterEnumeratorStates(
}
long retainedUntilMs = System.currentTimeMillis() +
removedClusterStateRetentionMs;
- activeClusterEnumeratorStates.entrySet().stream()
- .filter(entry ->
!activeKafkaClusterIds.contains(entry.getKey()))
- .forEach(
- entry ->
- retainedClusterEnumeratorStates.put(
- entry.getKey(),
- new
DynamicKafkaSourceEnumState.RetainedClusterState(
- entry.getValue(),
retainedUntilMs)));
+ for (Entry<String, KafkaSourceEnumState> entry :
activeClusterEnumeratorStates.entrySet()) {
+ if (activeKafkaClusterIds.contains(entry.getKey())) {
+ continue;
+ }
+ retainedClusterEnumeratorStates.put(
+ entry.getKey(),
+ new DynamicKafkaSourceEnumState.RetainedClusterState(
+ entry.getValue(), retainedUntilMs));
+ retainedSplitOffsetHandoffs.remove(entry.getKey());
+ }
}
private void pruneExpiredRetainedClusterEnumeratorStates() {
if (removedClusterStateRetentionMs <= 0) {
retainedClusterEnumeratorStates.clear();
+ retainedSplitOffsetHandoffs.clear();
return;
}
long currentTimeMillis = System.currentTimeMillis();
retainedClusterEnumeratorStates
.entrySet()
.removeIf(entry -> entry.getValue().getRetainedUntilMs() <=
currentTimeMillis);
+
retainedSplitOffsetHandoffs.keySet().retainAll(retainedClusterEnumeratorStates.keySet());
+ }
+
+ private void pruneExpiredRetainedSplitOffsetHandoffs() {
+ long currentTimeMillis = System.currentTimeMillis();
+ retainedSplitOffsetHandoffs
+ .entrySet()
+ .removeIf(
+ entry -> {
+ RetainedSplitOffsetHandoff handoff =
entry.getValue();
+ if (!handoff.isExpired(currentTimeMillis)) {
+ return false;
+ }
+ logger.debug(
+ "Discarding timed out retained split
offset handoff for cluster {}: handoffId={}",
+ entry.getKey(),
+ handoff.handoffId);
+ handoff.offsetsByReader.clear();
+ return true;
+ });
+ }
+
+ private boolean isRetainedClusterReadyForAssignment(
+ String kafkaClusterId,
+ DynamicKafkaSourceEnumState.RetainedClusterState
retainedClusterState) {
+ Set<String> activeTopics =
+ latestClusterTopicsMap.getOrDefault(kafkaClusterId,
Collections.emptySet());
+ return
filterStateByTopics(retainedClusterState.getKafkaSourceEnumState(),
activeTopics)
+ .stream()
+ .noneMatch(
+ splitStatus ->
+
splitStatus.assignmentStatus().equals(AssignmentStatus.ASSIGNED));
+ }
+
+ private void startRetainedSplitOffsetHandoff(String kafkaClusterId) {
+ if (retainedSplitOffsetHandoffs.containsKey(kafkaClusterId)) {
+ return;
+ }
+
+ // Keep the attempt bounded without making a fast metadata refresh
interval shorter than a
+ // reader source-event round trip.
+ long handoffTimeoutMs =
+ Math.max(
+ kafkaMetadataServiceDiscoveryIntervalMs,
+ RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS);
+ long deadlineMs = System.currentTimeMillis() + handoffTimeoutMs;
+ RetainedSplitOffsetHandoff handoff =
+ new
RetainedSplitOffsetHandoff(++nextRetainedSplitOffsetHandoffId, deadlineMs);
+ retainedSplitOffsetHandoffs.put(kafkaClusterId, handoff);
+ scheduleRetainedSplitOffsetHandoffRetryIfNeeded();
+ for (int readerId : enumContext.registeredReaders().keySet()) {
+ sendRetainedSplitOffsetRequestToReader(kafkaClusterId, handoff,
readerId);
+ }
+ }
+
+ private void scheduleRetainedSplitOffsetHandoffRetryIfNeeded() {
+ if (kafkaMetadataServiceDiscoveryIntervalMs > 0
+ || retainedSplitOffsetHandoffRetryScheduled) {
+ return;
+ }
+
+ // One-time metadata discovery has no later refresh to discard an
expired handoff. Keep a
+ // single lightweight retry loop after the first handoff; it does not
fetch metadata.
+ retainedSplitOffsetHandoffRetryScheduled = true;
+ kafkaMetadataServiceDiscoveryContext.<Void>callAsync(
+ () -> null,
+ (ignored, t) -> {
+ if (t != null) {
+ throw new RuntimeException("Retained split offset
handoff retry failed", t);
+ }
+ pruneExpiredRetainedClusterEnumeratorStates();
+ pruneExpiredRetainedSplitOffsetHandoffs();
+ maybeStartReadyRetainedClusterEnumerators();
+ },
+ RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS,
+ RETAINED_SPLIT_OFFSET_HANDOFF_MIN_TIMEOUT_MS);
+ }
+
+ private void sendPendingRetainedSplitOffsetRequestsToReader(int readerId) {
+ retainedSplitOffsetHandoffs.forEach(
+ (kafkaClusterId, handoff) ->
+ sendRetainedSplitOffsetRequestToReader(kafkaClusterId,
handoff, readerId));
+ }
+
+ private void sendRetainedSplitOffsetRequestToReader(
+ String kafkaClusterId, RetainedSplitOffsetHandoff handoff, int
readerId) {
+ RequestRetainedSplitOffsetsEvent requestEvent =
+ new RequestRetainedSplitOffsetsEvent(handoff.handoffId,
kafkaClusterId);
+ logger.debug(
+ "Requesting retained split offsets from reader {}: {}",
readerId, requestEvent);
+ enumContext.sendEventToSourceReader(readerId, requestEvent);
+ }
+
+ private void handleRetainedSplitOffsetsEvent(
+ int subtaskId, RetainedSplitOffsetsEvent
retainedSplitOffsetsEvent) {
+ if (!enumContext.registeredReaders().containsKey(subtaskId)) {
+ logger.debug("Ignoring retained split offsets from unavailable
reader {}", subtaskId);
+ return;
+ }
+ pruneExpiredRetainedClusterEnumeratorStates();
+ String kafkaClusterId = retainedSplitOffsetsEvent.getKafkaClusterId();
+ RetainedSplitOffsetHandoff handoff =
retainedSplitOffsetHandoffs.get(kafkaClusterId);
+ if (handoff == null || handoff.handoffId !=
retainedSplitOffsetsEvent.getHandoffId()) {
+ logger.debug(
+ "Ignoring stale retained split offsets from reader {}: {}",
+ subtaskId,
+ retainedSplitOffsetsEvent);
+ return;
+ }
+ if (handoff.isExpired(System.currentTimeMillis())) {
+ logger.debug(
+ "Ignoring retained split offsets from timed out handoff
{}: {}",
+ handoff.handoffId,
+ retainedSplitOffsetsEvent);
+ clearRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+ return;
+ }
+
+ handoff.offsetsByReader.put(subtaskId,
retainedSplitOffsetsEvent.getRetainedSplitOffsets());
+ if (handoff.offsetsByReader.size() >=
enumContext.currentParallelism()) {
+ applyRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+ }
+ maybeStartReadyRetainedClusterEnumerators();
+ }
+
+ private void applyRetainedSplitOffsetHandoff(
+ String kafkaClusterId, RetainedSplitOffsetHandoff handoff) {
+ DynamicKafkaSourceEnumState.RetainedClusterState retainedClusterState =
+ retainedClusterEnumeratorStates.get(kafkaClusterId);
+ if (retainedClusterState == null) {
+ clearRetainedSplitOffsetHandoff(kafkaClusterId, handoff);
+ return;
+ }
+
+ KafkaSourceEnumState kafkaSourceEnumState =
retainedClusterState.getKafkaSourceEnumState();
+ Map<String, Long> retainedSplitOffsets = handoff.mergedOffsets();
+ Set<String> activeTopics =
+ latestClusterTopicsMap.getOrDefault(kafkaClusterId,
Collections.emptySet());
+ Set<SplitAndAssignmentStatus> updatedSplits = new HashSet<>();
+ for (SplitAndAssignmentStatus splitStatus :
kafkaSourceEnumState.splits()) {
+ if (!activeTopics.contains(splitStatus.split().getTopic())) {
+ updatedSplits.add(splitStatus);
+ continue;
+ }
+ if
(splitStatus.assignmentStatus().equals(AssignmentStatus.ASSIGNED)) {
+ Long retainedSplitOffset =
+ retainedSplitOffsets.get(
+ toDynamicSplitId(kafkaClusterId,
splitStatus.split()));
+ if (retainedSplitOffset == null) {
+ // No reader retains this offset anymore; let normal
discovery recreate it.
Review Comment:
1. Is my understanding correct that, in this case, the split will be
re-discovered as a "new" partition, so it starts from
newDiscoveryOffsetsInitializer (earliest) rather than the configured starting
offset (e.g. latest()), and the source would rewind to the start of the topic?
Could this null branch fall back to the split's own retained offset instead
of dropping it, so it resumes from the last known position?
2. In this case, will the sink topic have duplicates even in EOS mode?
##########
docs/content/docs/connectors/datastream/dynamic-kafka.md:
##########
@@ -242,6 +242,8 @@ By default, metadata removal also removes that cluster's
split offsets from subs
To keep removed cluster offsets available for a later re-add or restore, set
`stream-metadata-removed-cluster-retention-ms` to a positive duration. For
example,
`604800000` retains removed cluster state for seven days before the source
stops checkpointing it.
+If the cluster is re-added, the source uses the retained offsets but computes
fresh reader
+assignments instead of reusing their previous owners.
Review Comment:
Should we mention this branch/reset behavior
https://github.com/apache/flink-connector-kafka/pull/279/changes#r3505979200
(which sounds like an exception) if we go with the suggested workflow?
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java:
##########
@@ -748,6 +796,19 @@ public void addReader(int subtaskId) {
// assign pending splits from the sub enumerator
clusterEnumeratorMap.forEach(
(cluster, subEnumerator) ->
subEnumerator.addReader(subtaskId));
+ if (!retainedSplitOffsetHandoffs.isEmpty()) {
+ pruneExpiredRetainedSplitOffsetHandoffs();
+ // A reader can join while a re-added cluster is waiting for
offset handoff. Send
+ // metadata first so the reader has reconciled its local retained
state before it
+ // answers the request. Restart the attempt so delayed responses
from the reader's
+ // previous attempt cannot count as its replacement report.
+ retainedSplitOffsetHandoffs
+ .values()
+ .forEach(handoff -> handoff.offsetsByReader.clear());
Review Comment:
Can I clarify: can a single slow subtask that keeps going down and coming back
up repeatedly clear the other readers' reports and cause a delay?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]