rkhachatryan commented on a change in pull request #10435:
[FLINK-13955][runtime] migrate ContinuousFileReaderOperator to the mailbox
execution model
URL: https://github.com/apache/flink/pull/10435#discussion_r373036424
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
##########
@@ -67,361 +89,422 @@
private static final Logger LOG =
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
- private FileInputFormat<OUT> format;
- private TypeSerializer<OUT> serializer;
+ private enum ReaderState {
+ IDLE {
+ @Override
+ public boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ throw new IllegalStateException("not processing
any records in IDLE state");
+ }
+ },
+ /**
+ * A message is enqueued to process split, but no split is
opened.
+ */
+ OPENING { // the split was added and message to itself was
enqueued to process it
+ @Override
+ public boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+ if (op.splits.isEmpty()) {
+ op.switchState(ReaderState.IDLE);
+ return false;
+ } else {
+ op.loadSplit(op.splits.poll());
+ op.switchState(ReaderState.READING);
+ return true;
+ }
+ }
+ },
+ /**
+ * A message is enqueued to process split and its processing
was started.
+ */
+ READING {
+ @Override
+ public boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ return true;
+ }
+
+ @Override
+ public void
onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ op.switchState(ReaderState.IDLE);
+ }
+ },
+ /**
+ * {@link #close()} was called but unprocessed data (records
and splits) remains and needs to be processed.
+ * {@link #close()} caller is blocked.
+ */
+ CLOSING {
+ @Override
+ public boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException {
+ if (op.currentSplit == null &&
!op.splits.isEmpty()) {
+ op.loadSplit(op.splits.poll());
+ }
+ return true;
+ }
+
+ @Override
+ public void
onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ // need one more mail to unblock possible
yield() in close() method (todo: wait with timeout in yield)
+ op.enqueueMail();
+ op.switchState(CLOSED);
+ }
+ },
+ CLOSED {
+ @Override
+ public boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) {
+ LOG.warn("not processing any records while
closed");
+ return false;
+ }
+ };
+
+ private static final Set<ReaderState> ACCEPT_SPLITS =
EnumSet.of(IDLE, OPENING, READING);
+ /**
+ * Possible transition FROM each state.
+ */
+ private static final Map<ReaderState, Set<ReaderState>>
TRANSITIONS;
+ static {
+ Map<ReaderState, Set<ReaderState>> tmpTransitions = new
HashMap<>();
+ tmpTransitions.put(IDLE, EnumSet.of(OPENING, CLOSED));
+ tmpTransitions.put(OPENING, EnumSet.of(READING,
CLOSING));
+ tmpTransitions.put(READING, EnumSet.of(IDLE, OPENING,
CLOSING));
+ tmpTransitions.put(CLOSING, EnumSet.of(CLOSED));
+ tmpTransitions.put(CLOSED,
EnumSet.noneOf(ReaderState.class));
+ TRANSITIONS = new EnumMap<>(tmpTransitions);
+ }
- private transient Object checkpointLock;
+ public boolean isAcceptingSplits() {
+ return ACCEPT_SPLITS.contains(this);
+ }
- private transient SplitReader<OUT> reader;
- private transient SourceFunction.SourceContext<OUT> readerContext;
+ public final boolean isTerminal() {
+ return this == CLOSED;
+ }
+ public boolean canSwitchTo(ReaderState next) {
+ return TRANSITIONS
+ .getOrDefault(this,
EnumSet.noneOf(ReaderState.class))
+ .contains(next);
+ }
+
+ /**
+ * Prepare to process new record OR split.
+ * @return true if should read the record
+ */
+ public abstract boolean
prepareToProcessRecord(ContinuousFileReaderOperator<?> op) throws IOException;
+
+ public void onNoMoreData(ContinuousFileReaderOperator<?> op) {
+ }
+ }
+
+ private transient FileInputFormat<OUT> format;
+ private TypeSerializer<OUT> serializer;
+ private transient MailboxExecutor executor;
+ private transient OUT reusedRecord;
+ private transient SourceFunction.SourceContext<OUT> sourceContext;
private transient ListState<TimestampedFileInputSplit>
checkpointedState;
- private transient List<TimestampedFileInputSplit> restoredReaderState;
+ /**
+ * MUST only be changed via {@link #switchState(ReaderState)
switchState}.
+ */
+ private transient ReaderState state = ReaderState.IDLE;
+ private transient PriorityQueue<TimestampedFileInputSplit> splits;
+ private transient TimestampedFileInputSplit currentSplit; // can't work
just on queue tail because it can change because it's PQ
+ private transient Counter completedSplitsCounter;
Review comment:
`final`: the fields are nulled out in the `dispose()` method (as in the previous
version; not sure how important it is). Also, some of them can't be initialized
in the constructor, because they use the parent's (`AbstractStreamOperator`)
`transient` fields.
`transient`: you're right, but I think it makes sense to keep the `transient`
modifier consistent, at least within a hierarchy.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services