openinx commented on a change in pull request #1793:
URL: https://github.com/apache/iceberg/pull/1793#discussion_r553304000



##########
File path: 
flink/src/main/java/org/apache/iceberg/flink/source/StreamingReaderOperator.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.io.IOException;
+import java.util.Queue;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.MailboxExecutor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The operator that reads the {@link FlinkInputSplit splits} received from 
the preceding {@link
+ * StreamingMonitorFunction}. Contrary to the {@link StreamingMonitorFunction} 
which has a parallelism of 1,
+ * this operator can have DOP > 1.
+ *
+ * <p>As soon as a split descriptor is received, it is put in a queue, and the {@link MailboxExecutor}
+ * is used to read the actual data of the split. This architecture allows the separation of the
+ * split-reading thread from the one processing the checkpoint barriers, thus removing any potential
+ * back-pressure.
+ */
+public class StreamingReaderOperator extends AbstractStreamOperator<RowData>
+    implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingReaderOperator.class);
+  private final MailboxExecutorImpl executor;
+  private FlinkInputFormat format;
+
+  private transient SourceFunction.SourceContext<RowData> readerContext;
+
+  private transient ListState<FlinkInputSplit> checkpointState;
+  private transient Queue<FlinkInputSplit> splits;
+
+  private StreamingReaderOperator(
+      FlinkInputFormat format, ProcessingTimeService timeService, 
MailboxExecutor mailboxExecutor) {
+    this.format = Preconditions.checkNotNull(format, "The InputFormat should 
not be null.");
+    this.processingTimeService = timeService;
+    this.executor = (MailboxExecutorImpl) Preconditions.checkNotNull(
+        mailboxExecutor, "The mailboxExecutor should not be null.");
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
+
+    checkpointState = context.getOperatorStateStore().getListState(
+        new ListStateDescriptor<>("splits", new JavaSerializer<>()));
+
+    int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+    splits = Lists.newLinkedList();
+    if (context.isRestored()) {
+      LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIdx);
+
+      for (FlinkInputSplit split : checkpointState.get()) {
+        splits.add(split);
+      }
+    }
+
+    this.readerContext = StreamSourceContexts.getSourceContext(
+        getOperatorConfig().getTimeCharacteristic(),
+        getProcessingTimeService(),
+        new Object(), // no actual locking needed
+        getContainingTask().getStreamStatusMaintainer(),
+        output,
+        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval(),
+        -1);
+
+    if (!splits.isEmpty()) {
+      enqueueProcessSplits();
+    }
+  }
+
+  @Override
+  public void processElement(StreamRecord<FlinkInputSplit> element) {
+    splits.offer(element.getValue());
+    enqueueProcessSplits();
+  }
+
+  private void enqueueProcessSplits() {
+    executor.execute(this::processSplits, this.getClass().getSimpleName());
+  }
+
  // Reads queued splits on the mailbox thread. Drains splits while the executor has
  // no other pending mail; once other mail (e.g. a checkpoint) is waiting, it exits
  // the loop and re-enqueues itself so the mailbox can process that mail first.
  private void processSplits() throws IOException {
    do {
      FlinkInputSplit split = splits.poll();
      if (split == null) {
        // Queue drained — nothing to do until the next processElement() call.
        return;
      }

      LOG.debug("load split: {}", split);
      format.open(split);

      // Reuse the same RowData instance across nextRecord() calls for this split.
      RowData nextElement = null;
      while (!format.reachedEnd()) {
        nextElement = format.nextRecord(nextElement);
        if (nextElement != null) {
          readerContext.collect(nextElement);
        } else {
          // A null record signals the format produced nothing more for this split.
          break;
        }
      }

      // NOTE(review): close() is skipped if reachedEnd()/nextRecord() throws —
      // consider wrapping the read loop in try/finally. TODO confirm with author.
      format.close();
    } while (executor.isIdle());
    // Yield to other mail (checkpoints, timers) and reschedule the remaining splits.
    enqueueProcessSplits();

Review comment:
       That's a good question.   This method `processSplits` will emit all 
records from one `CombinedScanTask` downstream, then yield the current 
thread (exit the `while` loop) and re-enqueue `processSplits` in the 
`Mailbox` (which is similar to the producer-consumer model: the caller does the 
enqueue, and the single-threaded `executor` consumes from it).   
   
   We will always yield from the `executor` thread after it has processed a 
split and then re-enqueue, because we don't want to hold the __checkpointLock__ 
too long (step 1), so that the Flink runtime won't wait a long time to trigger a new 
checkpoint (step 2).   Otherwise, if we held the __checkpointLock__ and 
processed the splits one by one within the same `processSplits` call, the 
checkpoint would repeatedly time out.  In the extreme case, the Flink 
streaming job would never checkpoint successfully if the split data is huge.  
   
   (Note: for step 1, when processing splits in the `executor` thread we hold 
the __checkpointLock__, which means nobody can start a new checkpoint 
until the `executor` has finished processing the current split; for step 2, 
anyone who wants to trigger a new checkpoint must grab the 
__checkpointLock__ first.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to