AHeise commented on a change in pull request #10435: [FLINK-13955][runtime]
migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370050076
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -96,332 +222,274 @@ public void initializeState(StateInitializationContext
context) throws Exception
checkpointedState =
context.getOperatorStateStore().getSerializableListState("splits");
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
- if (context.isRestored()) {
- LOG.info("Restoring state for the {} (taskIdx={}).",
getClass().getSimpleName(), subtaskIdx);
+ if (!context.isRestored()) {
+ LOG.info("No state to restore for the {}
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+ return;
+ }
+ if (splits != null) {
// this may not be null in case we migrate from a
previous Flink version.
- if (restoredReaderState == null) {
- restoredReaderState = new ArrayList<>();
- for (TimestampedFileInputSplit split :
checkpointedState.get()) {
- restoredReaderState.add(split);
- }
+ return;
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} (taskIdx={}) restored
{}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
- }
- }
- } else {
- LOG.info("No state to restore for the {}
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+ LOG.info("Restoring state for the {} (taskIdx={}).",
getClass().getSimpleName(), subtaskIdx);
+
+ splits = new PriorityQueue<>();
+ for (TimestampedFileInputSplit split : checkpointedState.get())
{
+ splits.add(split);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} (taskIdx={}) restored {}.",
getClass().getSimpleName(), subtaskIdx, splits);
}
}
@Override
public void open() throws Exception {
super.open();
- checkState(this.reader == null, "The reader is already
initialized.");
checkState(this.serializer != null, "The serializer has not
been set. " +
"Probably the setOutputType() was not called. Please
report it.");
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
- this.checkpointLock = getContainingTask().getCheckpointLock();
-
- // set the reader context based on the time characteristic
- final TimeCharacteristic timeCharacteristic =
getOperatorConfig().getTimeCharacteristic();
- final long watermarkInterval =
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
- this.readerContext = StreamSourceContexts.getSourceContext(
- timeCharacteristic,
- getProcessingTimeService(),
- checkpointLock,
- getContainingTask().getStreamStatusMaintainer(),
- output,
- watermarkInterval,
- -1);
-
- // and initialize the split reading thread
- this.reader = new SplitReader<>(format, serializer,
readerContext, checkpointLock, restoredReaderState);
- this.restoredReaderState = null;
- this.reader.start();
+
+ this.sourceContext = StreamSourceContexts.getSourceContext(
+ getOperatorConfig().getTimeCharacteristic(),
+ getProcessingTimeService(),
+ new Object(), // no actual locking needed
+ getContainingTask().getStreamStatusMaintainer(),
+ output,
+
getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+ -1);
+
+ this.reusedRecord = serializer.createInstance();
+ this.completedSplitsCounter =
getMetricGroup().counter("numSplitsProcessed");
+ this.executor =
getContainingTask().getMailboxExecutorFactory().createExecutor(getOperatorConfig().getChainIndex());
Review comment:
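Ideally, an operator should only obtain its mailbox executor through
YieldingOperatorFactory, rather than creating one directly from the
containing task's MailboxExecutorFactory as done here.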
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services