dannycranmer commented on code in PR #19417:
URL: https://github.com/apache/flink/pull/19417#discussion_r866220480
##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java:
##########
@@ -57,11 +58,9 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex
* the record
* @param stream the name of the Kinesis stream that this record was sent to
* @param shardId The identifier of the shard the record was sent to
- * @return the deserialized message as an Java object ({@code null} if the message cannot be
- * deserialized).
* @throws IOException
*/
- T deserialize(
+ List<T> deserialize(
Review Comment:
This is a backwards-incompatible change: it will break every existing
implementation of `KinesisDeserializationSchema`. I would rather introduce a
new interface specifically for this use case; then we can fork the code path
based on the supplied deserialiser.
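A minimal sketch of what the suggested fork could look like, under simplifying assumptions: the existing single-record contract stays as it is, a separate opt-in interface lets a schema push zero or more records into a collector, and the caller branches on the type of the supplied schema. `SingleRecordSchema`, `CollectingRecordSchema` and `DeserializerDispatch` are hypothetical names, `Collector` is a simplified stand-in for Flink's `org.apache.flink.util.Collector`, and the parameter lists and checked exceptions are trimmed for brevity; none of this is the connector's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.util.Collector (illustration only).
interface Collector<T> {
    void collect(T record);
}

// Stand-in for the existing single-record contract, left unchanged so current
// implementations keep compiling. Parameters and checked exceptions are trimmed.
interface SingleRecordSchema<T> {
    T deserialize(byte[] recordValue, String stream, String shardId);
}

// Hypothetical new, opt-in interface for schemas that emit zero or more
// records per Kinesis record.
interface CollectingRecordSchema<T> {
    void deserialize(byte[] recordValue, String stream, String shardId, Collector<T> out);
}

final class DeserializerDispatch {
    private DeserializerDispatch() {}

    // Forks the code path on the type of the user-supplied schema. `schema` is
    // typed as Object only to keep the sketch short.
    @SuppressWarnings("unchecked")
    static <T> List<T> deserialize(Object schema, byte[] recordValue, String stream, String shardId) {
        List<T> out = new ArrayList<>();
        if (schema instanceof CollectingRecordSchema) {
            // New path: the schema decides how many records to emit.
            ((CollectingRecordSchema<T>) schema).deserialize(recordValue, stream, shardId, out::add);
        } else if (schema instanceof SingleRecordSchema) {
            // Existing path: null means "could not be deserialized", so emit nothing.
            T value = ((SingleRecordSchema<T>) schema).deserialize(recordValue, stream, shardId);
            if (value != null) {
                out.add(value);
            }
        } else {
            throw new IllegalArgumentException("Unsupported schema type: " + schema);
        }
        return out;
    }
}
```

Keeping the single-record interface untouched is the point of the suggestion: existing implementations compile unchanged, and only users who opt into the new interface take the multi-record path.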
##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java:
##########
@@ -216,8 +217,13 @@ private void deserializeRecordForCollectionAndUpdateState(final UserRecord recor
record.getSequenceNumber(),
record.getSubSequenceNumber())
: new SequenceNumber(record.getSequenceNumber());
- fetcherRef.emitRecordAndUpdateState(
- value, approxArrivalTimestamp, subscribedShardStateIndex, collectedSequenceNumber);
+ for (T val : values) {
+ fetcherRef.emitRecordAndUpdateState(
+ val,
+ approxArrivalTimestamp,
+ subscribedShardStateIndex,
+ collectedSequenceNumber);
+ }
Review Comment:
What happens if `values` is empty? I think the state would then not be
updated, so if the job restarts it would not resume from
`collectedSequenceNumber` but from an earlier sequence number. While this
will not affect processing semantics, it feels like a smell to me.
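One possible way to close this gap, sketched under simplifying assumptions: keep emitting every deserialized record as before, but when the list is empty fall back to a state-only update so the shard's stored position still advances to `collectedSequenceNumber`. `ShardFetcher`, `EmitHelper` and the `updateState` method are hypothetical stand-ins for the `fetcherRef` used in `ShardConsumer`; whether the real fetcher exposes an equivalent state-only update is an assumption, and sequence numbers are plain strings here.

```java
import java.util.List;

// Hypothetical stand-in for the fetcher that ShardConsumer emits through.
interface ShardFetcher<T> {
    void emitRecordAndUpdateState(
            T record, long approxArrivalTimestamp, int shardStateIndex, String sequenceNumber);

    // Assumed state-only update: advance the shard's sequence number without emitting.
    void updateState(int shardStateIndex, String sequenceNumber);
}

final class EmitHelper {
    private EmitHelper() {}

    static <T> void emitAll(
            ShardFetcher<T> fetcher,
            List<T> values,
            long approxArrivalTimestamp,
            int shardStateIndex,
            String collectedSequenceNumber) {
        if (values.isEmpty()) {
            // Nothing to emit, but the Kinesis record was consumed: still store its
            // sequence number so a restart resumes after it, not from an earlier one.
            fetcher.updateState(shardStateIndex, collectedSequenceNumber);
            return;
        }
        for (T value : values) {
            // Every emitted value carries the same sequence number, as in the diff.
            fetcher.emitRecordAndUpdateState(
                    value, approxArrivalTimestamp, shardStateIndex, collectedSequenceNumber);
        }
    }
}
```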
##########
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchemaWrapper.java:
##########
@@ -60,15 +64,26 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc
}
@Override
- public T deserialize(
+ public List<T> deserialize(
byte[] recordValue,
String partitionKey,
String seqNum,
long approxArrivalTimestamp,
String stream,
String shardId)
throws IOException {
- return deserializationSchema.deserialize(recordValue);
+
+ List<T> out = new ArrayList<>();
+ if (useCollector) {
+ ListCollector<T> coll = new ListCollector<>(out);
+ deserializationSchema.deserialize(recordValue, coll);
+ } else {
Review Comment:
There are no unit tests covering this code path; can you please add some?
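A test for the `useCollector` branch could check that a schema emitting several records per Kinesis record yields all of them, and that a `null` result yields an empty list. This is a minimal, framework-free sketch: `SimpleCollector`, `SimpleListCollector` and `SimpleSchema` are simplified stand-ins for Flink's `Collector`, `ListCollector` and `DeserializationSchema` (the default collector method is modelled on the single-record behaviour), while `NewlineSplittingSchema` and `CollectorPathCheck` are hypothetical. A real test would use JUnit against `KinesisDeserializationSchemaWrapper` itself; only the collector branch shown in the diff is modelled here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Flink's Collector, ListCollector and DeserializationSchema
// (illustration only; checked exceptions omitted).
interface SimpleCollector<T> {
    void collect(T record);
}

final class SimpleListCollector<T> implements SimpleCollector<T> {
    private final List<T> list;

    SimpleListCollector(List<T> list) {
        this.list = list;
    }

    @Override
    public void collect(T record) {
        list.add(record);
    }
}

interface SimpleSchema<T> {
    T deserialize(byte[] message);

    // Single-record default: emit the result unless it is null.
    default void deserialize(byte[] message, SimpleCollector<T> out) {
        T value = deserialize(message);
        if (value != null) {
            out.collect(value);
        }
    }
}

// Hypothetical schema under test: one Kinesis record holds newline-separated messages.
final class NewlineSplittingSchema implements SimpleSchema<String> {
    @Override
    public String deserialize(byte[] message) {
        throw new UnsupportedOperationException("use the collector variant");
    }

    @Override
    public void deserialize(byte[] message, SimpleCollector<String> out) {
        for (String line : new String(message, StandardCharsets.UTF_8).split("\n")) {
            if (!line.isEmpty()) {
                out.collect(line);
            }
        }
    }
}

final class CollectorPathCheck {
    private CollectorPathCheck() {}

    // Mirrors the `useCollector` branch in the diff: wrap a fresh list and let the schema fill it.
    static <T> List<T> viaCollector(SimpleSchema<T> schema, byte[] recordValue) {
        List<T> out = new ArrayList<>();
        schema.deserialize(recordValue, new SimpleListCollector<>(out));
        return out;
    }
}
```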
--
This is an automated message from the Apache Git Service.