zentol closed pull request #6610: [FLINK-10204] - fix serialization/copy error
for LatencyMarker records.
URL: https://github.com/apache/flink/pull/6610
This is a PR merged from a forked repository.
Because this pull request came from a fork, GitHub hides the original
diff once it is merged, so the diff is reproduced below for the sake of
provenance:
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
index 932e1300d9c..4074251986f 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/LatencyMarker.java
@@ -80,7 +80,7 @@ public boolean equals(Object o) {
if (markedTime != that.markedTime) {
return false;
}
- if (operatorId != that.operatorId) {
+ if (!operatorId.equals(that.operatorId)) {
return false;
}
return subtaskIndex == that.subtaskIndex;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
index ba92416d792..ed6022ff592 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java
@@ -156,7 +156,8 @@ else if (tag == TAG_STREAM_STATUS) {
}
else if (tag == TAG_LATENCY_MARKER) {
target.writeLong(source.readLong());
- target.writeInt(source.readInt());
+ target.writeLong(source.readLong());
+ target.writeLong(source.readLong());
target.writeInt(source.readInt());
} else {
throw new IOException("Corrupt stream, found tag: " +
tag);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
index 4a993179793..0e4e84b5d30 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Test;
@@ -89,6 +90,9 @@ public void testSerialization() throws Exception {
Watermark negativeWatermark = new
Watermark(-4647654567676555876L);
assertEquals(negativeWatermark,
serializeAndDeserialize(negativeWatermark, serializer));
+
+ LatencyMarker latencyMarker = new
LatencyMarker(System.currentTimeMillis(), new OperatorID(-1, -1), 1);
+ assertEquals(latencyMarker,
serializeAndDeserialize(latencyMarker, serializer));
}
@SuppressWarnings("unchecked")
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services