[ https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625196#comment-14625196 ]

ASF GitHub Bot commented on FLINK-1967:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/906#discussion_r34501525
  
    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.runtime.io;
    +
    +import java.io.IOException;
    +
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.runtime.event.task.AbstractEvent;
    +import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
    +import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
    +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
    +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
    +import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
    +import org.apache.flink.runtime.io.network.buffer.Buffer;
    +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
    +import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
    +import org.apache.flink.runtime.plugable.DeserializationDelegate;
    +import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.watermark.Watermark;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
    +import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
    +import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
    + *
    + * <p>
    + * This also keeps track of {@link Watermark} events and forwards them to event subscribers
    + * once the {@link Watermark} from all inputs advances.
    + *
    + * @param <IN> The type of the record that can be read with this record reader.
    + */
    +public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBase, StreamingReader {
    +
    +   @SuppressWarnings("unused")
    +   private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
    +
    +   private final RecordDeserializer<DeserializationDelegate>[] recordDeserializers;
    +
    +   private RecordDeserializer<DeserializationDelegate> currentRecordDeserializer;
    +
    +   // We need to keep track of the channel from which a buffer came, so that we can
    +   // appropriately map the watermarks to input channels
    +   int currentChannel = -1;
    +
    +   private boolean isFinished;
    +
    +   private final BarrierBuffer barrierBuffer;
    +
    +   private long[] watermarks;
    +   private long lastEmittedWatermark;
    +
    +   private DeserializationDelegate deserializationDelegate;
    +
    +   @SuppressWarnings("unchecked")
    +   public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer) {
    +           super(InputGateUtil.createInputGate(inputGates));
    +
    +           barrierBuffer = new BarrierBuffer(inputGate, this);
    +
    +           StreamRecordSerializer<IN> inputRecordSerializer = new StreamRecordSerializer<IN>(inputSerializer);
    +           this.deserializationDelegate = new NonReusingDeserializationDelegate(inputRecordSerializer);
    +
    +           // Initialize one deserializer per input channel
    +           this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
    +           for (int i = 0; i < recordDeserializers.length; i++) {
    +                   recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate>();
    +           }
    +
    +           watermarks = new long[inputGate.getNumberOfInputChannels()];
    +           for (int i = 0; i < inputGate.getNumberOfInputChannels(); i++) {
    +                   watermarks[i] = Long.MIN_VALUE;
    +           }
    +           lastEmittedWatermark = Long.MIN_VALUE;
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator) throws Exception {
    +           if (isFinished) {
    +                   return false;
    +           }
    +
    +           while (true) {
    +                   if (currentRecordDeserializer != null) {
    +                           DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
    +
    +                           if (result.isBufferConsumed()) {
    +                                   currentRecordDeserializer.getCurrentBuffer().recycle();
    +                                   currentRecordDeserializer = null;
    +                           }
    +
    +                           if (result.isFullRecord()) {
    +                                   Object recordOrWatermark = deserializationDelegate.getInstance();
    +
    +                                   if (recordOrWatermark instanceof Watermark) {
    +                                           Watermark mark = (Watermark) recordOrWatermark;
    +                                           long watermarkMillis = mark.getTimestamp();
    +                                           if (watermarkMillis > watermarks[currentChannel]) {
    +                                                   watermarks[currentChannel] = watermarkMillis;
    +                                                   long newMinWatermark = Long.MAX_VALUE;
    +                                                   for (long watermark : watermarks) {
    --- End diff --
    
    I guess this won't work if we have an iteration, as watermarks will get stuck at the feedback point :/
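    
    To make the concern concrete, here is a minimal, self-contained sketch (not the actual Flink code; all names are illustrative) of the min-over-all-channels logic from the diff: if one input channel is an iteration feedback edge whose watermark never advances, the minimum stays at Long.MIN_VALUE and no watermark is ever emitted downstream.
    
    // Hypothetical stand-alone illustration, not Flink code.
    public class MinWatermarkSketch {
    
        // One slot per input channel; channel 1 models a feedback edge.
        static long[] watermarks = { Long.MIN_VALUE, Long.MIN_VALUE };
        static long lastEmittedWatermark = Long.MIN_VALUE;
    
        // Mirrors the per-channel update above: remember the max per channel,
        // then emit the min over all channels once it advances.
        static void onWatermark(int channel, long watermarkMillis) {
            if (watermarkMillis > watermarks[channel]) {
                watermarks[channel] = watermarkMillis;
                long newMinWatermark = Long.MAX_VALUE;
                for (long watermark : watermarks) {
                    newMinWatermark = Math.min(newMinWatermark, watermark);
                }
                if (newMinWatermark > lastEmittedWatermark) {
                    lastEmittedWatermark = newMinWatermark;
                    System.out.println("emit watermark " + lastEmittedWatermark);
                }
            }
        }
    
        public static void main(String[] args) {
            // Channel 0 advances, but channel 1 (the feedback edge) never does:
            // nothing is ever printed, i.e. the emitted watermark is stuck.
            onWatermark(0, 1000L);
            onWatermark(0, 2000L);
            onWatermark(0, 3000L);
        }
    }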


> Introduce (Event)time in Streaming
> ----------------------------------
>
>                 Key: FLINK-1967
>                 URL: https://issues.apache.org/jira/browse/FLINK-1967
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming records and a change in the
> sources to add timestamps to records. This will also introduce punctuations
> (or low watermarks) to allow windows to work correctly on unordered,
> timestamped input data. In the process of this, the windowing subsystem also
> needs to be adapted to use the punctuations. Furthermore, all operators need
> to be made aware of punctuations and correctly forward them. Then, a new
> operator must be introduced to allow modification of timestamps.
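
As a rough sketch of the idea described above (illustrative only, not Flink's actual API; all names are made up): each record carries an event timestamp assigned at the source, and a punctuation (low watermark) with timestamp t announces that no records with timestamp <= t are still expected, so an event-time window can fire correctly even when its input arrives out of order.

// Illustrative sketch only; class and method names are hypothetical, not Flink's API.
import java.util.ArrayList;
import java.util.List;

public class EventTimeWindowSketch {

    // A record paired with the event timestamp assigned at the source.
    static class TimestampedRecord {
        final String value;
        final long timestamp;
        TimestampedRecord(String value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // A tumbling event-time window [start, end) that buffers records and fires
    // once a punctuation (low watermark) at or beyond its end has been seen.
    static class TumblingWindow {
        final long start;
        final long end;
        final List<TimestampedRecord> buffer = new ArrayList<TimestampedRecord>();

        TumblingWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        void onRecord(TimestampedRecord record) {
            if (record.timestamp >= start && record.timestamp < end) {
                buffer.add(record);
            }
        }

        void onWatermark(long watermark) {
            if (watermark >= end) {
                System.out.println("window [" + start + ", " + end + ") fires with "
                        + buffer.size() + " records");
            }
        }
    }

    public static void main(String[] args) {
        TumblingWindow window = new TumblingWindow(0, 10);
        // Out-of-order records are handled correctly because the window only
        // fires once the watermark guarantees no earlier records will arrive.
        window.onRecord(new TimestampedRecord("a", 3));
        window.onRecord(new TimestampedRecord("b", 1));
        window.onWatermark(5);   // does not fire yet
        window.onRecord(new TimestampedRecord("c", 7));
        window.onWatermark(10);  // fires with 3 records
    }
}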



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
