Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107969496
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 ---
    @@ -17,100 +17,164 @@
     
     package org.apache.flink.streaming.runtime.tasks;
     
    -import java.util.concurrent.ArrayBlockingQueue;
    -import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -
     import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
     import org.apache.flink.streaming.api.watermark.Watermark;
    -import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
    +import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.types.Either;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * TODO write javadoc
    + * <p>
    + * - open a list state per snapshot process
    + * - book-keep snapshot logs
    + * - Clean up state when a savepoint is complete - ONLY in-transit records 
who do NOT belong in other snapshots
    + *
    + * @param <IN>
    + */
     @Internal
    -public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> 
{
    +public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamIterationHead.class);
     
        private volatile boolean running = true;
     
    -   // 
------------------------------------------------------------------------
    -   
    +   private volatile RecordWriterOutput<IN>[] outputs;
    +
    +   private UpstreamLogger<IN> upstreamLogger;
    +
    +   private Object lock;
    +
    +   @Override
    +   public void init() throws Exception {
    +           this.lock = getCheckpointLock();
    +           getConfiguration().setStreamOperator(new 
UpstreamLogger(getConfiguration()));
    +           operatorChain = new OperatorChain<>(this);
    +           this.upstreamLogger = (UpstreamLogger<IN>) 
operatorChain.getHeadOperator();
    +   }
    +
        @Override
        protected void run() throws Exception {
    -           
    +
                final String iterationId = getConfiguration().getIterationId();
                if (iterationId == null || iterationId.length() == 0) {
                        throw new Exception("Missing iteration ID in the task 
configuration");
                }
    -           
    -           final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId ,
    -                           
getEnvironment().getTaskInfo().getIndexOfThisSubtask());
    -           
    +           final String brokerID = 
createBrokerIdString(getEnvironment().getJobID(), iterationId,
    +                   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
                final long iterationWaitTime = 
getConfiguration().getIterationWaitTime();
                final boolean shouldWait = iterationWaitTime > 0;
     
    -           final BlockingQueue<StreamRecord<OUT>> dataChannel = new 
ArrayBlockingQueue<StreamRecord<OUT>>(1);
    +           final BlockingQueue<Either<StreamRecord<IN>, 
CheckpointBarrier>> dataChannel
    +                   = new ArrayBlockingQueue<>(1);
     
                // offer the queue for the tail
                BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
                LOG.info("Iteration head {} added feedback queue under {}", 
getName(), brokerID);
     
                // do the work 
                try {
    -                   @SuppressWarnings("unchecked")
    -                   RecordWriterOutput<OUT>[] outputs = 
(RecordWriterOutput<OUT>[]) getStreamOutputs();
    +                   outputs = (RecordWriterOutput<IN>[]) getStreamOutputs();
     
                        // If timestamps are enabled we make sure to remove 
cyclic watermark dependencies
                        if (isSerializingTimestamps()) {
    -                           for (RecordWriterOutput<OUT> output : outputs) {
    +                           for (RecordWriterOutput<IN> output : outputs) {
                                        output.emitWatermark(new 
Watermark(Long.MAX_VALUE));
                                }
                        }
     
    +                   synchronized (lock) {
    +                           //emit in-flight events in the upstream log 
upon recovery
    +                           for (StreamRecord<IN> rec : 
upstreamLogger.getReplayLog()) {
    --- End diff --
    
    Maybe it would make sense to put this in a while loop that also checks `running`, so that the replay can stop early if the job is cancelled while it is still in progress.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to