leonardBang commented on code in PR #21186:
URL: https://github.com/apache/flink/pull/21186#discussion_r1008991930
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java:
##########
@@ -194,8 +194,10 @@
public void write(IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
Review Comment:
```suggestion
public void write(@Nullable IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
```
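For context, the writer side could then guard against `null` records roughly like this (a minimal sketch; the exact guard added by the PR may differ in placement and wording):
```java
public void write(@Nullable IN element, Context context) throws IOException {
    final ProducerRecord<byte[], byte[]> record =
            recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
    if (record == null) {
        // the serializer opted to skip this element; nothing to send
        return;
    }
    // ... existing send path unchanged ...
}
```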
##########
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchema.java:
##########
@@ -54,7 +54,7 @@ default void open(
* @param element element to be serialized
* @param context context to possibly determine target partition
* @param timestamp timestamp
- * @return Kafka {@link ProducerRecord}
+ * @return Kafka {@link ProducerRecord} (null if the element cannot be serialized)
*/
ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);
Review Comment:
```suggestion
 * @return Kafka {@link ProducerRecord} or null if the given element cannot be serialized
 */
@Nullable ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp);
```
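To illustrate the amended contract, a hedged sketch of a schema that returns `null` for elements it declines to serialize (hypothetical class and topic name, not part of the PR):
```java
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

class SkippingSerializationSchema implements KafkaRecordSerializationSchema<String> {
    @Override
    @Nullable
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        if (element == null || element.isEmpty()) {
            // returning null asks the writer to skip this element
            return null;
        }
        return new ProducerRecord<>("output-topic", element.getBytes(StandardCharsets.UTF_8));
    }
}
```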
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+ // elements that cannot be serialized should be silently skipped
Review Comment:
minor: we can improve the note: not all serializers return `null` when an element cannot be serialized, and not every `null` return means the element could not be serialized.
All we can guarantee is that the `KafkaSinkWriter` silently skips `null` records returned by the serializer.
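Something along these lines, perhaps (a wording sketch only):
```suggestion
        // the writer silently skips elements for which the serializer returned
        // null; a null record does not necessarily mean the element could not
        // be serialized
```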
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java:
##########
@@ -148,6 +148,15 @@ public void testIncreasingRecordBasedCounters() throws Exception {
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+ // elements that cannot be serialized should be silently skipped
+ writer.write(null, SINK_WRITER_CONTEXT);
+ timeService.trigger();
+ assertThat(numBytesOut.getCount()).isEqualTo(0L);
+ assertThat(numRecordsOut.getCount()).isEqualTo(0);
+ assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
+ assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
+
+ // but properly serialized elements should count just normally
Review Comment:
Could we add a new test like `testWriteNullElement` instead of modifying an existing test?
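For instance (a sketch reusing the fixtures from `testIncreasingRecordBasedCounters` above; setup boilerplate omitted):
```java
@Test
public void testWriteNullElement() throws Exception {
    // assumes writer, timeService, and the metric counters are set up
    // the same way as in testIncreasingRecordBasedCounters
    writer.write(null, SINK_WRITER_CONTEXT);
    timeService.trigger();
    // a skipped element must not move any of the counters
    assertThat(numBytesOut.getCount()).isEqualTo(0L);
    assertThat(numRecordsOut.getCount()).isEqualTo(0);
    assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
    assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
}
```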