[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940873#comment-15940873 ]

ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1668#discussion_r107967886
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
    @@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
         * Creates the identification string with which head and tail tasks find the shared blocking
         * queue for the back channel. The identification string is unique per parallel head/tail pair
         * per iteration per job.
    -    * 
    -    * @param jid The job ID.
    -    * @param iterationID The id of the iteration in the job.
    +    *
    +    * @param jid          The job ID.
    +    * @param iterationID  The id of the iteration in the job.
         * @param subtaskIndex The parallel subtask number
         * @return The identification string.
         */
        public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
                return jid + "-" + iterationID + "-" + subtaskIndex;
        }
    +
    +   /**
    +    * An internal operator that solely serves as a state logging facility for persisting,
    +    * partitioning and restoring output logs for dataflow cycles consistently. To support
    +    * concurrency, logs are sliced proportionally to the number of concurrent snapshots. This
    +    * allows committed output logs to be uniquely identified and cleared after each complete
    +    * checkpoint.
    +    * <p>
    +    * The design is based on the following assumptions:
    +    * <p>
    +    * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within
    +    * an execution.
    +    * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in the
    +    * respective FIFO order.
    +    * - Upon restoration the logger sorts the sliced logs in the same FIFO order and returns
    +    * an Iterable that gives a single, ordered view of the log.
    +    * <p>
    +    * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
    +    *
    +    * @param <IN> The type of the logged stream records.
    +    */
    +   public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    +
    +           private final StreamConfig config;
    +
    +           private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>();
    +
    +           private UpstreamLogger(StreamConfig config) {
    +                   this.config = config;
    +           }
    +
    +           public void logRecord(StreamRecord<IN> record) throws Exception {
    +                   if (!slicedLog.isEmpty()) {
    +                           slicedLog.getLast().add(record);
    +                   }
    +           }
    +
    +           public void createSlice(String sliceID) throws Exception {
    +                   ListState<StreamRecord<IN>> nextSlice =
    +                           getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
    +                                   config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
    +                   slicedLog.addLast(nextSlice);
    +           }
    +
    +           public void discardSlice() {
    +                   ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst();
    +                   logToEvict.clear();
    +           }
    +
    +           public Iterable<StreamRecord<IN>> getReplayLog() throws Exception {
    +                   final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
    +                   Collections.sort(logSlices, new Comparator<String>() {
    +                           @Override
    +                           public int compare(String o1, String o2) {
    +                                   return Long.valueOf(o1).compareTo(Long.valueOf(o2));
    +                           }
    +                   });
    +
    +                   final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>();
    +                   for (String splitID : logSlices) {
    +                           wrappedIterators.add(getOperatorStateBackend()
    +                                   .getOperatorState(new ListStateDescriptor<>(splitID,
    +                                           config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
    +                   }
    +
    +                   if (wrappedIterators.size() == 0) {
    +                           return new Iterable<StreamRecord<IN>>() {
    +                                   @Override
    +                                   public Iterator<StreamRecord<IN>> iterator() {
    +                                           return Collections.emptyListIterator();
    +                                   }
    +                           };
    +                   }
    +
    +                   return new Iterable<StreamRecord<IN>>() {
    +                           @Override
    +                           public Iterator<StreamRecord<IN>> iterator() {
    +
    +                                   return new Iterator<StreamRecord<IN>>() {
    +                                           int indx = 0;
    +                                           Iterator<StreamRecord<IN>> currentIterator = wrappedIterators.get(0);
    +
    +                                           @Override
    +                                           public boolean hasNext() {
    +                                                   if (!currentIterator.hasNext()) {
    +                                                           progressLog();
    +                                                   }
    +                                                   return currentIterator.hasNext();
    +                                           }
    +
    +                                           @Override
    +                                           public StreamRecord<IN> next() {
    +                                                   if (!currentIterator.hasNext() && indx < wrappedIterators.size()) {
    +                                                           progressLog();
    +                                                   }
    +                                                   return currentIterator.next();
    +                                           }
    +
    +                                           private void progressLog() {
    +                                                   while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
    +                                                           currentIterator = wrappedIterators.get(indx);
    +                                                   }
    +                                           }
    +
    +                                           @Override
    +                                           public void remove() {
    +                                                   throw new UnsupportedOperationException();
    +                                           }
    +
    +                                   };
    +                           }
    +                   };
    +           }
    +
    +           public void clearLog() throws Exception {
    +                   for (String outputLogs : getOperatorStateBackend().getRegisteredStateNames()) {
    +                           getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(outputLogs,
    +                                   config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).clear();
    +                   }
    +           }
    +
    +
    +           @Override
    +           public void open() throws Exception {
    +                   super.open();
    --- End diff --
    
    why override if nothing changes?
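
For context, the slice lifecycle that the UpstreamLogger above implements can be
summarised with a small, self-contained sketch. This is an illustration of the technique
only, not part of the PR: plain Java collections stand in for the ListState-backed
slices, and the class and method names are hypothetical.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    /** Hypothetical stand-in for the ListState-backed upstream log. */
    public class SlicedLogSketch<T> {

        // One buffer per in-flight checkpoint, oldest first.
        private final Deque<List<T>> slices = new ArrayDeque<>();

        /** Open a new slice when a checkpoint barrier is injected. */
        public void createSlice() {
            slices.addLast(new ArrayList<T>());
        }

        /** Append a record to the slice of the youngest in-flight checkpoint. */
        public void logRecord(T record) {
            if (!slices.isEmpty()) {
                slices.getLast().add(record);
            }
        }

        /** Drop the oldest slice once its checkpoint is fully committed (FIFO). */
        public void discardSlice() {
            slices.pollFirst();
        }

        /** On restore, replay the remaining slices as one ordered log. */
        public List<T> getReplayLog() {
            List<T> replay = new ArrayList<T>();
            for (List<T> slice : slices) {
                replay.addAll(slice);
            }
            return replay;
        }
    }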


> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block
> output and start an upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output, emit all in-transit records
> in FIFO order, and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve
> as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603
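
A minimal sketch of steps 1-3 above, assuming a plain in-memory queue and hypothetical
onCheckpointBarrier/onBackEdgeRecord/onBarrierFromSink hooks in place of the actual
Flink runtime classes:

    import java.util.ArrayDeque;
    import java.util.Queue;

    /** Illustrative head-side barrier handling for a cyclic dataflow (ABS [1]). */
    public class IterationHeadSketch<T> {

        // Records in transit on the back-edge, backed up while output is blocked.
        private final Queue<T> upstreamBackup = new ArrayDeque<T>();
        private boolean outputBlocked;

        /** Step 1: barrier triggered by the TaskManager; block output, start the backup. */
        public void onCheckpointBarrier() {
            outputBlocked = true;
        }

        /** Back-edge records are logged while blocked, forwarded otherwise. */
        public void onBackEdgeRecord(T record) {
            if (outputBlocked) {
                upstreamBackup.add(record);
            } else {
                emit(record);
            }
        }

        /**
         * Steps 2-3: the barrier returns via the IterationSink; finalize the snapshot,
         * unblock the output and flush the in-transit records in FIFO order.
         */
        public void onBarrierFromSink() {
            snapshot(upstreamBackup);
            outputBlocked = false;
            while (!upstreamBackup.isEmpty()) {
                emit(upstreamBackup.poll());
            }
        }

        private void emit(T record) {
            // Forward the record downstream (elided here).
        }

        private void snapshot(Iterable<T> inTransit) {
            // Persist the backed-up records as part of the checkpoint (elided here).
        }
    }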



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
