lukecwik commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445087927



##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.

Review comment:
       ```suggestion
    * Represents a Kafka source description.
    *
    * <p>Note that this object should be encoded/decoded with its corresponding 
{@link #getCoder schema coder}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline

Review comment:
       ```suggestion
    * <pre>{@code
    * pipeline
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *

Review comment:
       ```suggestion
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {
+  @SchemaFieldName("topic")
+  abstract String getTopic();
+
+  @SchemaFieldName("partition")
+  abstract Integer getPartition();
+
+  @SchemaFieldName("start_read_offset")
+  @Nullable
+  abstract Long getStartReadOffset();
+
+  @SchemaFieldName("start_read_time")
+  @Nullable
+  abstract Instant getStartReadTime();
+
+  @SchemaFieldName("bootstrapServers")

Review comment:
       Did you mean to make this one snake_case as well?
   ```suggestion
     @SchemaFieldName("bootstrap_servers")
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *

Review comment:
       ```suggestion
    * }</pre>
    *
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.

Review comment:
       ```suggestion
    * query Kafka topics from a BigQuery table and read these topics via {@link 
ReadAll}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:

Review comment:
       ```suggestion
    * Note that the {@code bootstrapServers} can also be populated from the 
{@link KafkaSourceDescriptor}:
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *

Review comment:
       ```suggestion
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} 
creates an initial range for
+ * a input element {@link KafkaSourceDescription}. The end of range will be 
initialized as {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, 
use this offset as
+ *       start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, 
seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>

Review comment:
       Should we be defining endReadOffset and endReadTime which are optional 
as well?
   ```suggestion
    * {@link The initial range for
    * a {@link KafkaSourceDescription} is defined by {@code [startOffset, 
Long.MAX_VALUE)} where {@code startOffset} is defined as:
    *
    * <ul>
    *   <li>the {@code startReadOffset} if {@link 
KafkaSourceDescription#getStartReadOffset} is set.
    *   <li>the first offset with a greater or equivalent timestamp if {@link 
KafkaSourceDescription#getStartReadTimestamp} is set.
    *   <li>the {@code last committed offset + 1} for the {@link
    *       Consumer#position(TopicPartition) topic partition}.
    * </ul>
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} 
creates an initial range for
+ * a input element {@link KafkaSourceDescription}. The end of range will be 
initialized as {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, 
use this offset as
+ *       start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, 
seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>

Review comment:
       ```suggestion
    * <h4>Splitting</h4>
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...

Review comment:
       nit: appropriate appropriate -> appropriate on 
https://github.com/apache/beam/blob/f98104a22b69972744a13378e17af5f2361fbb3e/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L156

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link

Review comment:
       ```suggestion
    *   <li>{@link ReadAll#isCommitOffsetEnabled()} has the same meaning as 
{@link
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));

Review comment:
       ```suggestion
    *  .apply(Create.of(
    *    KafkaSourceDescription.of(
    *      new TopicPartition("topic", 1),
    *      null,
    *      null,
    *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
    *  .apply(KafkaIO.readAll()
    *         .withKeyDeserializer(LongDeserializer.class).
    *         .withValueDeserializer(StringDeserializer.class));
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.Instant;
+
+/**
+ * An AutoValue object which represents a Kafka source description. Note that 
this object should be
+ * encoded/decoded with equivalent {@link Schema} as a {@link Row} when 
crossing the wire.
+ */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class KafkaSourceDescription implements Serializable {

Review comment:
       Do you think the name `KafkaSourceDescriptor` would be more appropriate?

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>

Review comment:
       ```suggestion
    * <h4>Initial Restriction</h4>
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link

Review comment:
       ```suggestion
    * that if the {@code isolation.level} is set to "read_committed" or {@link
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
##########
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.kafka;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.kafka.KafkaIO.ReadAll;
+import org.apache.beam.sdk.io.kafka.KafkaIOUtils.MovingAvg;
+import 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.TimestampPolicyContext;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import 
org.apache.beam.sdk.transforms.splittabledofn.GrowableOffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
+import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Closeables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A SplittableDoFn which reads from {@link KafkaSourceDescription} and 
outputs {@link KafkaRecord}.
+ * By default, a {@link MonotonicallyIncreasing} watermark estimator is used 
to track watermark.
+ */
+@UnboundedPerElement
+class ReadFromKafkaDoFn<K, V> extends DoFn<KafkaSourceDescription, 
KafkaRecord<K, V>> {
+
+  ReadFromKafkaDoFn(ReadAll transform) {
+    this.consumerConfig = transform.getConsumerConfig();
+    this.offsetConsumerConfig = transform.getOffsetConsumerConfig();
+    this.keyDeserializerProvider = transform.getKeyDeserializerProvider();
+    this.valueDeserializerProvider = transform.getValueDeserializerProvider();
+    this.consumerFactoryFn = transform.getConsumerFactoryFn();
+    this.extractOutputTimestampFn = transform.getExtractOutputTimestampFn();
+    this.createWatermarkEstimatorFn = 
transform.getCreateWatermarkEstimatorFn();
+    this.timestampPolicyFactory = transform.getTimestampPolicyFactory();
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
+
+  private final Map<String, Object> offsetConsumerConfig;
+
+  private final SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>
+      consumerFactoryFn;
+  private final SerializableFunction<KafkaRecord<K, V>, Instant> 
extractOutputTimestampFn;
+  private final SerializableFunction<Instant, WatermarkEstimator<Instant>>
+      createWatermarkEstimatorFn;
+  private final TimestampPolicyFactory<K, V> timestampPolicyFactory;
+
+  // Valid between bundle start and bundle finish.
+  private transient ConsumerSpEL consumerSpEL = null;
+  private transient Deserializer<K> keyDeserializerInstance = null;
+  private transient Deserializer<V> valueDeserializerInstance = null;
+
+  private transient HashMap<TopicPartition, AverageRecordSize> avgRecordSize;

Review comment:
       This is going to grow indefinitely, please use an LRU cache with a 
limited number of elements (e.g. 1000).

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} 
creates an initial range for
+ * a input element {@link KafkaSourceDescription}. The end of range will be 
initialized as {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, 
use this offset as
+ *       start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, 
seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.
+ *
+ * <h4>Checkpoint and Resume Processing</h4>
+ *
+ * <p>There are 2 types of checkpoint here: self-checkpoint which invokes by 
the DoFn and
+ * system-checkpoint which is issued by the runner via {@link
+ * org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleSplitRequest}. 
Every time the
+ * consumer gets empty response from {@link Consumer#poll(long)}, {@link 
ReadFromKafkaDoFn} will
+ * checkpoint at current {@link KafkaSourceDescription} and move to process 
the next element. These
+ * deferred elements will be resumed by the runner as soon as possible.
+ *
+ * <h4>Progress and Size</h4>
+ *
+ * <p>The progress is provided by {@link GrowableOffsetRangeTracker} or per 
{@link
+ * KafkaSourceDescription}. For an infinite {@link OffsetRange}, a Kafka 
{@link Consumer} is used in
+ * the {@link GrowableOffsetRangeTracker} as the {@link
+ * GrowableOffsetRangeTracker.RangeEndEstimator} to poll the latest offset. 
Please refer to {@link
+ * ReadFromKafkaDoFn#restrictionTracker(KafkaSourceDescription, OffsetRange)} 
for details.
+ *
+ * <p>The size is computed by {@link 
ReadFromKafkaDoFn#getSize(KafkaSourceDescription,
+ * OffsetRange).} A {@link KafkaIOUtils.MovingAvg} is used to track the 
average size of kafka
+ * records.
+ *
+ * <h4>Track Watermark</h4>
+ *
+ * The {@link WatermarkEstimator} is created by {@link 
ReadAll#getCreateWatermarkEstimatorFn()}. The
+ * estimated watermark is computed by this {@link WatermarkEstimator} based on 
output timestamps
+ * computed by {@link ReadAll#getExtractOutputTimestampFn()} 
(SerializableFunction)}. The default
+ * configuration is using {@link ReadAll#withProcessingTime()} as {@code 
extractTimestampFn} and
+ * {@link ReadAll#withMonotonicallyIncreasingWatermarkEstimator()} as {@link 
WatermarkEstimator}.

Review comment:
       Move to ReadFromKafkaViaDoFn

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
-  
////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. 
Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more 
information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, 
PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadAll.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadAll.Builder<K, V> setConsumerConfig(Map<String, Object> 
config);
+
+      abstract ReadAll.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadAll.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn);
+
+      abstract ReadAll.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadAll.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadAll.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadAll.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadAll.Builder<K, V> setCommitOffsetEnabled(boolean 
commitOffsetEnabled);
+
+      abstract ReadAll.Builder<K, V> 
setTimestampPolicyFactory(TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadAll<K, V> build();
+    }
 
-    for (String key : updates.keySet()) {
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with 
the element, the
+    // element
+    // will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(DeserializerProvider<K> 
deserializerProvider) {
+      return 
toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> 
keyDeserializer) {
+      return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends 
Deserializer<V>> valueDeserializer) {
+      return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
+      return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }
+
+    public ReadAll<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+    }
+
+    public ReadAll<K, V> withConsumerConfigUpdates(Map<String, Object> 
configUpdates) {
+      Map<String, Object> config =
+          KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
+      return toBuilder().setConsumerConfig(config).build();
+    }
+
+    public ReadAll<K, V> withExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+      return toBuilder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public ReadAll<K, V> withCreatWatermarkEstimatorFn(
+        SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+      return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+    }
+
+    public ReadAll<K, V> withLogAppendTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useLogAppendTime());
+    }
+
+    public ReadAll<K, V> withProcessingTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useProcessingTime());
+    }
+
+    public ReadAll<K, V> withCreateTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useCreateTime());
+    }
+
+    public ReadAll<K, V> withWallTimeWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new WallTime(state);
+          });
+    }
+
+    public ReadAll<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new MonotonicallyIncreasing(state);
+          });
+    }
+
+    public ReadAll<K, V> withManualWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new Manual(state);
+          });
+    }
+
+    // If a transactional producer is used and it's desired to only read 
records from committed
+    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
+    // default
+    // value.
+    public ReadAll<K, V> withReadCommitted() {
+      return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", 
"read_committed"));
+    }
+
+    public ReadAll<K, V> commitOffsets() {
+      return toBuilder().setCommitOffsetEnabled(true).build();
+    }
+
+    public ReadAll<K, V> withOffsetConsumerConfigOverrides(
+        Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
+    public ReadAll<K, V> withConsumerConfigOverrides(Map<String, Object> 
consumerConfig) {
+      return toBuilder().setConsumerConfig(consumerConfig).build();
+    }
+
+    ReadAllFromRow forExternalBuild() {
+      return new ReadAllFromRow(this);
+    }
+
+    // This transform is used in cross-language case. The input Row should be 
encoded with an
+    // equivalent schema as KafkaSourceDescription.
+    private static class ReadAllFromRow<K, V>
+        extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
+
+      private final ReadAll<K, V> readViaSDF;
+
+      ReadAllFromRow(ReadAll read) {
+        readViaSDF = read;
+      }
+
+      @Override
+      public PCollection<KV<K, V>> expand(PCollection<Row> input) {
+        return input
+            .apply(Convert.fromRows(KafkaSourceDescription.class))
+            .apply(readViaSDF)
+            .apply(
+                ParDo.of(
+                    new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KafkaRecord element, OutputReceiver<KV<K, 
V>> outputReceiver) {
+                        outputReceiver.output(element.getKV());
+                      }
+                    }))
+            .setCoder(KvCoder.<K, V>of(readViaSDF.getKeyCoder(), 
readViaSDF.getValueCoder()));
+      }
+    }

