nbali commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926883593
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin
input) {
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
- final KafkaIOReadImplementationCompatibilityResult compatibility =
- KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
- // For a number of cases, we prefer using the UnboundedSource Kafka over
the new SDF-based
- // Kafka source, for example,
- // * Experiments 'beam_fn_api_use_deprecated_read' and
use_deprecated_read will result in
- // legacy UnboundeSource being used.
- // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy
UnboundeSource being used
- // but will be wrapped by an SDF.
- // * Some runners or selected features may not be compatible with
SDF-based Kafka.
- if (ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(),
"beam_fn_api_use_deprecated_read")
- || ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(), "use_deprecated_read")
- || ExperimentalOptions.hasExperiment(
- input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
- || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
- || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
- && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
- return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder,
valueCoder));
- }
- return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder,
valueCoder));
+ // Reading from Kafka SDF is currently broken, as re-starting the
pipeline will cause the
Review Comment:
If this gets merged into master and gets released it's not temporary.
Totally removing the SDF support is a breaking change. It should be as
minimally breaking as possible. I have shown one precondition that indicates it
works just fine even in this bugged state. There could be even more.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]