AHeise commented on a change in pull request #10435: [FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r370060580
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -54,39 +62,157 @@
 * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction}
 * which has a parallelism of 1, this operator can have DOP > 1.
*
- * <p>As soon as a split descriptor is received, it is put in a queue, and have another
- * thread read the actual data of the split. This architecture allows the separation of the
- * reading thread from the one emitting the checkpoint barriers, thus removing any potential
- * back-pressure.
+ * <p>This implementation uses {@link MailboxExecutor} to execute each action and moves through the following states:<ol>
+ * <li>start in {@link ReaderState#IDLE IDLE}</li>
+ * <li>upon receiving a split, add it to the queue, switch to {@link ReaderState#OPENING OPENING} and enqueue a
+ * {@link org.apache.flink.streaming.runtime.tasks.mailbox.Mail mail} with self as the {@link Runnable}</li>
+ * <li>open the file, switch to {@link ReaderState#READING READING}, read one record, re-enqueue self</li>
+ * <li>if no more records or splits are available, switch back to {@link ReaderState#IDLE IDLE}</li>
+ * </ol>
+ * On close:
+ * <ol>
+ * <li>if {@link ReaderState#IDLE IDLE}, then close immediately</li>
+ * <li>otherwise switch to {@link ReaderState#CLOSING CLOSING} and call {@link MailboxExecutor#yield() yield} in a loop
+ * until the state is {@link ReaderState#CLOSED CLOSED}</li>
+ * <li>{@link MailboxExecutor#yield() yield()} causes the remaining records (and splits) to be processed in the same way as above</li>
+ * </ol></p>
+ * <p>Using {@link MailboxExecutor} makes it possible to avoid explicit synchronization. At most one mail should be enqueued at any
+ * given time.</p>
+ * <p>Using an FSM approach makes it possible to explicitly define the states and enforce the {@link ReaderState#TRANSITIONS transitions} between them.</p>
*/
@Internal
public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+	implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, RunnableWithException {
private static final long serialVersionUID = 1L;
	private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
- private FileInputFormat<OUT> format;
- private TypeSerializer<OUT> serializer;
+	private enum ReaderState {
+		IDLE {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				LOG.warn("not processing any records in IDLE state");
+				return false;
+			}
+		},
+		/**
+		 * A mail is enqueued to process a split, but no split is open yet.
+		 */
+		OPENING { // the split was added and a mail to self was enqueued to process it
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+				if (op.splits.isEmpty()) {
+					op.switchState(ReaderState.IDLE);
+					return false;
+				} else {
+					op.loadSplit(op.splits.poll());
+					op.switchState(ReaderState.READING);
+					return true;
+				}
+			}
+		},
+		/**
+		 * A mail is enqueued to process a split and its processing has started.
+		 */
+		READING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				return true;
+			}
- private transient Object checkpointLock;
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				op.switchState(ReaderState.IDLE);
+			}
+		},
+		/**
+		 * {@link #close()} was called but unprocessed data (records and splits) remains and needs to be processed.
+		 * The {@link #close()} caller is blocked.
+		 */
+		CLOSING {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+				if (op.currentSplit == null && !op.splits.isEmpty()) {
+					op.loadSplit(op.splits.poll());
+				}
+				return true;
+			}
- private transient SplitReader<OUT> reader;
- private transient SourceFunction.SourceContext<OUT> readerContext;
+			@Override
+			public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+				// need one more mail to unblock a possible yield() in the close() method (todo: wait with timeout in yield)
+				op.enqueueMail();
+				op.switchState(CLOSED);
+			}
+		},
+		CLOSED {
+			@Override
+			public boolean prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+				LOG.warn("not processing any records while closed");
+				return false;
+			}
+		};
+
+		private static final Set<ReaderState> ACCEPT_SPLITS = EnumSet.of(IDLE, OPENING, READING);
+		/**
+		 * Possible transitions FROM each state.
+		 */
+		private static final Map<ReaderState, Set<ReaderState>> TRANSITIONS;
+		static {
+			Map<ReaderState, Set<ReaderState>> tmpTransitions = new HashMap<>();
+			tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED));
+			tmpTransitions.put(OPENING, EnumSet.of(READING, CLOSING));
+			tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING, CLOSING));
+			tmpTransitions.put(CLOSING, EnumSet.of(CLOSED));
+			tmpTransitions.put(CLOSED, EnumSet.noneOf(ReaderState.class));
+			TRANSITIONS = new EnumMap<>(tmpTransitions);
+		}
+
+		public boolean isAcceptingSplits() {
+			return ACCEPT_SPLITS.contains(this);
+		}
+
+		public final boolean isTerminal() {
+			return this == CLOSED;
+		}
+
+		public boolean canSwitchTo(ReaderState next) {
+			return TRANSITIONS
+				.getOrDefault(this, EnumSet.noneOf(ReaderState.class))
+				.contains(next);
+		}
Review comment:
Could be replaced by passing that information to the constructors of the enum constants:
`ReaderState(boolean acceptingSplits, boolean isTerminal, ReaderState... allowedTransitions)`
Transitions may introduce cyclic dependencies, though, so I'm not 100% sure whether this part would actually work.
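For illustration, here is a minimal sketch of the constructor-based variant suggested above (this is not code from the PR; the field names `acceptingSplits`, `terminal`, and `transitionsSupplier` are my own, while the state names and the `canSwitchTo`/`isAcceptingSplits`/`isTerminal` methods mirror the diff). The cyclic-dependency concern is real: Java forbids referencing an enum's own constants from the constructor arguments of that enum, so `IDLE(true, false, OPENING, CLOSED)` would not compile. Deferring the transition sets behind a `Supplier` sidesteps the restriction:

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

public enum ReaderState {
	// A direct varargs form such as IDLE(true, false, OPENING, CLOSED) is rejected
	// by the compiler: enum constants cannot be referenced from constructor
	// arguments of their own enum while the constants are being initialized.
	// Wrapping the references in a lambda defers evaluation until after all
	// constants exist.
	IDLE(true, false, () -> EnumSet.of(OPENING, CLOSED)),
	OPENING(true, false, () -> EnumSet.of(READING, CLOSING)),
	READING(true, false, () -> EnumSet.of(IDLE, OPENING, CLOSING)),
	CLOSING(false, false, () -> EnumSet.of(CLOSED)),
	CLOSED(false, true, () -> EnumSet.noneOf(ReaderState.class));

	private final boolean acceptingSplits;
	private final boolean terminal;
	private final Supplier<Set<ReaderState>> transitionsSupplier;
	private Set<ReaderState> transitions; // resolved lazily on first use

	ReaderState(boolean acceptingSplits, boolean terminal, Supplier<Set<ReaderState>> transitionsSupplier) {
		this.acceptingSplits = acceptingSplits;
		this.terminal = terminal;
		this.transitionsSupplier = transitionsSupplier;
	}

	public boolean isAcceptingSplits() {
		return acceptingSplits;
	}

	public boolean isTerminal() {
		return terminal;
	}

	public boolean canSwitchTo(ReaderState next) {
		// The unsynchronized lazy resolution is safe under the mailbox model,
		// where all state transitions happen on a single thread.
		if (transitions == null) {
			transitions = transitionsSupplier.get();
		}
		return transitions.contains(next);
	}

	public static void main(String[] args) {
		if (!IDLE.canSwitchTo(OPENING) || CLOSED.canSwitchTo(IDLE) || !CLOSED.isTerminal()) {
			throw new AssertionError("unexpected transition table");
		}
		System.out.println("transition checks passed");
	}
}
```

Compared with the PR's `static { ... }` table, this keeps each state's flags and outgoing transitions in one place, at the cost of the `Supplier` indirection; the `EnumMap` static block in the PR avoids the forward-reference issue without that indirection.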
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services