lukecwik commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445707474
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########

@@ -198,6 +213,154 @@
  * ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of
+ * {@code SplittableDoFn}, please refer to the
+ * <a href="https://beam.apache.org/blog/splittable-do-fn/">blog post</a> and the
+ * <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference from {@link
+ * KafkaIO.Read} is that {@link ReadAll} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline
+ * can populate these source descriptions at runtime. For example, the pipeline can query Kafka
+ * topics from a BigQuery table and read those topics via {@link ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
+ *  .apply(KafkaIO.readAll()
+ *     .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <p>Note that the {@code bootstrapServers} can also be populated from the {@link
+ * KafkaSourceDescription}:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Apart from the Kafka consumer configurations, there are some other configurations that are
+ * related to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing the offset after processing each record.
+ * Note that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks for a function that
+ * takes a {@link KafkaRecord} as input and returns the output timestamp. This function is used
+ * to produce the output timestamp for each {@link KafkaRecord}. There are three built-in types:
+ * {@link ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *  .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class)
+ *     .withProcessingTime()
+ *     .commitOffsets());
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents the
+ * record offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange}
+ * ending with {@code Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is
+ * created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} creates an initial range
+ * for an input element {@link KafkaSourceDescription}. The end of the range is initialized to
+ * {@code Long.MAX_VALUE}. The start of the range is determined as follows:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, use this offset as
+ *       the start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1, as returned by {@link
+ *       Consumer#position(TopicPartition)}, is used as the start.
+ * </ul>

Review comment:
   `endReadTime` is in the same time domain as `startReadTime`. Having an end would be for batch, but it could also be useful in streaming pipelines.
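
A minimal sketch of the start-offset selection described in the bullet list above, assuming a plain Kafka `Consumer`; the `StartOffsets` helper and its parameters are hypothetical and only illustrate the documented rules, not the actual `ReadFromKafkaDoFn` code:

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

// Hypothetical helper mirroring the documented rules for the initial restriction's start offset.
final class StartOffsets {

  // startReadOffset / startReadTime stand in for the optional fields of KafkaSourceDescription.
  static long resolveStartOffset(
      Consumer<byte[], byte[]> consumer,
      TopicPartition topicPartition,
      Long startReadOffset,
      Instant startReadTime) {
    consumer.assign(Collections.singletonList(topicPartition));
    if (startReadOffset != null) {
      // Rule 1: an explicit start offset wins.
      return startReadOffset;
    }
    if (startReadTime != null) {
      // Rule 2: seek to the earliest offset whose timestamp is >= startReadTime.
      // (offsetsForTimes may map the partition to null if no such record exists; omitted here.)
      return consumer
          .offsetsForTimes(
              Collections.singletonMap(topicPartition, startReadTime.getMillis()))
          .get(topicPartition)
          .offset();
    }
    // Rule 3: fall back to the last committed offset + 1, i.e. the consumer's current position.
    return consumer.position(topicPartition);
  }
}
```

With a start offset chosen this way, the initial restriction would be the range from that offset to `Long.MAX_VALUE`, which is then tracked by the `GrowableOffsetRangeTracker` as described above.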