lindong28 commented on code in PR #21589:
URL: https://github.com/apache/flink/pull/21589#discussion_r1066995500


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java:
##########
@@ -113,4 +115,26 @@ public SingleThreadMultiplexSourceReaderBase(
             SourceReaderContext context) {
         super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
     }
+
+    /**
+     * This constructor behaves like {@link #SingleThreadMultiplexSourceReaderBase(Supplier,
+     * RecordEmitter, Configuration, SourceReaderContext)}, but accepts a specific {@link
+     * FutureCompletingBlockingQueue}, {@link RecordEvaluator} and {@link
+     * SingleThreadFetcherManager}.
+     */
+    public SingleThreadMultiplexSourceReaderBase(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
+            RecordEmitter<E, T, SplitStateT> recordEmitter,
+            @Nullable RecordEvaluator<T> eofRecordEvaluator,

Review Comment:
   Should we also add `setEofRecordEvaluator(...)` to KafkaSourceBuilder as 
specified in FLIP-208?
   
   Same for PulsarSourceBuilder.
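   
   For illustration, a minimal sketch of the setter shape FLIP-208 describes, written against hypothetical stand-in types (the builder class and the `EofEvaluator` interface below are placeholders, not the actual KafkaSourceBuilder or RecordEvaluator code):
   
   ```java
   // Hypothetical, simplified builder fragment showing only the proposed setter shape.
   public class MySourceBuilder<OUT> {
   
       /** Simplified stand-in for the RecordEvaluator interface introduced by this PR. */
       @FunctionalInterface
       public interface EofEvaluator<T> {
           boolean isEndOfStream(T record);
       }
   
       private EofEvaluator<OUT> eofRecordEvaluator;
   
       /** Lets users stop reading a split once the evaluator flags an end-of-stream record. */
       public MySourceBuilder<OUT> setEofRecordEvaluator(EofEvaluator<OUT> evaluator) {
           this.eofRecordEvaluator = evaluator;
           return this;
       }
   }
   ```
   
   A caller would then write something like `builder.setEofRecordEvaluator(record -> record == null)` to end a split when a marker record arrives.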



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java:
##########
@@ -93,6 +93,14 @@ public void addSplits(List<SplitT> splitsToAdd) {
         }
     }
 
+    @Override
+    public void finishSplits(List<SplitT> finishedSplits) {

Review Comment:
   Would it be better to name it as `removeSplits()` for consistency with 
`addSplits()`?
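   
   A rough outline of the suggested naming, assuming the same list-based signature as `addSplits(...)` (signatures only; the fetcher lookup and task enqueueing are omitted, and the class name below is a placeholder):
   
   ```java
   import java.util.List;
   
   // Illustrative outline of the naming suggestion only; not the PR's actual implementation.
   abstract class FetcherManagerNamingSketch<SplitT> {
   
       /** Existing method: hands new splits to a fetcher. */
       abstract void addSplits(List<SplitT> splitsToAdd);
   
       /** Proposed name for what the PR currently calls finishSplits(...). */
       abstract void removeSplits(List<SplitT> splitsToRemove);
   }
   ```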



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsDeletion.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.splitreader;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * A change to delete splits.
+ *
+ * @param <SplitT> the split type.
+ */
+@Experimental

Review Comment:
   I suppose `SplitsDeletion` is an internal class that will only be used within the Flink codebase since it is not specified in FLIP-208. Should we mark it `@Internal`?
   
   We would need to re-open the discussion in FLIP-208 if we want to add a 
public API.



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FinishSplitsTask.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.fetcher;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsDeletion;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/** The task to finish reading some splits. */
+public class FinishSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {

Review Comment:
   Would it be better to name it as `RemoveSplitsTask` for consistency with 
`AddSplitsTask`?
   
   And should we annotate this class with `@Internal` similar to what we do for 
`AddSplitsTask`?
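   
   A sketch of what the renamed, `@Internal`-annotated task could look like, with placeholder constructor arguments and a placeholder body; it only mirrors the general shape of `AddSplitsTask` and is not the PR's code:
   
   ```java
   import org.apache.flink.annotation.Internal;
   import org.apache.flink.api.connector.source.SourceSplit;
   
   import java.util.List;
   
   /** Hypothetical outline of the rename; the real task would also implement SplitFetcherTask. */
   @Internal
   class RemoveSplitsTaskSketch<SplitT extends SourceSplit> {
   
       private final List<SplitT> splitsToRemove;
   
       RemoveSplitsTaskSketch(List<SplitT> splitsToRemove) {
           this.splitsToRemove = splitsToRemove;
       }
   
       /** Placeholder: the real run() would pass the splits to the SplitReader as a removal change. */
       boolean run() {
           return true;
       }
   }
   ```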



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsDeletion.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.splitreader;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.List;
+
+/**
+ * A change to delete splits.
+ *
+ * @param <SplitT> the split type.
+ */
+@Experimental
+public class SplitsDeletion<SplitT> extends SplitsChange<SplitT> {

Review Comment:
   Should we update `KafkaPartitionSplitReader#handleSplitsChanges` to handle 
`SplitsDeletion`?
   
   Also, would it be better to name it `SplitsRemoval` for consistency with `SplitsAddition`?
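   
   For illustration, the kind of dispatch this would require, sketched with simplified stand-in types rather than the real KafkaPartitionSplitReader or splitreader classes:
   
   ```java
   import java.util.List;
   
   // Stand-ins for SplitsChange and its addition/removal subclasses, for illustration only.
   abstract class ChangeSketch<S> {
       private final List<S> splits;
       ChangeSketch(List<S> splits) { this.splits = splits; }
       List<S> splits() { return splits; }
   }
   
   class AdditionSketch<S> extends ChangeSketch<S> {
       AdditionSketch(List<S> splits) { super(splits); }
   }
   
   class RemovalSketch<S> extends ChangeSketch<S> {
       RemovalSketch(List<S> splits) { super(splits); }
   }
   
   class ReaderSketch<S> {
       // A reader that supports removal would branch on the concrete change type, so a removal
       // change unassigns the listed splits instead of hitting an "unsupported change" error.
       void handleSplitsChanges(ChangeSketch<S> change) {
           if (change instanceof AdditionSketch) {
               subscribe(change.splits());
           } else if (change instanceof RemovalSketch) {
               unsubscribe(change.splits());
           } else {
               throw new UnsupportedOperationException("Unsupported change: " + change.getClass());
           }
       }
   
       private void subscribe(List<S> splits) { /* e.g. assign partitions and seek to offsets */ }
   
       private void unsubscribe(List<S> splits) { /* e.g. drop the partitions from the assignment */ }
   }
   ```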



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java:
##########
@@ -328,14 +348,82 @@ private SplitContext(String splitId, SplitStateT state) {
             this.splitId = splitId;
         }
 
-        SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
+        SourceOutput<T> getOrCreateSplitOutput(
+                ReaderOutput<T> mainOutput,
+                @Nullable RecordEvaluator<T> recordEvaluator,
+                SplitT split,
+                SplitFetcherManager<E, SplitT> splitFetcherManager) {
             if (sourceOutput == null) {
                // The split output should have been created when AddSplitsEvent was processed in
                // SourceOperator. Here we just use this method to get the previously created
                 // output.
                 sourceOutput = mainOutput.createOutputForSplit(splitId);
+                if (recordEvaluator != null) {
+                    sourceOutput =
+                            new SourceOutputWrapper<>(
+                                    split, recordEvaluator, sourceOutput, splitFetcherManager);
+                }
             }
             return sourceOutput;
         }
     }
+
+    private static final class SourceOutputWrapper<E, T, SplitT extends SourceSplit>
+            implements SourceOutput<T> {
+        final SplitT split;
+        final RecordEvaluator<T> recordEvaluator;
+        final SourceOutput<T> sourceOutput;
+        final SplitFetcherManager<E, SplitT> splitFetcherManager;
+
+        private boolean isStreamEnd = false;
+
+        public SourceOutputWrapper(
+                SplitT split,
+                RecordEvaluator<T> recordEvaluator,
+                SourceOutput<T> sourceOutput,
+                SplitFetcherManager<E, SplitT> splitFetcherManager) {
+            this.split = split;
+            this.recordEvaluator = recordEvaluator;
+            this.sourceOutput = sourceOutput;
+            this.splitFetcherManager = splitFetcherManager;
+        }
+
+        @Override
+        public void emitWatermark(Watermark watermark) {
+            sourceOutput.emitWatermark(watermark);
+        }
+
+        @Override
+        public void markIdle() {
+            sourceOutput.markIdle();
+        }
+
+        @Override
+        public void markActive() {
+            sourceOutput.markActive();
+        }
+
+        @Override
+        public void collect(T record) {
+            if (!isStreamEnd) {
+                handleEndOfStreamRecord(record);

Review Comment:
   According to the Java doc of `RecordEvaluator#isEndOfStream(..)`, the given 
record wouldn't be emitted from the source if the returned result is true. 
Should we enforce this guarantee here?
   
   Same for `collect(T record, long timestamp)`.
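   
   A minimal, self-contained sketch of the behavior being asked for, with a Predicate standing in for `RecordEvaluator#isEndOfStream`, a Consumer for the wrapped `SourceOutput#collect`, and a Runnable for notifying the fetcher manager that the split is finished (names and wiring are illustrative, not the PR's code):
   
   ```java
   import java.util.function.Consumer;
   import java.util.function.Predicate;
   
   // Evaluate the record BEFORE forwarding it, so the end-of-stream record itself is never emitted.
   final class EofAwareOutputSketch<T> {
   
       private final Predicate<T> isEndOfStream; // stand-in for RecordEvaluator
       private final Consumer<T> downstream;     // stand-in for the wrapped SourceOutput
       private final Runnable finishSplit;       // stand-in for notifying the fetcher manager
       private boolean streamEnded;
   
       EofAwareOutputSketch(Predicate<T> isEndOfStream, Consumer<T> downstream, Runnable finishSplit) {
           this.isEndOfStream = isEndOfStream;
           this.downstream = downstream;
           this.finishSplit = finishSplit;
       }
   
       void collect(T record) {
           if (streamEnded) {
               return; // the split is already finished; drop anything that still arrives
           }
           if (isEndOfStream.test(record)) {
               streamEnded = true;
               finishSplit.run(); // do NOT forward the end-of-stream record itself
           } else {
               downstream.accept(record);
           }
       }
   }
   ```
   
   The same check would apply to the timestamped `collect(T record, long timestamp)` overload.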


