pawel-big-lebowski commented on code in PR #130:
URL: https://github.com/apache/flink-connector-kafka/pull/130#discussion_r1820485021
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java:
##########
@@ -369,5 +416,43 @@ public ProducerRecord<byte[], byte[]> serialize(
                     value,
                     headerProvider != null ? headerProvider.getHeaders(element) : null);
         }
+
+        @Override
+        public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+            if (!(topicSelector instanceof KafkaDatasetIdentifierProvider)) {
+                LOG.warn("Cannot identify topics. Not an TopicsIdentifierProvider");
+                return Optional.empty();
+            }
+
+            Optional<KafkaDatasetIdentifier> topicsIdentifier =
+                    ((KafkaDatasetIdentifierProvider) (topicSelector)).getDatasetIdentifier();
+
+            if (!topicsIdentifier.isPresent()) {
+                LOG.warn("No topics' identifiers provided");
+                return Optional.empty();
+            }
+
+            TypeInformation typeInformation;
+            if (this.valueSerializationSchema instanceof ResultTypeQueryable) {
+                typeInformation =
+                        ((ResultTypeQueryable<?>) this.valueSerializationSchema).getProducedType();
+            } else {
+                // gets type information from serialize method signature
+                typeInformation =
Review Comment:
`LineageGraph` in flink-core already keeps separate lists of sources and sinks. Given that, I am not sure we need to distinguish `inputType` from `outputType`. From the facet perspective, this should all be a single `type`, and the same facet can then be used for both scenarios.
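
For illustration, a minimal sketch of what such a direction-agnostic facet could look like, assuming a plain POJO with a single `type` field (the class name, field name, and constructor here are hypothetical, not the PR's actual API):

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;

// Hypothetical sketch only: one "type" facet instead of an inputType/outputType split.
// LineageGraph already separates sources from sinks, so the facet does not need to
// encode direction; it just records the record type flowing through the dataset.
public class KafkaDatasetTypeFacet {

    // Value type of the records, regardless of whether the dataset is read or written.
    private final TypeInformation<?> typeInformation;

    public KafkaDatasetTypeFacet(TypeInformation<?> typeInformation) {
        this.typeInformation = typeInformation;
    }

    public TypeInformation<?> getTypeInformation() {
        return typeInformation;
    }
}
```

Both the source-side deserialization schema and the sink-side serialization schema could then populate the same facet, e.g. via `ResultTypeQueryable#getProducedType()` as done above.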
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]