pnowojski commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r372939346
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -67,361 +89,422 @@
private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
- private FileInputFormat<OUT> format;
- private TypeSerializer<OUT> serializer;
+ private enum ReaderState {
+ IDLE {
+ @Override
+ public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ throw new IllegalStateException("not processing any records in IDLE state");
+ }
+ },
+ /**
+ * A message is enqueued to process split, but no split is opened.
+ */
+ OPENING { // the split was added and message to itself was enqueued to process it
+ @Override
+ public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+ if (op.splits.isEmpty()) {
+ op.switchState(ReaderState.IDLE);
+ return false;
+ } else {
+ op.loadSplit(op.splits.poll());
+ op.switchState(ReaderState.READING);
+ return true;
+ }
+ }
+ },
+ /**
+ * A message is enqueued to process split and its processing was started.
+ */
+ READING {
+ @Override
+ public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ return true;
+ }
+
+ @Override
+ public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ op.switchState(ReaderState.IDLE);
+ }
+ },
+ /**
+ * {@link #close()} was called but unprocessed data (records and splits) remains and needs to be processed.
+ * {@link #close()} caller is blocked.
+ */
+ CLOSING {
+ @Override
+ public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+ if (op.currentSplit == null && !op.splits.isEmpty()) {
+ op.loadSplit(op.splits.poll());
+ }
+ return true;
+ }
+
+ @Override
+ public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ // need one more mail to unblock possible yield() in close() method (todo: wait with timeout in yield)
+ op.enqueueMail();
+ op.switchState(CLOSED);
+ }
+ },
+ CLOSED {
+ @Override
+ public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ LOG.warn("not processing any records while closed");
+ return false;
+ }
+ };
+
+ private static final Set<ReaderState> ACCEPT_SPLITS = EnumSet.of(IDLE, OPENING, READING);
+ /**
+ * Possible transition FROM each state.
+ */
+ private static final Map<ReaderState, Set<ReaderState>> TRANSITIONS;
+ static {
+ Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
+ tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED));
+ tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING));
+ tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING));
+ tmpTransitions.put(CLOSING, EnumSet.of(CLOSED));
+ tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+ TRANSITIONS = new EnumMap<>(tmpTransitions);
+ }
- private transient Object checkpointLock;
+ public boolean isAcceptingSplits() {
+ return ACCEPT_SPLITS.contains(this);
+ }
- private transient SplitReader<OUT> reader;
- private transient SourceFunction.SourceContext<OUT> readerContext;
+ public final boolean isTerminal() {
+ return this == CLOSED;
+ }
+ public boolean canSwitchTo(ReaderState next) {
+ return TRANSITIONS
+ .getOrDefault(this, EnumSet.noneOf(ReaderState.class))
+ .contains(next);
+ }
+
+ /**
+ * Prepare to process new record OR split.
+ * @return true if should read the record
+ */
+ public abstract boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException;
+
+ public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ }
+ }
+
+ private transient FileInputFormat<OUT> format;
+ private TypeSerializer<OUT> serializer;
+ private transient MailboxExecutor executor;
+ private transient OUT reusedRecord;
+ private transient SourceFunction.SourceContext<OUT> sourceContext;
private transient ListState<TimestampedFileInputSplit> checkpointedState;
- private transient List<TimestampedFileInputSplit> restoredReaderState;
+ /**
+ * MUST only be changed via {@link #switchState(ReaderState) switchState}.
+ */
+ private transient ReaderState state = ReaderState.IDLE;
+ private transient PriorityQueue<TimestampedFileInputSplit> splits;
+ private transient TimestampedFileInputSplit currentSplit; // can't work just on queue tail because it can change because it's PQ
+ private transient Counter completedSplitsCounter;
+
+ private final transient RunnableWithException runnable = () -> {
+ try {
+ processRecord();
+ } catch (Exception e) {
+ switchState(ReaderState.CLOSED);
+ throw e;
+ }
+ };
+ @VisibleForTesting
public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
this.format = checkNotNull(format);
}
- @Override
- public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
- this.serializer = outTypeInfo.createSerializer(executionConfig);
+ public ContinuousFileReaderOperator(FileInputFormat<OUT> format, MailboxExecutor mailboxExecutor) {
+ this.format = checkNotNull(format);
+ this.executor = checkNotNull(mailboxExecutor);
}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
- checkState(checkpointedState == null, "The reader state has already been initialized.");
+ checkState(checkpointedState == null, "The reader state has already been initialized.");
checkpointedState = context.getOperatorStateStore().getSerializableListState("splits");
int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
- if (context.isRestored()) {
- LOG.info("Restoring state for the {} (taskIdx={}).",
getClass().getSimpleName(), subtaskIdx);
-
- // this may not be null in case we migrate from a previous Flink version.
- if (restoredReaderState == null) {
- restoredReaderState = new ArrayList<>();
- for (TimestampedFileInputSplit split : checkpointedState.get()) {
- restoredReaderState.add(split);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} (taskIdx={}) restored
{}.", getClass().getSimpleName(), subtaskIdx, restoredReaderState);
- }
- }
- } else {
+ if (!context.isRestored()) {
LOG.info("No state to restore for the {}
(taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
+ return;
+ }
+
+ LOG.info("Restoring state for the {} (taskIdx={}).",
getClass().getSimpleName(), subtaskIdx);
+
+ splits = splits == null ? new PriorityQueue<>() : splits;
+ for (TimestampedFileInputSplit split : checkpointedState.get()) {
+ splits.add(split);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{} (taskIdx={}) restored {}.",
getClass().getSimpleName(), subtaskIdx, splits);
}
}
@Override
public void open() throws Exception {
super.open();
- checkState(this.reader == null, "The reader is already initialized.");
checkState(this.serializer != null, "The serializer has not been set. " +
"Probably the setOutputType() was not called. Please report it.");
+ this.state = ReaderState.IDLE;
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
- this.checkpointLock = getContainingTask().getCheckpointLock();
-
- // set the reader context based on the time characteristic
- final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
- final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
- this.readerContext = StreamSourceContexts.getSourceContext(
- timeCharacteristic,
- getProcessingTimeService(),
- checkpointLock,
- getContainingTask().getStreamStatusMaintainer(),
- output,
- watermarkInterval,
- -1);
-
- // and initialize the split reading thread
- this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
- this.restoredReaderState = null;
- this.reader.start();
+
+ this.sourceContext = StreamSourceContexts.getSourceContext(
+ getOperatorConfig().getTimeCharacteristic(),
+ getProcessingTimeService(),
+ new Object(), // no actual locking needed
+ getContainingTask().getStreamStatusMaintainer(),
+ output,
+ getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+ -1);
+
+ this.reusedRecord = serializer.createInstance();
+ this.completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
+ this.executor = executor != null ? executor : getContainingTask().getMailboxExecutorFactory().createExecutor(getOperatorConfig().getChainIndex());
+ this.splits = this.splits == null ? new PriorityQueue<>() : this.splits;
+
+ if (!splits.isEmpty()) {
+ enqueueMail();
+ }
}
@Override
public void processElement(StreamRecord<TimestampedFileInputSplit> element) throws Exception {
- reader.addSplit(element.getValue());
+ Preconditions.checkState(state.isAcceptingSplits());
+ splits.offer(element.getValue());
+ if (state == ReaderState.IDLE) {
+ enqueueMail();
+ }
}
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- // we do nothing because we emit our own watermarks if needed.
+ private void enqueueMail() {
+ Preconditions.checkState(!state.isTerminal(), "can't enqueue mail in terminal state %s", state);
+ executor.execute(runnable, "ContinuousFileReaderOperator");
+ if (state == ReaderState.IDLE) {
+ switchState(ReaderState.OPENING);
+ }
}
- @Override
- public void dispose() throws Exception {
- super.dispose();
-
- // first try to cancel it properly and
- // give it some time until it finishes
- reader.cancel();
- try {
- reader.join(200);
- } catch (InterruptedException e) {
- // we can ignore this
+ private void processRecord() throws IOException {
+ if (!state.prepareToProcessRecord(this)) {
+ return;
}
- // if the above did not work, then interrupt the thread repeatedly
- while (reader.isAlive()) {
+ readAndCollectRecord();
- StringBuilder bld = new StringBuilder();
- StackTraceElement[] stack = reader.getStackTrace();
- for (StackTraceElement e : stack) {
- bld.append(e).append('\n');
- }
- LOG.warn("The reader is stuck in method:\n {}",
bld.toString());
-
- reader.interrupt();
- try {
- reader.join(50);
- } catch (InterruptedException e) {
- // we can ignore this
- }
+ if (format.reachedEnd()) {
+ onSplitProcessed();
+ } else {
+ enqueueMail();
}
- reader = null;
- readerContext = null;
- restoredReaderState = null;
- format = null;
- serializer = null;
}
- @Override
- public void close() throws Exception {
- super.close();
-
- waitSplitReaderFinished();
-
- output.close();
- }
+ private void onSplitProcessed() throws IOException {
+ completedSplitsCounter.inc();
+ LOG.debug("split {} processed: {}",
completedSplitsCounter.getCount(), currentSplit);
+ format.close();
+ currentSplit = null;
- private void waitSplitReaderFinished() throws InterruptedException {
- // make sure that we hold the checkpointing lock
- assert Thread.holdsLock(checkpointLock);
-
- // close the reader to signal that no more splits will come. By doing this,
- // the reader will exit as soon as it finishes processing the already pending splits.
- // This method will wait until then. Further cleaning up is handled by the dispose().
+ if (splits.isEmpty()) {
+ state.onNoMoreData(this);
+ return;
+ }
- while (reader != null && reader.isAlive() && reader.isRunning()) {
- reader.close();
- checkpointLock.wait();
+ if (state == ReaderState.READING) {
+ switchState(ReaderState.OPENING);
}
- // finally if we are operating on event or ingestion time,
- // emit the long-max watermark indicating the end of the stream,
- // like a normal source would do.
+ enqueueMail();
+ }
- if (readerContext != null) {
- readerContext.emitWatermark(Watermark.MAX_WATERMARK);
- readerContext.close();
- readerContext = null;
+ private void readAndCollectRecord() throws IOException {
+ Preconditions.checkState(state == ReaderState.READING || state == ReaderState.CLOSING, "can't process record in state %s", state);
+ if (format.reachedEnd()) {
+ return;
+ }
+ OUT out = format.nextRecord(this.reusedRecord);
+ if (out != null) {
+ sourceContext.collect(out);
}
}
- private class SplitReader<OT> extends Thread {
-
- private volatile boolean shouldClose;
-
- private volatile boolean isRunning;
-
- private final FileInputFormat<OT> format;
- private final TypeSerializer<OT> serializer;
-
- private final Object checkpointLock;
- private final SourceFunction.SourceContext<OT> readerContext;
-
- private final Queue<TimestampedFileInputSplit> pendingSplits;
-
- private TimestampedFileInputSplit currentSplit;
-
- private volatile boolean isSplitOpen;
-
- private SplitReader(FileInputFormat<OT> format,
- TypeSerializer<OT> serializer,
- SourceFunction.SourceContext<OT> readerContext,
- Object checkpointLock,
- List<TimestampedFileInputSplit> restoredState) {
+ private void loadSplit(TimestampedFileInputSplit split) throws IOException {
+ Preconditions.checkState(state != ReaderState.READING && state != ReaderState.CLOSED, "can't load split in state %s", state);
+ Preconditions.checkNotNull(split, "split is null");
+ LOG.debug("load split: {}", split);
+ currentSplit = split;
+ format.openInputFormat();
+ if (format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
+ // recovering after a node failure with an input
+ // format that supports resetting the offset
+ ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) format).
+ reopen(currentSplit, currentSplit.getSplitState());
+ } else {
+ // we either have a new split, or we recovered from a node
+ // failure but the input format does not support resetting the offset.
+ format.open(currentSplit);
+ }
- this.format = checkNotNull(format, "Unspecified FileInputFormat.");
- this.serializer = checkNotNull(serializer, "Unspecified Serializer.");
- this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context.");
- this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
+ // reset the restored state to null for the next iteration
+ currentSplit.resetSplitState();
+ }
- this.shouldClose = false;
- this.isRunning = true;
+ private void switchState(ReaderState newState) {
+ if (state != newState) {
+ Preconditions.checkState(state.canSwitchTo(newState), "can't switch state from terminal state %s to %s", state, newState);
+ LOG.debug("switch state: {} -> {}", state, newState);
+ state = newState;
+ }
+ }
- this.pendingSplits = new PriorityQueue<>();
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // we do nothing because we emit our own watermarks if needed.
+ }
- // this is the case where a task recovers from a previous failed attempt
- if (restoredState != null) {
- this.pendingSplits.addAll(restoredState);
+ @Override
+ public void dispose() throws Exception {
+ Exception e = null;
+ if (state != ReaderState.CLOSED) {
+ try {
+ cleanUp();
+ } catch (Exception ex) {
+ e = ExceptionUtils.firstOrSuppressed(ex, e);
}
}
-
- private void addSplit(TimestampedFileInputSplit split) {
- checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
- synchronized (checkpointLock) {
- this.pendingSplits.add(split);
- }
+ {
+ checkpointedState = null;
+ completedSplitsCounter = null;
+ currentSplit = null;
+ executor = null;
+ format = null;
+ sourceContext = null;
+ reusedRecord = null;
+ serializer = null;
+ splits = null;
}
-
- public boolean isRunning() {
- return this.isRunning;
+ try {
+ super.dispose();
+ } catch (Exception ex) {
+ e = ExceptionUtils.firstOrSuppressed(ex, e);
}
+ if (e != null) {
+ throw e;
+ }
+ }
- @Override
- public void run() {
- try {
+ @Override
+ public void close() throws Exception {
+ LOG.debug("closing");
+ super.close();
- Counter completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");
- this.format.openInputFormat();
-
- while (this.isRunning) {
-
- synchronized (checkpointLock) {
-
- if (currentSplit == null) {
- currentSplit = this.pendingSplits.poll();
-
- // if the list of pending splits is empty (currentSplit == null) then:
- // 1) if close() was called on the operator then exit the while loop
- // 2) if not wait 50 ms and try again to fetch a new split to read
-
- if (currentSplit == null) {
- if (this.shouldClose) {
- isRunning = false;
- } else {
- checkpointLock.wait(50);
- }
- continue;
- }
- }
-
- if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) {
- // recovering after a node failure with an input
- // format that supports resetting the offset
- ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).
- reopen(currentSplit, currentSplit.getSplitState());
- } else {
- // we either have a new split, or we recovered from a node
- // failure but the input format does not support resetting the offset.
- this.format.open(currentSplit);
- }
-
- // reset the restored state to null for the next iteration
- this.currentSplit.resetSplitState();
- this.isSplitOpen = true;
- }
-
- LOG.debug("Reading split: " +
currentSplit);
-
- try {
- OT nextElement = serializer.createInstance();
- while (!format.reachedEnd()) {
- synchronized (checkpointLock) {
- nextElement = format.nextRecord(nextElement);
- if (nextElement != null) {
- readerContext.collect(nextElement);
- } else {
- break;
- }
- }
- }
- completedSplitsCounter.inc();
-
- } finally {
- // close and prepare for the next iteration
- synchronized (checkpointLock) {
- this.format.close();
- this.isSplitOpen = false;
- this.currentSplit = null;
- }
- }
+ switch (state) {
+ case IDLE:
+ switchState(ReaderState.CLOSED);
+ break;
+ case CLOSED:
+ LOG.warn("operator is already closed, doing
nothing");
+ return;
+ default:
+ switchState(ReaderState.CLOSING);
+ while (!state.isTerminal()) {
+ executor.yield();
}
+ }
- } catch (Throwable e) {
-
- getContainingTask().handleAsyncException("Caught exception when processing split: " + currentSplit, e);
+ try {
+ sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+ } catch (Exception e) {
+ LOG.warn("unable to emit watermark while closing", e);
+ }
- } finally {
- synchronized (checkpointLock) {
- LOG.debug("Reader terminated, and
exiting...");
+ cleanUp();
+ }
- try {
- this.format.closeInputFormat();
- } catch (IOException e) {
- getContainingTask().handleAsyncException(
- "Caught exception from " + this.format.getClass().getName() + ".closeInputFormat() : " + e.getMessage(), e);
- }
- this.isSplitOpen = false;
- this.currentSplit = null;
- this.isRunning = false;
+ private void cleanUp() {
+ LOG.debug("cleanup, state={}", state);
- checkpointLock.notifyAll();
- }
- }
- }
+ RunnableWithException[] runClose = {
+ () -> sourceContext.close(),
+ () -> output.close(),
+ () -> format.close(),
+ () -> format.closeInputFormat()};
+ Exception ef = null;
- private List<TimestampedFileInputSplit> getReaderState() throws IOException {
- List<TimestampedFileInputSplit> snapshot = new ArrayList<>(this.pendingSplits.size());
- if (currentSplit != null) {
- if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) {
- Serializable formatState =
- ((CheckpointableInputFormat<TimestampedFileInputSplit, Serializable>) this.format).getCurrentState();
- this.currentSplit.setSplitState(formatState);
- }
- snapshot.add(this.currentSplit);
+ for (RunnableWithException r : runClose) {
+ try {
+ r.run();
+ } catch (Exception e) {
+ ef = ExceptionUtils.firstOrSuppressed(ef, e);
}
- snapshot.addAll(this.pendingSplits);
- return snapshot;
}
-
- public void cancel() {
- this.isRunning = false;
- }
-
- public void close() {
- this.shouldClose = true;
+ currentSplit = null;
+ if (ef != null) {
+ throw new FlinkRuntimeException("Unable to properly
cleanup ContinuousFileReaderOperator", ef);
}
}
- // --------------------- Checkpointing --------------------------
-
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
checkState(checkpointedState != null,
- "The operator state has not been properly
initialized.");
+ "The operator state has not been properly
initialized.");
Review comment:
nit:
1. incorrect formatting (also in a couple of lines below).
2. please try not to mix formatting (and refactoring) changes with functional changes.
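
As an illustration only (the exact checkstyle expectation is an assumption here, not spelled out in the comment above), the wrapped argument of the `checkState` call would typically keep a single extra indentation level rather than being re-aligned:

```java
// hypothetical formatting sketch; checkState is the statically imported
// org.apache.flink.util.Preconditions.checkState already used in this class
checkState(checkpointedState != null,
    "The operator state has not been properly initialized.");
```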
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services