lnbest0707 opened a new pull request, #279:
URL: https://github.com/apache/flink-connector-kafka/pull/279

   Jira: https://issues.apache.org/jira/browse/FLINK-39837
   
   ## What changed
   
   Removed-cluster retention currently preserves the old ASSIGNED status 
together with each split. When the cluster is re-added after a rescale or 
global rebalance, those stale owners bypass normal assignment, so the restored 
ownership can leave newly available subtasks empty.
   
   This patch keeps retained splits as offset history only:
   
   * When a retained cluster is re-added, the enumerator starts a transient 
handoff attempt with a fresh ID and deadline, then requests retained offsets 
from every current reader.
   * Readers report their local retained offsets without reactivating the 
splits. The enumerator waits for a response from each of the 
currentParallelism() readers, merges the reports by taking the maximum offset 
for each split, converts retained ASSIGNED splits to UNASSIGNED, and starts 
the cluster through normal assignment.
   * A partial attempt that times out is discarded completely: no partial 
offsets are applied, no cluster is started, and the next refresh/retry uses a 
new handoff ID. Delayed responses from an old ID are ignored.
   * Reader re-registration invalidates the previous report for that reader. 
Readers drop a local retained shadow when the same split is freshly assigned to 
them, and ignore a stale retained copy that arrives after an active copy during 
restore.
   * Restored retained shadows are dropped when removed-cluster retention is 
disabled.
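   
   For reviewers, the handoff rules above can be sketched roughly as follows. 
This is an illustration only, not the code in this patch: the class 
RetainedOffsetHandoff, its method names, and the use of plain String split IDs 
and Long offsets are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch of one transient handoff attempt; not the connector's actual class. */
final class RetainedOffsetHandoff {
    private final long handoffId;       // fresh ID for each attempt
    private final int expectedReaders;  // stands in for currentParallelism()
    private final long deadlineMillis;  // attempt is discarded after this time

    // Latest report per reader subtask: split ID -> retained offset.
    private final Map<Integer, Map<String, Long>> reportsByReader = new HashMap<>();

    RetainedOffsetHandoff(long handoffId, int expectedReaders, long deadlineMillis) {
        this.handoffId = handoffId;
        this.expectedReaders = expectedReaders;
        this.deadlineMillis = deadlineMillis;
    }

    /** Records a report; returns false for a response from another attempt or a late one. */
    boolean onReaderReport(
            long reportHandoffId, int readerId, Map<String, Long> offsets, long nowMillis) {
        if (reportHandoffId != handoffId || nowMillis > deadlineMillis) {
            return false;
        }
        reportsByReader.put(readerId, new HashMap<>(offsets));
        return true;
    }

    /** Re-registration invalidates whatever that reader reported earlier. */
    void onReaderReregistered(int readerId) {
        reportsByReader.remove(readerId);
    }

    /** True once the deadline has passed; the caller then drops the whole attempt. */
    boolean isExpired(long nowMillis) {
        return nowMillis > deadlineMillis;
    }

    /**
     * Returns the max-merged offsets only when every reader has reported.
     * An incomplete attempt yields empty, so no partial offsets are applied.
     */
    Optional<Map<String, Long>> tryComplete() {
        if (reportsByReader.size() < expectedReaders) {
            return Optional.empty();
        }
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> report : reportsByReader.values()) {
            report.forEach((split, offset) -> merged.merge(split, offset, Long::max));
        }
        return Optional.of(merged);
    }
}
```

   In this sketch a timed-out attempt is simply thrown away and the retry 
constructs a new object with a new ID, which is why a delayed response 
carrying the old ID is rejected by onReaderReport.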
   
   The handoff maps and IDs are transient coordinator state. This does not 
change the checkpoint serializer or its version.
   
   ## Tests
   
   Automated checks run on this exact OSS branch:
   
   * mvn -pl flink-connector-kafka -DskipITs -DskipTests -Drat.skip=true package
     * Passed: checkstyle, Spotless, compilation, and packaging.
   * DOCKER_HOST=unix://${HOME}/.colima/default/docker.sock TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock TESTCONTAINERS_HOST_OVERRIDE=127.0.0.1 mvn -pl flink-connector-kafka -DskipITs -Drat.skip=true -Dtest=DynamicKafkaSourceEnumeratorTest,DynamicKafkaSourceReaderTest test
     * Passed: 47 tests (32 enumerator, 15 reader).
     * Covers complete all-reader handoff and max-offset merge; fresh 
assignment after rescale; partial-response timeout without assignment or 
starting-offset fallback; new ID on retry; delayed old-response rejection; 
repeated timeout cleanup; reader re-registration invalidation; retry when 
periodic discovery is disabled; retention-disabled restore; reader reporting 
without local reactivation; same-reader shadow cleanup; and 
active-before-retained restore ordering.
   * DOCKER_HOST=unix://${HOME}/.colima/default/docker.sock TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE=/var/run/docker.sock TESTCONTAINERS_HOST_OVERRIDE=127.0.0.1 mvn -pl flink-connector-kafka -DskipITs -Drat.skip=true -Dtest=DynamicKafkaSourceEnumStateSerializerTest,DynamicKafkaSourceSplitSerializerTest test
     * Passed: 6 serializer tests (3 enum-state, 3 split), confirming the 
existing checkpoint formats remain valid.
   
   Additional StreamLink end-to-end validation was run with the equivalent 
final connector behavior:
   
   * A/B comparison: 6 logical clusters x 8 partitions, parallelism 2 -> 8, 
remove/re-add 2 clusters. The previous connector kept the 16 retained splits on 
stale owners only (readers 2, 3, 6, 7); this change freshly assigned all 16 at 
the retained offsets across readers 0-7.
   * Repeated remove/re-add cycle: 13 of 16 splits had both old and new local 
shadows; the max merge selected every newer second-removal offset, then fresh 
assignment covered readers 0-7.
   * Rapid-churn sequence: 10 logical clusters x 12 partitions, parallelism 3 
-> 12 -> 5, with three retained clusters removed and partially re-added before 
the final re-add. Each cluster used a fresh handoff ID for the final attempt, 
all 12 reader reports were required, all 36 splits started from the latest 
retained offsets, and fresh assignment covered readers 0-11 before the 
downscale and readers 0-4 after the second restore.
   

