[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940873#comment-15940873 ]
ASF GitHub Bot commented on FLINK-3257:
---------------------------------------

Github user gyfora commented on a diff in the pull request:

https://github.com/apache/flink/pull/1668#discussion_r107967886

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java ---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 	 * Creates the identification string with which head and tail task find the shared blocking
 	 * queue for the back channel. The identification string is unique per parallel head/tail pair
 	 * per iteration per job.
-	 *
-	 * @param jid The job ID.
-	 * @param iterationID The id of the iteration in the job.
+	 *
+	 * @param jid The job ID.
+	 * @param iterationID The id of the iteration in the job.
 	 * @param subtaskIndex The parallel subtask number
 	 * @return The identification string.
 	 */
 	public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
 		return jid + "-" + iterationID + "-" + subtaskIndex;
 	}
+
+	/**
+	 * An internal operator that solely serves as a state logging facility for persisting,
+	 * partitioning and restoring output logs for dataflow cycles consistently. To support concurrency,
+	 * logs are sliced proportionally to the number of concurrent snapshots. This allows committed
+	 * output logs to be uniquely identified and cleared after each complete checkpoint.
+	 * <p>
+	 * The design is based on the following assumptions:
+	 * <p>
+	 * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
+	 * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
+	 * - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that
+	 * gives a singular view of the log.
+	 * <p>
+	 * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
+	 *
+	 * @param <IN>
+	 */
+	public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN>
+			implements OneInputStreamOperator<IN, IN> {
+
+		private final StreamConfig config;
+
+		private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>();
+
+		private UpstreamLogger(StreamConfig config) {
+			this.config = config;
+		}
+
+		public void logRecord(StreamRecord<IN> record) throws Exception {
+			if (!slicedLog.isEmpty()) {
+				slicedLog.getLast().add(record);
+			}
+		}
+
+		public void createSlice(String sliceID) throws Exception {
+			ListState<StreamRecord<IN>> nextSlice =
+				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+					config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
+			slicedLog.addLast(nextSlice);
+		}
+
+		public void discardSlice() {
+			ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst();
+			logToEvict.clear();
+		}
+
+		public Iterable<StreamRecord<IN>> getReplayLog() throws Exception {
+			final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+			Collections.sort(logSlices, new Comparator<String>() {
+				@Override
+				public int compare(String o1, String o2) {
+					return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+				}
+			});
+
+			final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>();
+			for (String splitID : logSlices) {
+				wrappedIterators.add(getOperatorStateBackend()
+					.getOperatorState(new ListStateDescriptor<>(splitID,
+						config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+			}
+
+			if (wrappedIterators.size() == 0) {
+				return new Iterable<StreamRecord<IN>>() {
+					@Override
+					public Iterator<StreamRecord<IN>> iterator() {
+						return Collections.emptyListIterator();
+					}
+				};
+			}
+
+			return new Iterable<StreamRecord<IN>>() {
+				@Override
+				public Iterator<StreamRecord<IN>> iterator() {
+
+					return new Iterator<StreamRecord<IN>>() {
+						int indx = 0;
+						Iterator<StreamRecord<IN>> currentIterator = wrappedIterators.get(0);
+
+						@Override
+						public boolean hasNext() {
+							if (!currentIterator.hasNext()) {
+								progressLog();
+							}
+							return currentIterator.hasNext();
+						}
+
+						@Override
+						public StreamRecord<IN> next() {
+							if (!currentIterator.hasNext() && indx < wrappedIterators.size()) {
+								progressLog();
+							}
+							return currentIterator.next();
+						}
+
+						private void progressLog() {
+							while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
+								currentIterator = wrappedIterators.get(indx);
+							}
+						}
+
+						@Override
+						public void remove() {
+							throw new UnsupportedOperationException();
+						}
+					};
+				}
+			};
+		}
+
+		public void clearLog() throws Exception {
+			for (String outputLogs : getOperatorStateBackend().getRegisteredStateNames()) {
+				getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(outputLogs,
+					config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).clear();
+			}
+		}
+
+		@Override
+		public void open() throws Exception {
+			super.open();
--- End diff --

why override if nothing changes?

> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
>                 Key: FLINK-3257
>                 URL: https://issues.apache.org/jira/browse/FLINK-3257
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Paris Carbone
>            Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work as
> follows:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, start
> blocking output and start upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output, emit all records
> in-transit in FIFO order, and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can
> serve as the initial implementation.
> [1] http://arxiv.org/abs/1506.08603



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
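For readers skimming the diff, the core of `getReplayLog()` is the slice-concatenation idea: slices are named after checkpoint IDs, sorted numerically (a lexicographic sort would put "10" before "9"), and replayed as one continuous FIFO log. The following is a standalone sketch of just that idea, not Flink code: the `Map` stands in for the operator state backend, and the class and method names are invented for illustration.

```java
import java.util.*;

// Standalone sketch of the getReplayLog() idea from the diff above: log slices are
// named after checkpoint IDs, sorted numerically, and concatenated into a single
// FIFO view of the whole log. The Map is a stand-in for the operator state backend.
public class ReplayLogSketch {

    public static <T> List<T> replayLog(Map<String, List<T>> slices) {
        List<String> sliceIDs = new ArrayList<>(slices.keySet());
        // Numeric sort on checkpoint IDs, mirroring the Comparator in the diff.
        sliceIDs.sort(Comparator.comparingLong(Long::parseLong));
        List<T> merged = new ArrayList<>();
        for (String id : sliceIDs) {
            merged.addAll(slices.get(id)); // FIFO within each slice, slices in checkpoint order
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, List<String>> slices = new LinkedHashMap<>();
        slices.put("10", Arrays.asList("c", "d")); // records logged during checkpoint 10
        slices.put("9", Arrays.asList("a", "b"));  // records logged during checkpoint 9
        System.out.println(replayLog(slices));     // [a, b, c, d]
    }
}
```

The actual operator avoids materializing the merged list by chaining the slice iterators lazily (the `progressLog()` helper in the diff), which matters when the backed-up log is large.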
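The three numbered steps in the issue description can be sketched as a small state machine. This is an illustrative model under invented names (`AbsCycleSketch`, `onTaskManagerBarrier`, and so on), not the actual Flink iteration tasks: it only shows the block-log-snapshot-replay sequence for records in transit on the back edge.

```java
import java.util.*;

// Illustrative model (invented names, not Flink classes) of the ABS steps above:
// (1) on the TaskManager barrier the head blocks its output and logs back-edge
// records, (2) the sink forwards the barrier around the cycle, (3) on the
// returning barrier the head snapshots the log, unblocks, and replays the
// in-transit records in FIFO order.
public class AbsCycleSketch {

    private final Deque<String> inTransitLog = new ArrayDeque<>();
    private boolean outputBlocked = false;
    private List<String> snapshot = Collections.emptyList();

    // Step 1: barrier triggered from the TaskManager.
    public void onTaskManagerBarrier() {
        outputBlocked = true;
    }

    // Back-edge records either flow through or, while blocked, go to the upstream log.
    public void onBackEdgeRecord(String record, List<String> output) {
        if (outputBlocked) {
            inTransitLog.addLast(record);
        } else {
            output.add(record);
        }
    }

    // Steps 2-3: the barrier returns via the IterationSink; finalize the snapshot,
    // then unblock and replay the logged records in FIFO order.
    public void onReturningBarrier(List<String> output) {
        snapshot = new ArrayList<>(inTransitLog);
        while (!inTransitLog.isEmpty()) {
            output.add(inTransitLog.pollFirst());
        }
        outputBlocked = false;
    }

    public List<String> lastSnapshot() {
        return snapshot;
    }

    public static void main(String[] args) {
        List<String> output = new ArrayList<>();
        AbsCycleSketch head = new AbsCycleSketch();
        head.onBackEdgeRecord("r1", output); // flows through normally
        head.onTaskManagerBarrier();         // step 1: block output, start logging
        head.onBackEdgeRecord("r2", output); // logged, not emitted
        head.onReturningBarrier(output);     // steps 2-3: snapshot, unblock, FIFO replay
        System.out.println(output);              // [r1, r2]
        System.out.println(head.lastSnapshot()); // [r2]
    }
}
```

On restore, the same replay step runs first with the records taken from the snapshot instead of the live log, which is exactly the "emit all records from the injected snapshot first" behavior the issue describes.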