pnowojski commented on a change in pull request #8826: [FLINK-12479][operators]
Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r299904841
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
##########
@@ -18,213 +18,17 @@
package org.apache.flink.streaming.runtime.io;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
-import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
-import org.apache.flink.streaming.runtime.tasks.OperatorChain;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.runtime.io.AvailabilityListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
+import java.io.Closeable;
/**
- * Input reader for {@link
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * <p>This internally uses a {@link StatusWatermarkValve} to keep track of
{@link Watermark} and
- * {@link StreamStatus} events, and forwards them to event subscribers once the
- * {@link StatusWatermarkValve} determines the {@link Watermark} from all
inputs has advanced, or
- * that a {@link StreamStatus} needs to be propagated downstream to denote a
status change.
- *
- * <p>Forwarding elements, watermarks, or status status elements must be
protected by synchronizing
- * on the given lock object. This ensures that we don't call methods on a
- * {@link OneInputStreamOperator} concurrently with the timer callback or
other things.
- *
- * @param <IN> The type of the record that can be read with this record reader.
+ * Interface for processing records by StreamTask.
*/
-@Internal
-public class StreamInputProcessor<IN> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(StreamInputProcessor.class);
-
- private final StreamTaskInput input;
-
- private final Object lock;
-
- private final OperatorChain<?, ?> operatorChain;
-
- // ---------------- Status and Watermark Valve ------------------
-
- /** Valve that controls how watermarks and stream statuses are
forwarded. */
- private StatusWatermarkValve statusWatermarkValve;
-
- private final StreamStatusMaintainer streamStatusMaintainer;
-
- private final OneInputStreamOperator<IN, ?> streamOperator;
-
- // ---------------- Metrics ------------------
-
- private final WatermarkGauge watermarkGauge;
- private Counter numRecordsIn;
-
- @SuppressWarnings("unchecked")
- public StreamInputProcessor(
- InputGate[] inputGates,
- TypeSerializer<IN> inputSerializer,
- StreamTask<?, ?> checkpointedTask,
- CheckpointingMode checkpointMode,
- Object lock,
- IOManager ioManager,
- Configuration taskManagerConfig,
- StreamStatusMaintainer streamStatusMaintainer,
- OneInputStreamOperator<IN, ?> streamOperator,
- TaskIOMetricGroup metrics,
- WatermarkGauge watermarkGauge,
- String taskName,
- OperatorChain<?, ?> operatorChain) throws IOException {
-
- InputGate inputGate = InputGateUtil.createInputGate(inputGates);
-
- CheckpointBarrierHandler barrierHandler =
InputProcessorUtil.createCheckpointBarrierHandler(
- checkpointedTask,
- checkpointMode,
- ioManager,
- inputGate,
- taskManagerConfig,
- taskName);
- this.input = new StreamTaskNetworkInput(barrierHandler,
inputSerializer, ioManager, 0);
-
- this.lock = checkNotNull(lock);
-
- this.streamStatusMaintainer =
checkNotNull(streamStatusMaintainer);
- this.streamOperator = checkNotNull(streamOperator);
-
- this.statusWatermarkValve = new StatusWatermarkValve(
- inputGate.getNumberOfInputChannels(),
- new ForwardingValveOutputHandler(streamOperator, lock));
-
- this.watermarkGauge = watermarkGauge;
- metrics.gauge("checkpointAlignmentTime",
barrierHandler::getAlignmentDurationNanos);
-
- this.operatorChain = checkNotNull(operatorChain);
- }
-
- public boolean processInput() throws Exception {
- initializeNumRecordsIn();
-
- StreamElement recordOrMark = input.pollNextNullable();
- if (recordOrMark == null) {
- input.isAvailable().get();
- return !checkFinished();
- }
- int channel = input.getLastChannel();
- checkState(channel != StreamTaskInput.UNSPECIFIED);
-
- processElement(recordOrMark, channel);
- return true;
- }
-
- private void processElement(StreamElement recordOrMark, int channel)
throws Exception {
- if (recordOrMark.isRecord()) {
- // now we can do the actual processing
- StreamRecord<IN> record = recordOrMark.asRecord();
- synchronized (lock) {
- numRecordsIn.inc();
- streamOperator.setKeyContextElement1(record);
- streamOperator.processElement(record);
- }
- }
- else if (recordOrMark.isWatermark()) {
- // handle watermark
-
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel);
- } else if (recordOrMark.isStreamStatus()) {
- // handle stream status
-
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
- } else if (recordOrMark.isLatencyMarker()) {
- // handle latency marker
- synchronized (lock) {
-
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
- }
- } else {
- throw new UnsupportedOperationException("Unknown type
of StreamElement");
- }
- }
-
- private void initializeNumRecordsIn() {
- if (numRecordsIn == null) {
- try {
- numRecordsIn = ((OperatorMetricGroup)
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
- } catch (Exception e) {
- LOG.warn("An exception occurred during the
metrics setup.", e);
- numRecordsIn = new SimpleCounter();
- }
- }
- }
-
- private boolean checkFinished() throws Exception {
- boolean isFinished = input.isFinished();
- if (isFinished) {
- synchronized (lock) {
- operatorChain.endInput(1);
- }
- }
- return isFinished;
- }
-
- public void cleanup() throws Exception {
- input.close();
- }
-
- private class ForwardingValveOutputHandler implements
StatusWatermarkValve.ValveOutputHandler {
- private final OneInputStreamOperator<IN, ?> operator;
- private final Object lock;
-
- private ForwardingValveOutputHandler(final
OneInputStreamOperator<IN, ?> operator, final Object lock) {
- this.operator = checkNotNull(operator);
- this.lock = checkNotNull(lock);
- }
-
- @Override
- public void handleWatermark(Watermark watermark) {
- try {
- synchronized (lock) {
-
watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
- operator.processWatermark(watermark);
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception occurred
while processing valve output watermark: ", e);
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void handleStreamStatus(StreamStatus streamStatus) {
- try {
- synchronized (lock) {
-
streamStatusMaintainer.toggleStreamStatus(streamStatus);
- }
- } catch (Exception e) {
- throw new RuntimeException("Exception occurred
while processing valve output stream status: ", e);
- }
- }
- }
+public interface StreamInputProcessor extends AvailabilityListener, Closeable {
+ /**
+ * @return false if probably there are not more records available and caller should check
+ * {@link #isAvailable()}. return true if {@link #isAvailable()} check can be safely omitted.
Review comment:
I've rephrased your rephrased version to:
```
/**
* @return true if {@link StreamInputProcessor} estimates that more records can be processed
* immediately. Otherwise false, which means that there are no more records available at the
* moment and the caller should check {@link #isFinished()} and/or {@link #isAvailable()}.
*/
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services