Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2707#discussion_r86110695
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
---
@@ -201,27 +259,83 @@ public void processElement(StreamRecord<IN> element)
throws Exception {
serializer.serialize(value, new
DataOutputViewStreamWrapper(out));
}
- /**
- * This state is used to keep a list of all StateHandles (essentially
references to past OperatorStates) that were
- * used since the last completed checkpoint.
- **/
- public static class ExactlyOnceState implements Serializable {
+ private static final class PendingCheckpointId implements
Comparable<PendingCheckpointId>, Serializable {
+
+ private static final long serialVersionUID =
-3571036395734603443L;
+
+ private final long checkpointId;
+ private final int subtaskId;
+
+ PendingCheckpointId(long checkpointId, int subtaskId) {
+ this.checkpointId = checkpointId;
+ this.subtaskId = subtaskId;
+ }
+
+ void serialize(DataOutputViewStreamWrapper outputStream) throws
IOException {
+ outputStream.writeLong(this.checkpointId);
+ outputStream.writeInt(this.subtaskId);
+ }
- private static final long serialVersionUID =
-3571063495273460743L;
+ static PendingCheckpointId restore(DataInputViewStreamWrapper
inputStream) throws IOException, ClassNotFoundException {
+ long checkpointId = inputStream.readLong();
+ int subtaskId = inputStream.readInt();
+ return new PendingCheckpointId(checkpointId, subtaskId);
+ }
+
+ @Override
+ public int compareTo(PendingCheckpointId o) {
+ if (o == null) {
+ return -1;
+ }
- protected TreeMap<Long, Tuple2<Long, StreamStateHandle>>
pendingHandles;
+ long res = this.checkpointId - o.checkpointId;
--- End diff --
You could use `Long#compare(long, long)` instead of all this manual subtraction-based comparison.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---