leonardBang commented on code in PR #27149: URL: https://github.com/apache/flink/pull/27149#discussion_r2472317867
########## flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A decorative interface {@link Source}. Implementing this interface indicates that the source + * operator needs to report splits to the enumerator and receive reassignment. + */ +@PublicEvolving +public interface SupportSplitReassignmentOnRecovery {} Review Comment: ```suggestion public interface SupportsSplitReassignmentOnRecovery {} ``` ########## flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java: ########## @@ -83,25 +87,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { @Override public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) { - unassignedSplits.addAll(splits); + // add back to same subtaskId. 
+ putPendingAssignments(subtaskId, splits); } @Override public void addReader(int subtaskId) { - List<MockSourceSplit> assignment = new ArrayList<>(); - for (MockSourceSplit split : unassignedSplits) { - if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) { - assignment.add(split); + ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId); + List<MockSourceSplit> splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration(); + + List<MockSourceSplit> redistributedSplits = new ArrayList<>(); + List<MockSourceSplit> addBackSplits = new ArrayList<>(); + for (MockSourceSplit split : splitsOnRecovery) { + if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split not existed in globalSplitAssignment, mean that it's registered first + // time, can be redistibuted. Review Comment: ```suggestion // if the split is not present in globalSplitAssignment, it means that this split is being registered // for the first time and is eligible for redistribution. ``` ########## flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java: ########## @@ -83,25 +87,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { @Override public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) { - unassignedSplits.addAll(splits); + // add back to same subtaskId. 
+ putPendingAssignments(subtaskId, splits); } @Override public void addReader(int subtaskId) { - List<MockSourceSplit> assignment = new ArrayList<>(); - for (MockSourceSplit split : unassignedSplits) { - if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) { - assignment.add(split); + ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId); + List<MockSourceSplit> splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration(); + + List<MockSourceSplit> redistributedSplits = new ArrayList<>(); + List<MockSourceSplit> addBackSplits = new ArrayList<>(); + for (MockSourceSplit split : splitsOnRecovery) { + if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split not existed in globalSplitAssignment, mean that it's registered first + // time, can be redistibuted. + redistributedSplits.add(split); + } else if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split already is assigned to other substaskId, just ignore it. Otherwise, + // addback to this subtaskId again. Review Comment: ```suggestion // if split is already assigned to other sub-task, just ignore it. Otherwise, // add back to this sub-task again. ``` ########## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -215,6 +215,8 @@ private enum OperatingMode { /** Watermark identifier to whether the watermark are aligned. 
*/ private final Map<String, Boolean> watermarkIsAlignedMap; + private final boolean supportSupportSplitReassignmentOnRecovery; Review Comment: ```suggestion private final boolean supportsSplitReassignmentOnRecovery; ``` ########## flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java: ########## @@ -28,20 +29,21 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.stream.Collectors; /** A mock {@link SplitEnumerator} for unit tests. */ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set<MockSourceSplit>>, SupportsBatchSnapshot { - private final SortedSet<MockSourceSplit> unassignedSplits; + // 扩成16个partition, unas Review Comment: change to English ########## flink-core/src/main/java/org/apache/flink/api/connector/source/SupportSplitReassignmentOnRecovery.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * A decorative interface {@link Source}. Implementing this interface indicates that the source Review Comment: ```suggestion * A decorative interface for {@link Source}. Implementing this interface indicates that the source ``` ########## flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java: ########## @@ -50,22 +52,24 @@ public class MockSplitEnumerator public MockSplitEnumerator(int numSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) { this(new HashSet<>(), enumContext); + List<MockSourceSplit> unassignedSplits = new ArrayList<>(); for (int i = 0; i < numSplits; i++) { unassignedSplits.add(new MockSourceSplit(i)); } + calculateAndPutPendingAssignments(unassignedSplits); } public MockSplitEnumerator( Set<MockSourceSplit> unassignedSplits, SplitEnumeratorContext<MockSourceSplit> enumContext) { - this.unassignedSplits = - new TreeSet<>(Comparator.comparingInt(o -> Integer.parseInt(o.splitId()))); - this.unassignedSplits.addAll(unassignedSplits); + this.pendingSplitAssignment = new HashMap<>(); + this.globalSplitAssignment = new HashMap<>(); this.enumContext = enumContext; this.handledSourceEvent = new ArrayList<>(); this.successfulCheckpoints = new ArrayList<>(); this.started = false; this.closed = false; + calculateAndPutPendingAssignments(unassignedSplits); Review Comment: `reassignSplits(unassignedSplits)` ? ########## flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java: ########## @@ -83,25 +87,36 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { @Override public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) { - unassignedSplits.addAll(splits); + // add back to same subtaskId. 
+ putPendingAssignments(subtaskId, splits); } @Override public void addReader(int subtaskId) { - List<MockSourceSplit> assignment = new ArrayList<>(); - for (MockSourceSplit split : unassignedSplits) { - if (Integer.parseInt(split.splitId()) % enumContext.currentParallelism() == subtaskId) { - assignment.add(split); + ReaderInfo readerInfo = enumContext.registeredReaders().get(subtaskId); + List<MockSourceSplit> splitsOnRecovery = readerInfo.getReportedSplitsOnRegistration(); + + List<MockSourceSplit> redistributedSplits = new ArrayList<>(); + List<MockSourceSplit> addBackSplits = new ArrayList<>(); + for (MockSourceSplit split : splitsOnRecovery) { + if (!globalSplitAssignment.containsKey(split.splitId())) { + // if split not existed in globalSplitAssignment, mean that it's registered first + // time, can be redistibuted. Review Comment: Hint: you can ask AI to improve the Javadoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
