lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445707474



##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} 
creates an initial range for
+ * a input element {@link KafkaSourceDescription}. The end of range will be 
initialized as {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, 
use this offset as
+ *       start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, 
seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>

Review comment:
       `endReadTime` is the same time domain as `startReadTime`.
   
   Having an end would be primarily for batch but could also be useful in 
streaming pipelines.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to