Review comment:
       We should move this to the external transform builder and remove the 
`forExternalBuild` method.

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
     }
   }
 
-  
////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. 
Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more 
information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, 
PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], 
byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> 
getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadAll.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadAll.Builder<K, V> setConsumerConfig(Map<String, Object> 
config);
+
+      abstract ReadAll.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadAll.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn);
+
+      abstract ReadAll.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadAll.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadAll.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadAll.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadAll.Builder<K, V> setCommitOffsetEnabled(boolean 
commitOffsetEnabled);
+
+      abstract ReadAll.Builder<K, V> 
setTimestampPolicyFactory(TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadAll<K, V> build();
+    }
 
-    for (String key : updates.keySet()) {
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if the bootstrapServers is set here but also populated with 
the element, the
+    // element
+    // will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(DeserializerProvider<K> 
deserializerProvider) {
+      return 
toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return 
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> 
keyDeserializer) {
+      return 
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends 
Deserializer<V>> valueDeserializer) {
+      return 
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return 
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> 
valueCoder) {
+      return 
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }
+
+    public ReadAll<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> 
consumerFactoryFn) {
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+    }
+
+    public ReadAll<K, V> withConsumerConfigUpdates(Map<String, Object> 
configUpdates) {
+      Map<String, Object> config =
+          KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), 
configUpdates);
+      return toBuilder().setConsumerConfig(config).build();
+    }
+
+    public ReadAll<K, V> withExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+      return toBuilder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public ReadAll<K, V> withCreatWatermarkEstimatorFn(
+        SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+      return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+    }
+
+    public ReadAll<K, V> withLogAppendTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useLogAppendTime());
+    }
+
+    public ReadAll<K, V> withProcessingTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useProcessingTime());
+    }
+
+    public ReadAll<K, V> withCreateTime() {
+      return 
withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useCreateTime());
+    }
+
+    public ReadAll<K, V> withWallTimeWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new WallTime(state);
+          });
+    }
+
+    public ReadAll<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new MonotonicallyIncreasing(state);
+          });
+    }
+
+    public ReadAll<K, V> withManualWatermarkEstimator() {
+      return withCreatWatermarkEstimatorFn(
+          state -> {
+            return new Manual(state);
+          });
+    }
+
+    // If a transactional producer is used and it's desired to only read 
records from committed
+    // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
+    // default
+    // value.

