GitHub user gyfora commented on a diff in the pull request:
https://github.com/apache/flink/pull/1668#discussion_r107967886
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
---
@@ -119,13 +183,154 @@ protected void cleanup() throws Exception {
* Creates the identification string with which head and tail task find
the shared blocking
* queue for the back channel. The identification string is unique per
parallel head/tail pair
* per iteration per job.
- *
- * @param jid The job ID.
- * @param iterationID The id of the iteration in the job.
+ *
+ * @param jid The job ID.
+ * @param iterationID The id of the iteration in the job.
* @param subtaskIndex The parallel subtask number
* @return The identification string.
*/
public static String createBrokerIdString(JobID jid, String
iterationID, int subtaskIndex) {
return jid + "-" + iterationID + "-" + subtaskIndex;
}
+
+ /**
+ * An internal operator that solely serves as a state logging facility
for persisting,
+ * partitioning and restoring output logs for dataflow cycles
consistently. To support concurrency,
+ * logs are being sliced proportionally to the number of concurrent
snapshots. This allows committed
+ * output logs to be uniquely identified and cleared after each
complete checkpoint.
+ * <p>
+ * The design is based on the following assumptions:
+ * <p>
+ * - A slice is named after a checkpoint ID. Checkpoint IDs are
numerically ordered within an execution.
+ * - Each checkpoint barrier arrives back in FIFO order, thus we
discard log slices in respective FIFO order.
+ * - Upon restoration the logger sorts sliced logs in the same FIFO
order and returns an Iterable that
+ * gives a singular view of the log.
+ * <p>
+ * TODO it seems that ListState.clear does not unregister state. We
need to put a hook for that.
+ *
+ * @param <IN>
+ */
+ public static class UpstreamLogger<IN> extends
AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+
+ private final StreamConfig config;
+
+ private LinkedList<ListState<StreamRecord<IN>>> slicedLog = new
LinkedList<>();
+
+ private UpstreamLogger(StreamConfig config) {
+ this.config = config;
+ }
+
+ public void logRecord(StreamRecord<IN> record) throws Exception
{
+ if (!slicedLog.isEmpty()) {
+ slicedLog.getLast().add(record);
+ }
+ }
+
+ public void createSlice(String sliceID) throws Exception {
+ ListState<StreamRecord<IN>> nextSlice =
+ getOperatorStateBackend().getOperatorState(new
ListStateDescriptor<>(sliceID,
+
config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader())));
+ slicedLog.addLast(nextSlice);
+ }
+
+ public void discardSlice() {
+ ListState<StreamRecord<IN>> logToEvict =
slicedLog.pollFirst();
+ logToEvict.clear();
+ }
+
+ public Iterable<StreamRecord<IN>> getReplayLog() throws
Exception {
+ final List<String> logSlices = new
ArrayList<>(getOperatorStateBackend().getRegisteredStateNames());
+ Collections.sort(logSlices, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return
Long.valueOf(o1).compareTo(Long.valueOf(o2));
+ }
+ });
+
+ final List<Iterator<StreamRecord<IN>>> wrappedIterators
= new ArrayList<>();
+ for (String splitID : logSlices) {
+ wrappedIterators.add(getOperatorStateBackend()
+ .getOperatorState(new
ListStateDescriptor<>(splitID,
+
config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).get().iterator());
+ }
+
+ if (wrappedIterators.size() == 0) {
+ return new Iterable<StreamRecord<IN>>() {
+ @Override
+ public Iterator<StreamRecord<IN>>
iterator() {
+ return
Collections.emptyListIterator();
+ }
+ };
+ }
+
+ return new Iterable<StreamRecord<IN>>() {
+ @Override
+ public Iterator<StreamRecord<IN>> iterator() {
+
+ return new Iterator<StreamRecord<IN>>()
{
+ int indx = 0;
+ Iterator<StreamRecord<IN>>
currentIterator = wrappedIterators.get(0);
+
+ @Override
+ public boolean hasNext() {
+ if
(!currentIterator.hasNext()) {
+ progressLog();
+ }
+ return
currentIterator.hasNext();
+ }
+
+ @Override
+ public StreamRecord<IN> next() {
+ if
(!currentIterator.hasNext() && indx < wrappedIterators.size()) {
+ progressLog();
+ }
+ return
currentIterator.next();
+ }
+
+ private void progressLog() {
+ while
(!currentIterator.hasNext() && ++indx < wrappedIterators.size()) {
+ currentIterator
= wrappedIterators.get(indx);
+ }
+ }
+
+ @Override
+ public void remove() {
+ throw new
UnsupportedOperationException();
+ }
+
+ };
+ }
+ };
+ }
+
+ public void clearLog() throws Exception {
+ for (String outputLogs :
getOperatorStateBackend().getRegisteredStateNames()) {
+ getOperatorStateBackend().getOperatorState(new
ListStateDescriptor<>(outputLogs,
+
config.<StreamRecord<IN>>getTypeSerializerOut(getUserCodeClassloader()))).clear();
+ }
+ }
+
+
+ @Override
+ public void open() throws Exception {
+ super.open();
--- End diff --
Why override `open()` if nothing changes?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---