boyuanzz commented on a change in pull request #14419:
URL: https://github.com/apache/beam/pull/14419#discussion_r609027411
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1209,67 +1215,137 @@ public void setTimestampPolicy(String timestampPolicy)
{
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
- // The Read will be expanded into SDF transform when "beam_fn_api" is
enabled.
- if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(),
"beam_fn_api")
- || ExperimentalOptions.hasExperiment(
+ // For read from unbounded in a bounded manner, we actually are not
going through Read or SDF.
+ if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(),
"beam_fn_api_use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null) {
+ return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder,
valueCoder));
+ }
+ return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder,
valueCoder));
+ }
+
+ public static final PTransformOverride KAFKA_READ_OVERRIDE =
Review comment:
runners-core-construction cannot depend on kafka-io because
expansion-service introduces circular dependency, just like pubsub
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]