kennknowles commented on a change in pull request #16909:
URL: https://github.com/apache/beam/pull/16909#discussion_r814978064
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1354,17 +1356,29 @@ private boolean
runnerRequiresLegacyRead(PipelineOptions options) {
}
}
- private static class ReadFromKafkaViaUnbounded<K, V>
+ private abstract static class AbstractReadFromKafka<K, V>
extends PTransform<PBegin, PCollection<KafkaRecord<K, V>>> {
Read<K, V> kafkaRead;
Coder<K> keyCoder;
Coder<V> valueCoder;
- ReadFromKafkaViaUnbounded(Read<K, V> kafkaRead, Coder<K> keyCoder,
Coder<V> valueCoder) {
+ AbstractReadFromKafka(
+ Read<K, V> kafkaRead,
+ Coder<K> keyCoder,
+ Coder<V> valueCoder,
+ KafkaIOReadImplementation implementation) {
+ KafkaIOReadImplementationCompatibility.getCompatibility(kafkaRead)
+ .checkSupport(implementation);
this.kafkaRead = kafkaRead;
this.keyCoder = keyCoder;
this.valueCoder = valueCoder;
}
+ }
+
+ static class ReadFromKafkaViaUnbounded<K, V> extends
AbstractReadFromKafka<K, V> {
Review comment:
Please don't do this overriding of `expand`. It is quite confusing. Just
have two independent `PTransform` classes that implement it their own way. It
is not valuable to share local fields across these, really. If you really must
share between the two implementations, use composition rather than inheritance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]