lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445707474
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
* ...
* }</pre>
*
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadAll} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions at runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read those topics via {@link ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ * <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ * KafkaIO.Read#getConsumerConfig()}.
+ * <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ * KafkaIO.Read#getConsumerFactoryFn()}.
+ * <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ * KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ * <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ * KafkaIO.Read#getKeyDeserializerProvider()}.
+ * <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ * KafkaIO.Read#getValueDeserializerProvider()}.
+ * <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ * KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
+ *  .apply(KafkaIO.readAll()
+ *     .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link
+ * KafkaSourceDescription}:
+ * pipeline
+ *  .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Apart from the Kafka consumer configurations, there are some other configurations related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offsets after processing records. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks for a function which
+ * takes a {@link KafkaRecord} as input and returns the output timestamp for that record. There
+ * are three built-in options: {@link ReadAll#withProcessingTime()}, {@link
+ * ReadAll#withCreateTime()} and {@link ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ * KafkaSourceDescription.of(
+ * new TopicPartition("topic", 1),
+ * null,
+ * null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ * .withValueDeserializer(StringDeserializer.class)
+ * .withProcessingTime()
+ * .commitOffsets());
+ *
+ * }</pre>
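+ *
+ * <p>A custom output timestamp function can also be supplied. As an illustrative sketch (assuming
+ * the timestamp carried by {@code KafkaRecord#getTimestamp()} is the one you want to emit), it
+ * could look like:
+ *
+ * <pre>{@code
+ * KafkaIO.readAll()
+ *   .withKeyDeserializer(LongDeserializer.class)
+ *   .withValueDeserializer(StringDeserializer.class)
+ *   // Use the record's own timestamp (epoch millis) as the element's output timestamp.
+ *   .withExtractOutputTimestampFn(record -> new Instant(record.getTimestamp()));
+ * }</pre>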
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents the
+ * record offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange}
+ * that ends with {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is
+ * created.
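+ *
+ * <p>As an illustrative sketch only (the {@code restriction} and {@code offsetEstimator}
+ * variables are placeholders, not the exact implementation), the tracker selection could look
+ * like:
+ *
+ * <pre>{@code
+ * // Pick a growable tracker for an open-ended range, otherwise a plain offset range tracker.
+ * if (restriction.getTo() == Long.MAX_VALUE) {
+ *   return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
+ * }
+ * return new OffsetRangeTracker(restriction);
+ * }</pre>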
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range
+ * for an input element {@link KafkaSourceDescription}. The end of the range is initialized to
+ * {@code Long.MAX_VALUE}. The start of the range is resolved as follows (see the sketch after
+ * this list):
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, use this offset as
+ *       the start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1, as returned by {@link
+ *       Consumer#position(TopicPartition)}, is used as the start.
+ * </ul>
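+ *
+ * <p>As an illustrative sketch (the accessor names and the {@code offsetForTime} helper are
+ * hypothetical, for illustration only), the start offset resolution could look like:
+ *
+ * <pre>{@code
+ * long startOffset;
+ * if (description.getStartReadOffset() != null) {
+ *   startOffset = description.getStartReadOffset();
+ * } else if (description.getStartReadTime() != null) {
+ *   // Seek the earliest offset whose timestamp is at or after startReadTime.
+ *   startOffset = offsetForTime(
+ *       consumer, description.getTopicPartition(), description.getStartReadTime());
+ * } else {
+ *   // The consumer's current position, i.e. last committed offset + 1.
+ *   startOffset = consumer.position(description.getTopicPartition());
+ * }
+ * return new OffsetRange(startOffset, Long.MAX_VALUE);
+ * }</pre>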
Review comment:
`endReadTime` is in the same time domain as `startReadTime`.
Having an end would be primarily for batch but could also be useful in streaming pipelines.
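
For illustration only (the `getEndReadTime()` accessor and the surrounding variables are hypothetical), the end offset could be resolved from `endReadTime` the same way the start is, e.g. via `Consumer#offsetsForTimes`:

```java
// Hypothetical sketch: map endReadTime to an end offset; fall back to an open-ended range
// when no record at or after that timestamp exists yet.
Map<TopicPartition, OffsetAndTimestamp> offsets =
    consumer.offsetsForTimes(
        ImmutableMap.of(topicPartition, description.getEndReadTime().getMillis()));
OffsetAndTimestamp endOffsetAndTime = offsets.get(topicPartition);
long endOffset = endOffsetAndTime == null ? Long.MAX_VALUE : endOffsetAndTime.offset();
return new OffsetRange(startOffset, endOffset);
```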