[ https://issues.apache.org/jira/browse/FLINK-3257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940875#comment-15940875 ]
ASF GitHub Bot commented on FLINK-3257:
---------------------------------------
Github user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/1668#discussion_r107968567
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
 * Creates the identification string with which head and tail task find the shared blocking
 * queue for the back channel. The identification string is unique per parallel head/tail pair
 * per iteration per job.
- *
- * @param jid The job ID.
- * @param iterationID The id of the iteration in the job.
+ *
+ * @param jid The job ID.
+ * @param iterationID The id of the iteration in the job.
 * @param subtaskIndex The parallel subtask number
 * @return The identification string.
 */
public static String createBrokerIdString(JobID jid, String iterationID, int subtaskIndex) {
    return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+/**
+ * An internal operator that solely serves as a state logging facility for persisting,
+ * partitioning and restoring output logs for dataflow cycles consistently. To support concurrency,
+ * logs are being sliced proportionally to the number of concurrent snapshots. This allows committed
+ * output logs to be uniquely identified and cleared after each complete checkpoint.
+ * <p>
+ * The design is based on the following assumptions:
+ * <p>
+ * - A slice is named after a checkpoint ID. Checkpoint IDs are numerically ordered within an execution.
+ * - Each checkpoint barrier arrives back in FIFO order, thus we discard log slices in respective FIFO order.
+ * - Upon restoration the logger sorts sliced logs in the same FIFO order and returns an Iterable that
+ *   gives a singular view of the log.
+ * <p>
+ * TODO it seems that ListState.clear does not unregister state. We need to put a hook for that.
+ *
+ * @param <IN>
+ */
+public static class UpstreamLogger<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+
+    private final StreamConfig config;
+
+    private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new LinkedList<>();
+
+    private UpstreamLogger(StreamConfig config) {
+        this.config = config;
+    }
+
+    public void logRecord(StreamRecord<IN> record) throws Exception {
+        if (!slicedLog.isEmpty()) {
+            slicedLog.getLast().add(record);
+        }
+    }
+
+    public void createSlice(String sliceID) throws Exception {
+        ListState<StreamRecord<IN>> nextSlice =
+            getOperatorStateBackend().getOperatorState(new ListStateDescriptor<>(sliceID,
+                config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
+        slicedLog.addLast(nextSlice);
+    }
+
+    public void discardSlice() {
+        ListState<StreamRecord<IN>> logToEvict = slicedLog.pollFirst();
+        logToEvict.clear();
+    }
+
+    public Iterable<StreamRecord<IN>> getReplayLog() throws Exception {
+        final List<String> logSlices = new ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+        Collections.sort(logSlices, new Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                return Long.valueOf(o1).compareTo(Long.valueOf(o2));
+            }
+        });
+
+        final List<Iterator<StreamRecord<IN>>> wrappedIterators = new ArrayList<>();
+        for (String splitID : logSlices) {
+            wrappedIterators.add(getOperatorStateBackend()
+                .getOperatorState(new ListStateDescriptor<>(splitID,
+                    config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+        }
+
+        if (wrappedIterators.size() == 0) {
+            return new Iterable<StreamRecord<IN>>() {
+                @Override
+                public Iterator<StreamRecord<IN>> iterator() {
+                    return Collections.emptyListIterator();
+                }
+            };
+        }
+
+        return new Iterable<StreamRecord<IN>>() {
+            @Override
+            public Iterator<StreamRecord<IN>> iterator() {
+
+                return new Iterator<StreamRecord<IN>>() {
+                    int indx = 0;
+                    Iterator<StreamRecord<IN>> currentIterator = wrappedIterators.get(0);
+
+                    @Override
+                    public boolean hasNext() {
+                        if (!currentIterator.hasNext()) {
+                            progressLog();
+                        }
+                        return currentIterator.hasNext();
+                    }
+
+                    @Override
+                    public StreamRecord<IN> next() {
+                        if (!currentIterator.hasNext() && indx < wrappedIterators.size()) {
+                            progressLog();
+                        }
+                        return currentIterator.next();
+                    }
+
+                    private void progressLog() {
+                        while (!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
+                            currentIterator = wrappedIterators.get(indx);
+                        }
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                };
+            }
+        };
+    }
+
+    public void clearLog() throws Exception {
+        for (String outputLogs : getOperatorStateBackend().getRegisteredStateNames()) {
--- End diff --
It's kind of bad that we can't remove the state completely and keep
iterating over them when replaying the log...
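
For readers skimming the diff, the slicing discipline described in the
UpstreamLogger javadoc is easier to see outside the Flink runtime. Below is a
minimal, self-contained sketch of the same idea, not code from the PR: a
TreeMap keyed by checkpoint ID stands in for the checkpoint-named ListState
slices, and plain strings stand in for StreamRecords. All names here are
illustrative only.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class SliceLogSketch {

        // Slices keyed by checkpoint ID; TreeMap keeps them numerically
        // ordered, mirroring the Comparator in getReplayLog().
        private final TreeMap<Long, List<String>> slices = new TreeMap<>();

        // Open a fresh slice when a checkpoint barrier is injected.
        void createSlice(long checkpointId) {
            slices.put(checkpointId, new ArrayList<>());
        }

        // Append to the most recent slice, if any (cf. logRecord).
        void logRecord(String record) {
            Map.Entry<Long, List<String>> last = slices.lastEntry();
            if (last != null) {
                last.getValue().add(record);
            }
        }

        // Checkpoints complete in FIFO order, so evict the oldest slice
        // (cf. discardSlice).
        void discardSlice() {
            slices.pollFirstEntry();
        }

        // One singular view over all remaining slices, oldest first
        // (cf. getReplayLog).
        Iterable<String> getReplayLog() {
            List<String> replay = new ArrayList<>();
            for (List<String> slice : slices.values()) {
                replay.addAll(slice);
            }
            return replay;
        }

        public static void main(String[] args) {
            SliceLogSketch log = new SliceLogSketch();
            log.createSlice(1L);     // barrier for checkpoint 1 enters the loop
            log.logRecord("a");
            log.logRecord("b");
            log.createSlice(2L);     // barrier for checkpoint 2 enters the loop
            log.logRecord("c");
            log.discardSlice();      // checkpoint 1 completed: drop its slice
            System.out.println(log.getReplayLog()); // prints [c]
        }
    }

The TreeMap ordering plays the role of the numeric Comparator in
getReplayLog, and pollFirstEntry mirrors the FIFO eviction in discardSlice.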
> Add Exactly-Once Processing Guarantees in Iterative DataStream Jobs
> -------------------------------------------------------------------
>
> Key: FLINK-3257
> URL: https://issues.apache.org/jira/browse/FLINK-3257
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Paris Carbone
> Assignee: Paris Carbone
>
> The current snapshotting algorithm cannot support cycles in the execution
> graph. An alternative scheme can potentially include records in-transit
> through the back-edges of a cyclic execution graph (ABS [1]) to achieve the
> same guarantees.
> One straightforward implementation of ABS for cyclic graphs can work along
> the following lines:
> 1) Upon triggering a barrier in an IterationHead from the TaskManager, block
> its output and start an upstream backup of all records forwarded from the
> respective IterationSink.
> 2) The IterationSink should eventually forward the current snapshotting epoch
> barrier to the IterationSource.
> 3) Upon receiving a barrier from the IterationSink, the IterationSource
> should finalize the snapshot, unblock its output and emit all records
> in-transit in FIFO order and continue the usual execution.
> --
> Upon restart the IterationSource should emit all records from the injected
> snapshot first and then continue its usual execution.
> Several optimisations and slight variations are possible, but this can serve
> as the initial implementation; a rough sketch of these steps follows below.
> [1] http://arxiv.org/abs/1506.08603
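
To make the three steps concrete, here is a rough, hypothetical sketch of the
protocol from the iteration source's point of view. None of these names
(onCheckpointTriggered, onBarrierFromSink, and so on) are real Flink APIs;
they only mark where each step of the quoted description would happen.

    import java.util.ArrayDeque;
    import java.util.Deque;

    class IterationSourceSketch<T> {

        // Records in transit on the back-edge, logged while a snapshot is open.
        private final Deque<T> upstreamLog = new ArrayDeque<>();
        private boolean outputBlocked = false;

        // Step 1: a barrier is triggered at the head; block output and start
        // backing up everything the IterationSink feeds back.
        void onCheckpointTriggered(long checkpointId) {
            outputBlocked = true;
        }

        void onBackEdgeRecord(T record) {
            if (outputBlocked) {
                upstreamLog.addLast(record); // upstream backup of in-transit records
            } else {
                emit(record);
            }
        }

        // Steps 2-3: the barrier travelled around the loop and arrived from the
        // IterationSink; finalize the snapshot (the log is part of the operator
        // state), unblock, and flush the logged records in FIFO order.
        void onBarrierFromSink(long checkpointId) {
            snapshotState(checkpointId, upstreamLog);
            outputBlocked = false;
            while (!upstreamLog.isEmpty()) {
                emit(upstreamLog.pollFirst());
            }
        }

        // On restart, replay the injected snapshot before normal execution.
        void onRestore(Iterable<T> snapshotLog) {
            for (T record : snapshotLog) {
                emit(record);
            }
        }

        private void emit(T record) { /* forward downstream */ }
        private void snapshotState(long checkpointId, Iterable<T> log) { /* persist via the state backend */ }
    }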