[
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625196#comment-14625196
]
ASF GitHub Bot commented on FLINK-1967:
---------------------------------------
Github user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/906#discussion_r34501525
--- Diff:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
+import
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.plugable.DeserializationDelegate;
+import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Input reader for {@link
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * This also keeps track of {@link Watermark} events and forwards them to
event subscribers
+ * once the {@link Watermark} from all inputs advances.
+ *
+ * @param <IN> The type of the record that can be read with this record
reader.
+ */
+public class StreamInputProcessor<IN> extends AbstractReader implements
ReaderBase, StreamingReader {
+
+ @SuppressWarnings("unused")
+ private static final Logger LOG =
LoggerFactory.getLogger(StreamInputProcessor.class);
+
+ private final RecordDeserializer<DeserializationDelegate>[]
recordDeserializers;
+
+ private RecordDeserializer<DeserializationDelegate>
currentRecordDeserializer;
+
+ // We need to keep track of the channel from which a buffer came, so
that we can
+ // appropriately map the watermarks to input channels
+ int currentChannel = -1;
+
+ private boolean isFinished;
+
+ private final BarrierBuffer barrierBuffer;
+
+ private long[] watermarks;
+ private long lastEmittedWatermark;
+
+ private DeserializationDelegate deserializationDelegate;
+
+ @SuppressWarnings("unchecked")
+ public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN>
inputSerializer) {
+ super(InputGateUtil.createInputGate(inputGates));
+
+ barrierBuffer = new BarrierBuffer(inputGate, this);
+
+ StreamRecordSerializer<IN> inputRecordSerializer = new
StreamRecordSerializer<IN>(inputSerializer);
+ this.deserializationDelegate = new
NonReusingDeserializationDelegate(inputRecordSerializer);
+
+ // Initialize one deserializer per input channel
+ this.recordDeserializers = new
SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
+ for (int i = 0; i < recordDeserializers.length; i++) {
+ recordDeserializers[i] = new
SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
+ }
+
+ watermarks = new long[inputGate.getNumberOfInputChannels()];
+ for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
+ watermarks[i] = Long.MIN_VALUE;
+ }
+ lastEmittedWatermark = Long.MIN_VALUE;
+ }
+
+ @SuppressWarnings("unchecked")
+ public boolean processInput(OneInputStreamOperator<IN, ?>
streamOperator) throws Exception {
+ if (isFinished) {
+ return false;
+ }
+
+ while (true) {
+ if (currentRecordDeserializer != null) {
+ DeserializationResult result =
currentRecordDeserializer.getNextRecord(deserializationDelegate);
+
+ if (result.isBufferConsumed()) {
+
currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer = null;
+ }
+
+ if (result.isFullRecord()) {
+ Object recordOrWatermark =
deserializationDelegate.getInstance();
+
+ if (recordOrWatermark instanceof
Watermark) {
+ Watermark mark = (Watermark)
recordOrWatermark;
+ long watermarkMillis =
mark.getTimestamp();
+ if (watermarkMillis >
watermarks[currentChannel]) {
+
watermarks[currentChannel] = watermarkMillis;
+ long newMinWatermark =
Long.MAX_VALUE;
+ for (long watermark :
watermarks) {
--- End diff --
I guess this won't work if we have an iteration, as watermarks will get
stuck at the feedback point :/
> Introduce (Event)time in Streaming
> ----------------------------------
>
> Key: FLINK-1967
> URL: https://issues.apache.org/jira/browse/FLINK-1967
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming record and a change in the
> sources to add timestamps to records. This will also introduce punctuations
> (or low watermarks) to allow windows to work correctly on unordered,
> timestamped input data. In the process of this, the windowing subsystem also
> needs to be adapted to use the punctuations. Furthermore, all operators need
> to be made aware of punctuations and correctly forward them. Then, a new
> operator must be introduced to allow modification of timestamps.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)