johnjcasey commented on code in PR #22261:
URL: https://github.com/apache/beam/pull/22261#discussion_r926867277


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java:
##########
@@ -1336,28 +1335,34 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
       Coder<K> keyCoder = getKeyCoder(coderRegistry);
       Coder<V> valueCoder = getValueCoder(coderRegistry);
 
-      final KafkaIOReadImplementationCompatibilityResult compatibility =
-          KafkaIOReadImplementationCompatibility.getCompatibility(this);
-
-      // For a number of cases, we prefer using the UnboundedSource Kafka over the new SDF-based
-      // Kafka source, for example,
-      // * Experiments 'beam_fn_api_use_deprecated_read' and use_deprecated_read will result in
-      // legacy UnboundeSource being used.
-      // * Experiment 'use_unbounded_sdf_wrapper' will result in legacy UnboundeSource being used
-      // but will be wrapped by an SDF.
-      // * Some runners or selected features may not be compatible with SDF-based Kafka.
-      if (ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_deprecated_read")
-          || ExperimentalOptions.hasExperiment(
-              input.getPipeline().getOptions(), "use_unbounded_sdf_wrapper")
-          || compatibility.supportsOnly(KafkaIOReadImplementation.LEGACY)
-          || (compatibility.supports(KafkaIOReadImplementation.LEGACY)
-              && runnerPrefersLegacyRead(input.getPipeline().getOptions()))) {
-        return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder, valueCoder));
-      }
-      return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder, valueCoder));
+      // Reading from Kafka SDF is currently broken, as re-starting the pipeline will cause the

Review Comment:
   This is temporary. Our main goal is to make sure that new users don't run 
into this problem, and we intend to fix it as quickly as possible. If you have a 
less typical use case, it will still work on existing versions of Beam while 
we get this fixed.



