FredTing commented on a change in pull request #6105: [FLINK-8500] Get the
timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r212082669
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
##########
@@ -45,6 +45,22 @@
*/
T deserialize(byte[] messageKey, byte[] message, String topic, int
partition, long offset) throws IOException;
+ /**
+ * Deserializes the byte message.
+ *
+ * @param messageKey the key as a byte array (null if no key has been
set).
+ * @param message The message, as a byte array (null if the message was
empty or deleted).
+ * @param partition The partition the message has originated from.
+ * @param offset the offset of the message in the original source (for
example the Kafka offset).
+ * @param timestamp the timestamp of the consumer record
+ * @param timestampType The timestamp type, could be NO_TIMESTAMP,
CREATE_TIME or INGEST_TIME.
+ *
+ * @return The deserialized message as an object (null if the message
cannot be deserialized).
+ */
+ default T deserialize(byte[] messageKey, byte[] message, String topic,
int partition, long offset, long timestamp, TimestampType timestampType) throws
IOException {
Review comment:
@tzulitai We can remove the non-timestamped version of the `deserialize`
method completely. It will break the interface and if you implemented this
method it quite easy to migrate to the new method. I've only introduced the new
`deserialize` method as default to make this change backwards compatible.
When backwards compatibility is required we could mark the non-timestamped
version of the `deserialize` method as `@Deprecated` and give it a default
implementation that throws a `NotImplementedException` or a
`FlinkRuntimeException` with the message "you must implement the deserialize
method when implementingt the KeyedDeserializationSchema interface"
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services