lnbest0707 opened a new pull request, #279: URL: https://github.com/apache/flink-connector-kafka/pull/279
Jira: https://issues.apache.org/jira/browse/FLINK-39837

## What changed

Removed-cluster retention currently preserves the old ASSIGNED status together with each split. When the cluster is re-added after a rescale or global rebalance, those stale owners bypass normal assignment, so the restored ownership can leave newly available subtasks empty.

This patch keeps retained splits as offset history only:

* When a retained cluster is re-added, the enumerator starts a transient handoff attempt with a fresh ID and deadline, then requests retained offsets from every current reader.
* Readers report their local retained offsets without reactivating the splits. The enumerator waits for all currentParallelism() responses, max-merges offsets for each split, converts retained ASSIGNED splits to UNASSIGNED, and starts the cluster through normal assignment.
* A partial attempt that times out is discarded completely: no partial offsets are applied, no cluster is started, and the next refresh/retry uses a new handoff ID. Delayed responses from an old ID are ignored.
* Reader re-registration invalidates the previous report for that reader. Readers drop a local retained shadow when the same split is freshly assigned to them, and ignore a stale retained copy that arrives after an active copy during restore.
* Restored retained shadows are dropped when removed-cluster retention is disabled.

The handoff maps and IDs are transient coordinator state. This does not change the checkpoint serializer or its version.

## Tests

Automated checks run on this exact OSS branch:

* mvn -pl flink-connector-kafka -DskipITs -DskipTests -Drat.skip=true package
  * Passed: checkstyle, Spotless, compilation, and packaging.
* DOCKER_HOST=unix://${HOME}/.colima/default/docker.sock TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock TESTCONTAINERS_HOST_OVERRIDE=127.0.0.1 mvn -pl flink-connector-kafka -DskipITs -Drat.skip=true -Dtest=DynamicKafkaSourceEnumeratorTest,DynamicKafkaSourceReaderTest test
  * Passed: 47 tests (32 enumerator, 15 reader).
  * Covers complete all-reader handoff and max-offset merge; fresh assignment after rescale; partial-response timeout without assignment or starting-offset fallback; new ID on retry; delayed old-response rejection; repeated timeout cleanup; reader re-registration invalidation; retry when periodic discovery is disabled; retention-disabled restore; reader reporting without local reactivation; same-reader shadow cleanup; and active-before-retained restore ordering.
* DOCKER_HOST=unix://${HOME}/.colima/default/docker.sock TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock TESTCONTAINERS_HOST_OVERRIDE=127.0.0.1 mvn -pl flink-connector-kafka -DskipITs -Drat.skip=true -Dtest=DynamicKafkaSourceEnumStateSerializerTest,DynamicKafkaSourceSplitSerializerTest test
  * Passed: 6 serializer tests (3 enum-state, 3 split), confirming the existing checkpoint formats remain valid.

Additional StreamLink end-to-end validation was run with the equivalent final connector behavior:

* A/B comparison: 6 logical clusters x 8 partitions, parallelism 2 -> 8, remove/re-add 2 clusters. The previous connector kept the 16 retained splits on stale owners only (readers 2, 3, 6, 7); this change freshly assigned all 16 at the retained offsets across readers 0-7.
* Repeated remove/re-add cycle: 13 of 16 splits had both old and new local shadows; the max merge selected every newer second-removal offset, then fresh assignment covered readers 0-7.
* Rapid-churn sequence: 10 logical clusters x 12 partitions, parallelism 3 -> 12 -> 5, with three retained clusters removed and partially re-added before the final re-add.
Each cluster used a fresh handoff ID for the final attempt, all 12 reader reports were required, all 36 splits started from the latest retained offsets, and fresh assignment covered readers 0-11 before the downscale and readers 0-4 after the second restore.
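
For readers unfamiliar with the handoff described under "What changed", the following is a minimal sketch of one handoff attempt: reports are accepted only for the current ID and before the deadline, a re-registered reader's report is invalidated, and offsets are max-merged only once every reader has reported. The class `RetainedOffsetHandoff`, its method names, and the string split keys are hypothetical and chosen for illustration; they are not the connector's actual classes or API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of one transient handoff attempt; not the connector's code. */
final class RetainedOffsetHandoff {
    private final long handoffId;      // fresh ID per attempt (assumed to be a long)
    private final int parallelism;     // number of reader reports required
    private final long deadlineMillis; // attempt is discarded after this time
    private final Map<Integer, Map<String, Long>> reports = new HashMap<>();

    RetainedOffsetHandoff(long handoffId, int parallelism, long deadlineMillis) {
        this.handoffId = handoffId;
        this.parallelism = parallelism;
        this.deadlineMillis = deadlineMillis;
    }

    /** Records a reader's retained offsets; returns false for a stale ID or a late report. */
    boolean onReport(long reportedId, int readerId, Map<String, Long> offsets, long nowMillis) {
        if (reportedId != handoffId || nowMillis > deadlineMillis) {
            return false; // delayed response from an old attempt, or past the deadline
        }
        reports.put(readerId, new HashMap<>(offsets));
        return true;
    }

    /** Re-registration invalidates the previous report for that reader. */
    void onReaderReregistered(int readerId) {
        reports.remove(readerId);
    }

    boolean isComplete() {
        return reports.size() == parallelism;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis > deadlineMillis;
    }

    /** Max-merges offsets per split; a partial attempt must be discarded, never merged. */
    Map<String, Long> mergeOffsets() {
        if (!isComplete()) {
            throw new IllegalStateException("partial handoff must be discarded, not merged");
        }
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> report : reports.values()) {
            report.forEach((split, offset) -> merged.merge(split, offset, Long::max));
        }
        return merged;
    }
}
```

In this sketch the caller would create a new `RetainedOffsetHandoff` with a new ID whenever `isExpired` returns true before `isComplete`, which mirrors the "discard completely, retry with a new ID" rule in the description.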
