AHeise commented on a change in pull request #18790:
URL: https://github.com/apache/flink/pull/18790#discussion_r824445935
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
##########
@@ -324,6 +324,8 @@ private SplitContext(String splitId, SplitStateT state) {
SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
if (sourceOutput == null) {
+ // The split output should have been created when AddSplitsEvent was processed in
+ // SourceOperator. Here we just use this method to get the created output.
Review comment:
```suggestion
// SourceOperator. Here we just use this method to get the previously created output.
```
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -516,7 +525,14 @@ public void handleOperatorEvent(OperatorEvent event) {
checkWatermarkAlignment();
} else if (event instanceof AddSplitEvent) {
try {
- sourceReader.addSplits(((AddSplitEvent<SplitT>) event).splits(splitSerializer));
+ List<SplitT> newSplits = ((AddSplitEvent<SplitT>) event).splits(splitSerializer);
+ if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
+ outputPendingSplits.addAll(newSplits);
+ } else {
+ newSplits.forEach(
Review comment:
So I guess we have two cases:
- bulk addition of initial splits before the first `emitNext`
- bulk addition of initial splits after the first `emitNext`
Please ensure that both cases are covered by tests. I'd also recommend
adding inline (java)docs that spell out the cases explicitly. For that, I'd
probably extract this `forEach` into a private method (e.g., `setupInitialSplit`,
or a better name if you find one).
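To make the two cases concrete, here is a rough, self-contained sketch of the shape I have in mind. It is not the real `SourceOperator`: the class `SplitOutputSketch`, the `Mode` enum, the use of plain `String` split ids, and the `createdOutputs`/`pendingCount` accessors are all hypothetical stand-ins, and clearing the pending list after initialization is my assumption rather than something the diff shows.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simplified stand-in for the operator logic; NOT the real SourceOperator. */
class SplitOutputSketch {

    /** Hypothetical two-state mode, loosely mirroring OUTPUT_NOT_INITIALIZED vs. any later mode. */
    enum Mode { OUTPUT_NOT_INITIALIZED, READING }

    private Mode mode = Mode.OUTPUT_NOT_INITIALIZED;
    private final List<String> outputPendingSplits = new ArrayList<>();
    private final Set<String> createdOutputs = new LinkedHashSet<>();

    /** Stand-in for handling an AddSplitEvent. */
    void addSplits(List<String> splitIds) {
        if (mode == Mode.OUTPUT_NOT_INITIALIZED) {
            // Case 1: splits arrive before the first emitNext, so there is no main
            // output yet. Remember them until the output is initialized.
            outputPendingSplits.addAll(splitIds);
        } else {
            // Case 2: splits arrive after the first emitNext, so the main output
            // exists and the per-split outputs can be created right away.
            splitIds.forEach(this::setupInitialSplit);
        }
    }

    /** Stand-in for the initialization that happens on the first emitNext. */
    void initializeMainOutput() {
        mode = Mode.READING;
        outputPendingSplits.forEach(this::setupInitialSplit);
        // Assumption: pending splits are dropped once their outputs exist.
        outputPendingSplits.clear();
    }

    /** Stand-in for currentMainOutput.createOutputForSplit(splitId). */
    private void setupInitialSplit(String splitId) {
        createdOutputs.add(splitId);
    }

    Set<String> createdOutputs() {
        return createdOutputs;
    }

    int pendingCount() {
        return outputPendingSplits.size();
    }

    public static void main(String[] args) {
        SplitOutputSketch op = new SplitOutputSketch();
        op.addSplits(List.of("split-0", "split-1")); // case 1: buffered
        op.initializeMainOutput();                   // first emitNext
        op.addSplits(List.of("split-2"));            // case 2: created directly
        System.out.println(op.createdOutputs());
    }
}
```

A test for each case would then assert that no split output exists while the splits are only pending, and that every split has an output once both paths have run.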
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
##########
@@ -423,6 +422,16 @@ private DataInputStatus emitNextNotReading(DataOutput<OUT> output) throws Except
}
}
+ private void initializeMainOutput(DataOutput<OUT> output) {
+ currentMainOutput = eventTimeLogic.createMainOutput(output, this::onWatermarkEmitted);
+ initializeLatencyMarkerEmitter(output);
+ lastInvokedOutput = output;
+ outputPendingSplits.forEach(
+ split -> currentMainOutput.createOutputForSplit(split.splitId()));
Review comment:
Regarding your concern: we should update the javadoc of SplitReader to say
that you always have to release.