AHeise commented on pull request #16142:
URL: https://github.com/apache/flink/pull/16142#issuecomment-870270384


   > @AHeise, thanks for the detailed rephrasing, but I hold a different point
   > of view:
   > 
   >     1. The `defaultTopicId` could be null in the constructor of
   >        `KafkaSerializationSchema` when the user doesn't define the `TOPIC`
   >        table option, in which case `defaultTopicId` is unnecessarily
   >        regarded as `TOPIC_UNSPECIFIED`.
   
   It's a bad habit to weaken invariants to add new features, especially when 
the invariant and the feature live on different abstraction levels. Don't 
forget that `FlinkKafkaProducer` is very commonly used in DataStream 
applications. Why make it harder to detect bugs in DataStream applications just 
so that the Table API gains a new feature?
   
   Instead of weakening invariants, please always consider whether there is a 
way to express your intention without modifying the lower layers. From the 
Table API's perspective, the actual value of `defaultTopicId` doesn't matter, so 
you have a large degree of freedom that you should use.
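   
   A minimal sketch of that idea, in plain Java rather than the real Flink 
classes: the lower layer keeps its non-null invariant, and the upper layer 
passes some non-null value when no topic option is set. `StrictProducer`, 
`TableSinkFactory`, and `PLACEHOLDER_TOPIC` are hypothetical names used only 
for illustration. The sketch assumes that every record carries its own topic 
whenever the placeholder is in use.

```java
import java.util.Objects;

/** Hypothetical stand-in for the lower layer; NOT the real FlinkKafkaProducer. */
final class StrictProducer {
    private final String defaultTopicId;

    StrictProducer(String defaultTopicId) {
        // The invariant stays strict: DataStream users still fail fast on null.
        this.defaultTopicId =
                Objects.requireNonNull(defaultTopicId, "defaultTopicId must not be null");
    }

    /** A per-record target topic wins; otherwise fall back to the default. */
    String topicFor(String targetTopic) {
        return targetTopic != null ? targetTopic : defaultTopicId;
    }
}

/** Hypothetical stand-in for the Table API layer that builds the producer. */
final class TableSinkFactory {
    // Assumption: any non-null value works here, because the topic is expected
    // to come from each record when the table option is absent.
    static final String PLACEHOLDER_TOPIC = "placeholder-topic";

    private TableSinkFactory() {}

    static StrictProducer create(String topicOption) {
        return new StrictProducer(topicOption != null ? topicOption : PLACEHOLDER_TOPIC);
    }
}
```

   With this shape, `new StrictProducer(null)` still throws immediately, while 
`TableSinkFactory.create(null)` yields a producer that uses each record's own 
topic.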
   
   If you insist on `defaultTopicId` being nullable, then you need to guard 
`FlinkKafkaProducer` against all cases of `defaultTopicId == null && 
getTargetTopic() == null`. There are at least two code paths that can lead to 
the invariant violation `topicId == null`, so you need to provide good error 
messages that let the user quickly see which code path led to the violation. 
Further, I expect test coverage of all of these cases before I can approve. 
IMHO that's wasted effort.
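   
   For illustration, such a guard could look roughly like the sketch below. 
This is plain Java, not the actual `FlinkKafkaProducer` code; `TopicResolver` 
and the `topicFromRecord` flag are hypothetical, and the two failure paths it 
distinguishes are assumed for the example rather than taken from the PR.

```java
/** Hypothetical helper; not part of Flink. */
final class TopicResolver {
    private TopicResolver() {}

    /**
     * Resolves the topic for one record.
     *
     * @param defaultTopicId  topic passed to the producer; nullable in the relaxed design
     * @param targetTopic     topic returned by the schema for this record; may be null
     * @param topicFromRecord assumed flag: true if the schema derives the topic per record
     */
    static String resolve(String defaultTopicId, String targetTopic, boolean topicFromRecord) {
        if (targetTopic != null) {
            return targetTopic;
        }
        if (defaultTopicId != null) {
            return defaultTopicId;
        }
        // Both are null: say which path caused it so the user knows where to look.
        if (topicFromRecord) {
            throw new IllegalStateException(
                    "The serialization schema returned a null target topic for a record "
                            + "and no default topic is configured. "
                            + "Check the topic column for null values.");
        }
        throw new IllegalStateException(
                "No topic configured: neither a default topic nor a per-record "
                        + "target topic is available.");
    }
}
```

   Each branch would need its own test, which is the extra effort mentioned 
above.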
   
   >     2. The return value of
   >        `DynamicKafkaSerializationSchema#getTargetTopic` is never null after
   >        adding the check whether the `TOPIC` metadata column is null or
   >        empty. But this check is a very expensive operation that is executed
   >        for every record in the hot path, so for performance reasons the
   >        check could be dropped, relying on the user to specify the topic
   >        correctly.
   
   I was hoping that we could do it statically: when setting up the Flink job, 
don't we have a way to detect whether a column is nullable? You could simply 
reject an input where the column could potentially contain nulls, no? 
   
   If that is impossible, we could simply reduce the check to `isNull`, which 
is rather cheap. I don't see why we need to check for empty values. We are also 
not verifying whether the string is a valid topic name at this point, so just 
leave that to Kafka.
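   
   Both options can be sketched as follows. This is plain Java under stated 
assumptions: `ColumnSpec` is a hypothetical stand-in for whatever schema 
information is available at job setup, and `TopicColumnChecks` is not an 
existing Flink class.

```java
/** Hypothetical checks; not part of Flink. */
final class TopicColumnChecks {
    private TopicColumnChecks() {}

    /** Hypothetical stand-in for column metadata known when the job is set up. */
    static final class ColumnSpec {
        final String name;
        final boolean nullable;

        ColumnSpec(String name, boolean nullable) {
            this.name = name;
            this.nullable = nullable;
        }
    }

    /** Option 1: runs once at setup, so the hot path needs no check at all. */
    static void validateAtSetup(ColumnSpec topicColumn) {
        if (topicColumn.nullable) {
            throw new IllegalArgumentException(
                    "Topic column '" + topicColumn.name + "' must be declared NOT NULL.");
        }
    }

    /** Option 2: per-record fallback reduced to a single null comparison. */
    static String requireTopic(String topic) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic of the record must not be null.");
        }
        // Empty or otherwise invalid names are deliberately not checked here;
        // that validation is left to Kafka.
        return topic;
    }
}
```

   The setup-time variant costs nothing per record. The fallback adds only one 
reference comparison per record.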


