lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r442957543
##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadViaSDF.java
##########
@@ -0,0 +1,742 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that takes a PCollection of {@link Row} in {@link
+ * KafkaSourceDescriptionSchemas}, which represents a Kafka source description, as input and
+ * outputs a PCollection of {@link KafkaRecord}. The core implementation is based on {@code
+ * SplittableDoFn}.
+ * For more details about the concept of {@code SplittableDoFn}, please refer to the Beam blog
+ * post: https://beam.apache.org/blog/splittable-do-fn/ and the design doc:
+ * https://s.apache.org/beam-fn-api. The major difference from {@link KafkaIO.Read} is that {@link
+ * ReadViaSDF} doesn't require source descriptions (e.g., {@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) at pipeline construction time. Instead, the pipeline can
+ * populate these source descriptions at runtime. For example, the pipeline can query Kafka topics
+ * from a BigQuery table and read these topics via {@link ReadViaSDF}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadViaSDF#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadViaSDF#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadViaSDF#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadViaSDF#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadViaSDF#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadViaSDF#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadViaSDF#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadViaSDF#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadViaSDF} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ *  .apply(ReadFromKafkaViaSDF.create()
+ *    .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *    .withKeyDeserializer(LongDeserializer.class)
+ *    .withValueDeserializer(StringDeserializer.class));
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadViaSDF}</h3>
+ *
+ * <p>Apart from the Kafka consumer configurations, there are some other configurations related to
+ * processing records.
+ *
+ * <p>{@link ReadViaSDF#commitOffsets()} enables committing offsets after processing records. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadViaSDF#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadViaSDF#withExtractOutputTimestampFn(SerializableFunction)} asks for a function
+ * which takes a {@link KafkaRecord} as input and outputs the output timestamp. This function is
+ * used to produce the output timestamp for each {@link KafkaRecord}. There are three built-in
+ * types: {@link ReadViaSDF#withProcessingTime()}, {@link ReadViaSDF#withCreateTime()} and {@link
+ * ReadViaSDF#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadViaSDF} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("my_topic", 1))))
+ *  .apply(ReadFromKafkaViaSDF.create()
+ *    .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *    .withKeyDeserializer(LongDeserializer.class)
+ *    .withValueDeserializer(StringDeserializer.class)
+ *    .withProcessingTime()
+ *    .commitOffsets());
+ * }</pre>
+ *
+ * <h3>Read from Kafka source description in {@link Row}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * Row}, and the restriction is an {@link OffsetRange} which represents the record offset. A {@link
+ * GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} whose end is {@code
+ * Long.MAX_VALUE}. For a finite range, an {@link OffsetRangeTracker} is created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(Row)} creates an initial range for an input element
+ * {@link Row}. The end of the range is initialized to {@code Long.MAX_VALUE}. For the start of the
+ * range:
+ *
+ * <ul>
+ *   <li>If {@code start_read_offset} in the {@link Row} is set, use this offset as the start.
+ *   <li>If {@code start_read_time} in the {@link Row} is set, seek the start offset based on this
+ *       time.
+ *   <li>Otherwise, the last committed offset + 1, as returned by {@link
+ *       Consumer#position(TopicPartition)}, is used as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are two types of checkpoints here: the self-checkpoint, which is invoked by the DoFn,
+ * and the system checkpoint, which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. Every time the
+ * consumer gets an empty response from {@link Consumer#poll(long)}, {@link ReadFromKafkaDoFn} will
+ * checkpoint at the current {@link Row} and move on to process the next element. These deferred
+ * elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by the {@link GrowableOffsetRangeTracker} or {@link
+ * OffsetRangeTracker} per {@link Row}. For an infinite {@link OffsetRange}, a Kafka {@link
+ * Consumer} is used in the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. Please refer to {@link
+ * ReadFromKafkaDoFn.KafkaLatestOffsetEstimator} for details.
+ *
+ * <p>The size is computed by {@link ReadFromKafkaDoFn#getSize(Row, OffsetRange)}. A {@link
+ * KafkaIOUtils.MovingAvg} is used to track the average size of Kafka records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * The estimated watermark is computed by {@link MonotonicallyIncreasing} based on the output
+ * timestamps per {@link Row}.
+ */
+@AutoValue
+public abstract class ReadViaSDF<K, V>

Review comment:
   If you take a look at FileIO, you can see we have PTransforms like ReadMatches, MatchAll, ...
   I think we can do the same with KafkaIO, where we add
   ReadTopics<PCollection<String>, PCollection<KafkaRecord>>, and these are wrappers over the
   ReadViaSDF implementation with a transform that converts String -> Row in this example.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
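
To make the suggestion in the review comment concrete, below is a rough sketch of what such a
wrapper could look like. It is not code from this PR: ReadTopics is the hypothetical name borrowed
from the comment; the sketch assumes ReadViaSDF consumes a PCollection of Row as its class-level
Javadoc describes, that KafkaSourceDescription (as used in the Javadoc examples) carries a Beam
schema so Convert.toRows() can produce those Rows, and that reading partition 0 of each topic is
just a placeholder choice.

package org.apache.beam.sdk.io.kafka;

// Illustrative sketch only, not part of this PR. Assumptions: ReadViaSDF<K, V> is a
// PTransform<PCollection<Row>, PCollection<KafkaRecord<K, V>>> per its Javadoc;
// KafkaSourceDescription.of(TopicPartition) is the factory shown in the Javadoc examples and
// yields a schema-aware element; partition 0 is a placeholder for the example.
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.TopicPartition;

public class ReadTopics<K, V>
    extends PTransform<PCollection<String>, PCollection<KafkaRecord<K, V>>> {

  private final ReadViaSDF<K, V> readViaSDF;

  public ReadTopics(ReadViaSDF<K, V> readViaSDF) {
    this.readViaSDF = readViaSDF;
  }

  @Override
  public PCollection<KafkaRecord<K, V>> expand(PCollection<String> topicNames) {
    return topicNames
        // Topic name -> source description, mirroring the Javadoc example above.
        .apply(
            "To source descriptions",
            MapElements.via(
                new SimpleFunction<String, KafkaSourceDescription>() {
                  @Override
                  public KafkaSourceDescription apply(String topic) {
                    return KafkaSourceDescription.of(new TopicPartition(topic, 0));
                  }
                }))
        // Source description -> Row, the element type that ReadViaSDF reads from.
        .apply("To rows", Convert.toRows())
        // Row -> KafkaRecord<K, V> via the SDF-based read under review.
        .apply("Read via SDF", readViaSDF);
  }
}

With a wrapper shaped like this, a pipeline could apply ReadTopics directly to a
PCollection<String> of topic names (for example, one queried from BigQuery at runtime, as the
Javadoc mentions) without having to build the Row source descriptions itself.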