kennknowles commented on a change in pull request #14419:
URL: https://github.com/apache/beam/pull/14419#discussion_r608248229
##########
File path:
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/BeamFnDataReadRunner.java
##########
@@ -224,6 +224,7 @@ public void trySplit(
ProcessBundleSplitRequest request, ProcessBundleSplitResponse.Builder
response) {
DesiredSplit desiredSplit =
request.getDesiredSplitsMap().get(pTransformId);
if (desiredSplit == null) {
+ LOG.info("[BOYUANZ LOG] {}", request.getInstructionId());
Review comment:
.
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1209,67 +1215,137 @@ public void setTimestampPolicy(String timestampPolicy)
{
Coder<K> keyCoder = getKeyCoder(coderRegistry);
Coder<V> valueCoder = getValueCoder(coderRegistry);
- // The Read will be expanded into SDF transform when "beam_fn_api" is
enabled.
- if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(),
"beam_fn_api")
- || ExperimentalOptions.hasExperiment(
+ // For read from unbounded in a bounded manner, we actually are not
going through Read or SDF.
+ if (ExperimentalOptions.hasExperiment(
input.getPipeline().getOptions(),
"beam_fn_api_use_deprecated_read")
|| getMaxNumRecords() < Long.MAX_VALUE
|| getMaxReadTime() != null) {
+ return input.apply(new ReadFromKafkaViaUnbounded<>(this, keyCoder,
valueCoder));
+ }
+ return input.apply(new ReadFromKafkaViaSDF<>(this, keyCoder,
valueCoder));
+ }
+
+ public static final PTransformOverride KAFKA_READ_OVERRIDE =
Review comment:
Mark `@Internal` just to be clear that it is not for pipeline authors.
Would be good to document why this exists and when to use it.
Perhaps it could be in runners-core-construction, but I actually want to
merge that back into the core SDK so no need.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]