Review comment:
       Should this be javadoc?
   ```suggestion
       // If a transactional producer is used and it's desired to only read 
records from committed
       // transaction, it's recommended to set read_committed. Otherwise, 
read_uncommitted is the
       // default value.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -729,12 +902,15 @@ public void setValueDeserializer(String 
valueDeserializer) {
 
     /**
      * Provide custom {@link TimestampPolicyFactory} to set event times and 
watermark for each
-     * partition. {@link 
TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is
-     * invoked for each partition when the reader starts.
+     * partition when beam_fn_api is disabled. {@link
+     * TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} 
is invoked for each
+     * partition when the reader starts.
      *
      * @see #withLogAppendTime()
      * @see #withCreateTime(Duration)
      * @see #withProcessingTime()
+     *     <p>For the pipeline with beam_fn_api is enabled, you should use 
{@link

Review comment:
       I don't think we need this suggestion anymore.

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.

Review comment:
       ```suggestion
    * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} is 
used to compute the {@code output timestamp} for a given {@link KafkaRecord}. 
There are three built-in types: {@link
    * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
    * ReadAll#withLogAppendTime()}.
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -729,12 +902,15 @@ public void setValueDeserializer(String 
valueDeserializer) {
 
     /**
      * Provide custom {@link TimestampPolicyFactory} to set event times and 
watermark for each
-     * partition. {@link 
TimestampPolicyFactory#createTimestampPolicy(TopicPartition, Optional)} is
-     * invoked for each partition when the reader starts.
+     * partition when beam_fn_api is disabled. {@link

Review comment:
       I think we can keep the original comment here since we support the 
timestamp policy fn now.

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:

Review comment:
       ```suggestion
    * <p>For example, to create a {@link ReadAll} with this additional 
configuration:
   ```

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.

Review comment:
       This is an implementation detail, I'm not sure we want to share it as 
part of the Javadoc that we want users to read when they look at KafkaIO. It 
would make sense to have this on the ReadFromKafkaViaDoFn class though.

##########
File path: 
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -198,6 +213,154 @@
  *    ...
  * }</pre>
  *
+ * <h2>Read from Kafka as a {@link DoFn}</h2>
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link 
KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about 
the concept of {@code
+ * SplittableDoFn}, please refer to the <a
+ * href="https://beam.apache.org/blog/splittable-do-fn/";>blog post</a> and <a
+ * href="https://s.apache.org/beam-fn-api";>design doc</a>. The major 
difference from {@link
+ * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., 
{@link
+ * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction 
time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For 
example, the pipeline can
+ * query Kafka topics from BigQuery table and read these topics via {@link 
ReadAll}.
+ *
+ * <h3>Common Kafka Consumer Configurations</h3>
+ *
+ * <p>Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ * <ul>
+ *   <li>{@link ReadAll#getConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getConsumerConfig()}.
+ *   <li>{@link ReadAll#getConsumerFactoryFn()} is the same as {@link
+ *       KafkaIO.Read#getConsumerFactoryFn()}.
+ *   <li>{@link ReadAll#getOffsetConsumerConfig()} is the same as {@link
+ *       KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   <li>{@link ReadAll#getKeyCoder()} is the same as {@link 
KafkaIO.Read#getKeyCoder()}.
+ *   <li>{@link ReadAll#getValueCoder()} is the same as {@link 
KafkaIO.Read#getValueCoder()}.
+ *   <li>{@link ReadAll#getKeyDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   <li>{@link ReadAll#getValueDeserializerProvider()} is the same as {@link
+ *       KafkaIO.Read#getValueDeserializerProvider()}.
+ *   <li>{@link ReadAll#isCommitOffsetEnabled()} means the same as {@link
+ *       KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ * </ul>
+ *
+ * <p>For example, to create a basic {@link ReadAll} transform:
+ *
+ * <pre>{@code
+ * pipeline
+ *  .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1)))
+ *  .apply(KafkaIO.readAll()
+ *          .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class));
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link 
KafkaSourceDescription}:
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *         .withKeyDeserializer(LongDeserializer.class).
+ *         .withValueDeserializer(StringDeserializer.class));
+ *
+ * }</pre>
+ *
+ * <h3>Configurations of {@link ReadAll}</h3>
+ *
+ * <p>Except configurations of Kafka Consumer, there are some other 
configurations which are related
+ * to processing records.
+ *
+ * <p>{@link ReadAll#commitOffsets()} enables committing offset after 
processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, 
the {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * <p>{@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks 
for a function which
+ * takes a {@link KafkaRecord} as input and outputs outputTimestamp. This 
function is used to
+ * produce output timestamp per {@link KafkaRecord}. There are three built-in 
types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * <p>For example, to create a {@link ReadAll} with these configurations:
+ *
+ * <pre>{@code
+ * pipeline
+ * .apply(Create.of(
+ *    KafkaSourceDescription.of(
+ *      new TopicPartition("topic", 1),
+ *      null,
+ *      null,
+ *      ImmutableList.of("broker_1:9092", "broker_2:9092"))
+ * .apply(KafkaIO.readAll()
+ *          .withKeyDeserializer(LongDeserializer.class).
+ *          .withValueDeserializer(StringDeserializer.class)
+ *          .withProcessingTime()
+ *          .commitOffsets());
+ *
+ * }</pre>
+ *
+ * <h3>Read from {@link KafkaSourceDescription}</h3>
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The 
element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} 
which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link 
OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is 
created.
+ *
+ * <h4>Initialize Restriction</h4>
+ *
+ * {@link ReadFromKafkaDoFn#initialRestriction(KafkaSourceDescription)} 
creates an initial range for
+ * a input element {@link KafkaSourceDescription}. The end of range will be 
initialized as {@code
+ * Long.MAX_VALUE}. For the start of the range:
+ *
+ * <ul>
+ *   <li>If {@code startReadOffset} in {@link KafkaSourceDescription} is set, 
use this offset as
+ *       start.
+ *   <li>If {@code startReadTime} in {@link KafkaSourceDescription} is set, 
seek the start offset
+ *       based on this time.
+ *   <li>Otherwise, the last committed offset + 1 will be returned by {@link
+ *       Consumer#position(TopicPartition)} as the start.
+ * </ul>
+ *
+ * <h4>Initial Split</h4>
+ *
+ * <p>There is no initial split for now.

Review comment:
       ```suggestion
    * <p>TODO(BEAM-YYY): Add support for initial splitting.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to