[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940878#comment-15940878 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107969496
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
    @@ -17,100 +17,164 @@
     
     package org.apache.flink.streaming.runtime.tasks;
     
    -import java.util.concurrent.ArrayBlockingQueue;
    -import java.util.concurrent.BlockingQueue;
    -import java.util.concurrent.TimeUnit;
    -
     import org.apache.flink.annotation.Internal;
     import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
    +import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
    +import org.apache.flink.runtime.checkpoint.CheckpointOptions;
    +import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
    +import org.apache.flink.streaming.api.graph.StreamConfig;
    +import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    +import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    +import org.apache.flink.streaming.api.operators.Output;
     import org.apache.flink.streaming.api.watermark.Watermark;
    -import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
    +import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
     import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.types.Either;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.LinkedList;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * TODO write javadoc
    + * <p>
    + * - open a list state per snapshot process
    + * - book-keep snapshot logs
    + * - Clean up state when a savepoint is complete - ONLY in-transit records that do NOT belong to other snapshots
    + *
    + * @param <IN>
    + */
     @Internal
    -public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
    +public class StreamIterationHead<IN> extends OneInputStreamTask<IN, IN> {
     
        private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
     
        private volatile boolean running = true;
     
    -   // ------------------------------------------------------------------------
    -   
    +   private volatile RecordWriterOutput<IN>[] outputs;
    +
    +   private UpstreamLogger<IN> upstreamLogger;
    +
    +   private Object lock;
    +
    +   @Override
    +   public void init() throws Exception {
    +           this.lock = getCheckpointLock();
    +           getConfiguration().setStreamOperator(new UpstreamLogger(getConfiguration()));
    +           operatorChain = new OperatorChain<>(this);
    +           this.upstreamLogger = (UpstreamLogger<IN>) operatorChain.getHeadOperator();
    +   }
    +
        @Override
        protected void run() throws Exception {
    -           
    +
                final String iterationId = getConfiguration().getIterationId();
                if (iterationId == null || iterationId.length() == 0) {
                        throw new Exception("Missing iteration ID in the task configuration");
                }
    -           
    -           final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId ,
    -                           getEnvironment().getTaskInfo().getIndexOfThisSubtask());
    -           
    +           final String brokerID = createBrokerIdString(getEnvironment().getJobID(), iterationId,
    +                   getEnvironment().getTaskInfo().getIndexOfThisSubtask());
            final long iterationWaitTime = getConfiguration().getIterationWaitTime();
                final boolean shouldWait = iterationWaitTime > 0;
     
    -           final BlockingQueue<StreamRecord<OUT>> dataChannel = new ArrayBlockingQueue<StreamRecord<OUT>>(1);
    +           final BlockingQueue<Either<StreamRecord<IN>, CheckpointBarrier>> dataChannel
    +                   = new ArrayBlockingQueue<>(1);
     
                // offer the queue for the tail
                BlockingQueueBroker.INSTANCE.handIn(brokerID, dataChannel);
            LOG.info("Iteration head {} added feedback queue under {}", getName(), brokerID);
     
                // do the work 
                try {
    -                   @SuppressWarnings("unchecked")
    -                   RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
    +                   outputs = (RecordWriterOutput<IN>[]) getStreamOutputs();
     
                        // If timestamps are enabled we make sure to remove cyclic watermark dependencies
                        if (isSerializingTimestamps()) {
    -                           for (RecordWriterOutput<OUT> output : outputs) {
    +                           for (RecordWriterOutput<IN> output : outputs) {
                                        output.emitWatermark(new Watermark(Long.MAX_VALUE));
                                }
                        }
     
    +                   synchronized (lock) {
    +                           // emit in-flight events in the upstream log upon recovery
    +                           for (StreamRecord<IN> rec : upstreamLogger.getReplayLog()) {
    --- End diff --
    
    Maybe it would make sense to put this in a while loop and check "running" as well, so the task can cancel early if the job is cancelled during replay.
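
    A minimal sketch of that suggestion, assuming getReplayLog() returns an
    Iterable as the for-each in the diff implies; the diff is cut off before
    the loop body, so the collect() forwarding here is an assumption for
    illustration:

        synchronized (lock) {
            // emit in-flight events from the upstream log upon recovery,
            // but bail out early if the task is cancelled mid-replay
            Iterator<StreamRecord<IN>> replay = upstreamLogger.getReplayLog().iterator();
            while (running && replay.hasNext()) {
                StreamRecord<IN> rec = replay.next();
                for (RecordWriterOutput<IN> output : outputs) {
                    output.collect(rec); // assumed forwarding call
                }
            }
        }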


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution 
> graph. An alternative scheme can potentially include records in-transit 
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the 
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along 
> the following lines (a sketch follows this description):
> 1) Upon a barrier being triggered in an IterationHead by the TaskManager, 
> block its output and start an upstream backup of all records forwarded from 
> the respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch 
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource 
> should finalize the snapshot, unblock its output, emit all in-transit 
> records in FIFO order, and then continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected 
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can 
> serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
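
A rough, self-contained sketch of the head-side protocol described above. All
names here are hypothetical stand-ins for illustration, not Flink's actual
runtime API; it models steps 1-3 with a plain FIFO queue:

    import java.util.ArrayDeque;
    import java.util.Queue;

    /** Hypothetical model of an iteration head under ABS. */
    class IterationHeadModel<T> {

        /** Upstream backup of records in transit on the back-edge during a snapshot. */
        private final Queue<T> upstreamLog = new ArrayDeque<>();

        private boolean snapshotting = false;

        /** Step 1: barrier triggered by the TaskManager - block output, start the backup. */
        void onBarrierTriggered() {
            snapshotting = true;
        }

        /** A record arriving from the IterationSink: logged while snapshotting, else emitted. */
        void onBackEdgeRecord(T record) {
            if (snapshotting) {
                upstreamLog.add(record); // back up in-transit records
            } else {
                emit(record);
            }
        }

        /** Step 3: the barrier came back via the IterationSink (step 2) - finalize and flush. */
        void onBarrierFromSink() {
            snapshotting = false;            // the logged records complete the snapshot
            while (!upstreamLog.isEmpty()) {
                emit(upstreamLog.poll());    // unblock: emit in-transit records in FIFO order
            }
        }

        private void emit(T record) {
            // forward downstream; stands in for the real record writer
        }
    }

On restart, the same flush path can first replay the records restored from the
snapshot before resuming normal execution, matching the recovery note above.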



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
