boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r445235681



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
-  ////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
   /**
-   * Returns a new config map which is merge of current config and updates. Verifies the updates do
-   * not includes ignored properties.
+   * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and
+   * configuration.
    */
-  private static Map<String, Object> updateKafkaProperties(
-      Map<String, Object> currentConfig,
-      Map<String, String> ignoredProperties,
-      Map<String, Object> updates) {
+  @Experimental(Kind.PORTABILITY)
+  @AutoValue
+  public abstract static class ReadAll<K, V>
+      extends PTransform<PCollection<KafkaSourceDescription>, PCollection<KafkaRecord<K, V>>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class);
+
+    abstract Map<String, Object> getConsumerConfig();
+
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
+    @Nullable
+    abstract DeserializerProvider getKeyDeserializerProvider();
+
+    @Nullable
+    abstract DeserializerProvider getValueDeserializerProvider();
+
+    @Nullable
+    abstract Coder<K> getKeyCoder();
+
+    @Nullable
+    abstract Coder<V> getValueCoder();
+
+    abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
+        getConsumerFactoryFn();
+
+    @Nullable
+    abstract SerializableFunction<KafkaRecord<K, V>, Instant> getExtractOutputTimestampFn();
+
+    @Nullable
+    abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+        getCreateWatermarkEstimatorFn();
+
+    abstract boolean isCommitOffsetEnabled();
+
+    @Nullable
+    abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+    abstract ReadAll.Builder<K, V> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<K, V> {
+      abstract ReadAll.Builder<K, V> setConsumerConfig(Map<String, Object> config);
+
+      abstract ReadAll.Builder<K, V> setOffsetConsumerConfig(
+          Map<String, Object> offsetConsumerConfig);
+
+      abstract ReadAll.Builder<K, V> setConsumerFactoryFn(
+          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
+
+      abstract ReadAll.Builder<K, V> setKeyDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setValueDeserializerProvider(
+          DeserializerProvider deserializerProvider);
+
+      abstract ReadAll.Builder<K, V> setKeyCoder(Coder<K> keyCoder);
+
+      abstract ReadAll.Builder<K, V> setValueCoder(Coder<V> valueCoder);
+
+      abstract ReadAll.Builder<K, V> setExtractOutputTimestampFn(
+          SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+      abstract ReadAll.Builder<K, V> setCreateWatermarkEstimatorFn(
+          SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+      abstract ReadAll.Builder<K, V> setCommitOffsetEnabled(boolean commitOffsetEnabled);
+
+      abstract ReadAll.Builder<K, V> setTimestampPolicyFactory(
+          TimestampPolicyFactory<K, V> policy);
+
+      abstract ReadAll<K, V> build();
+    }
 
-    for (String key : updates.keySet()) {
+    public static <K, V> ReadAll<K, V> read() {
+      return new AutoValue_KafkaIO_ReadAll.Builder<K, V>()
+          .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+          .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+          .setCommitOffsetEnabled(false)
+          .build()
+          .withProcessingTime()
+          .withMonotonicallyIncreasingWatermarkEstimator();
+    }
+
+    // Note that if bootstrapServers is set here but also populated in the element, the element's
+    // value will override the bootstrapServers from the config.
+    public ReadAll<K, V> withBootstrapServers(String bootstrapServers) {
+      return withConsumerConfigUpdates(
+          ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerProvider(DeserializerProvider<K> deserializerProvider) {
+      return toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerProvider(
+        DeserializerProvider<V> deserializerProvider) {
+      return toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+    }
+
+    public ReadAll<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+      return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+    }
+
+    public ReadAll<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+      return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+    }
+
+    public ReadAll<K, V> withKeyDeserializerAndCoder(
+        Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+      return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+    }
+
+    public ReadAll<K, V> withValueDeserializerAndCoder(
+        Class<? extends Deserializer<V>> valueDeserializer, Coder<V> valueCoder) {
+      return withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+    }
+
+    public ReadAll<K, V> withConsumerFactoryFn(
+        SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn) {
+      return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+    }
+
+    public ReadAll<K, V> withConsumerConfigUpdates(Map<String, Object> configUpdates) {
+      Map<String, Object> config =
+          KafkaIOUtils.updateKafkaProperties(getConsumerConfig(), configUpdates);
+      return toBuilder().setConsumerConfig(config).build();
+    }
+
+    public ReadAll<K, V> withExtractOutputTimestampFn(
+        SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+      return toBuilder().setExtractOutputTimestampFn(fn).build();
+    }
+
+    public ReadAll<K, V> withCreateWatermarkEstimatorFn(
+        SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+      return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+    }
+
+    public ReadAll<K, V> withLogAppendTime() {
+      return withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useLogAppendTime());
+    }
+
+    public ReadAll<K, V> withProcessingTime() {
+      return withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useProcessingTime());
+    }
+
+    public ReadAll<K, V> withCreateTime() {
+      return withExtractOutputTimestampFn(ReadAll.ExtractOutputTimestampFns.useCreateTime());
+    }
+
+    public ReadAll<K, V> withWallTimeWatermarkEstimator() {
+      return withCreateWatermarkEstimatorFn(
+          state -> {
+            return new WallTime(state);
+          });
+    }
+
+    public ReadAll<K, V> withMonotonicallyIncreasingWatermarkEstimator() {
+      return withCreateWatermarkEstimatorFn(
+          state -> {
+            return new MonotonicallyIncreasing(state);
+          });
+    }
+
+    public ReadAll<K, V> withManualWatermarkEstimator() {
+      return withCreateWatermarkEstimatorFn(
+          state -> {
+            return new Manual(state);
+          });
+    }
+
+    // If a transactional producer is used and only records from committed transactions should be
+    // read, set isolation.level to read_committed. Otherwise, read_uncommitted is the default
+    // value.
+    public ReadAll<K, V> withReadCommitted() {
+      return withConsumerConfigUpdates(ImmutableMap.of("isolation.level", "read_committed"));
+    }
+
+    public ReadAll<K, V> commitOffsets() {
+      return toBuilder().setCommitOffsetEnabled(true).build();
+    }
+
+    public ReadAll<K, V> withOffsetConsumerConfigOverrides(
+        Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
+    public ReadAll<K, V> withConsumerConfigOverrides(Map<String, Object> consumerConfig) {
+      return toBuilder().setConsumerConfig(consumerConfig).build();
+    }
+
+    ReadAllFromRow<K, V> forExternalBuild() {
+      return new ReadAllFromRow<>(this);
+    }
+
+    // This transform is used in the cross-language case. The input Row should be encoded with a
+    // schema equivalent to that of KafkaSourceDescription.
+    private static class ReadAllFromRow<K, V>
+        extends PTransform<PCollection<Row>, PCollection<KV<K, V>>> {
+
+      private final ReadAll<K, V> readViaSDF;
+
+      ReadAllFromRow(ReadAll<K, V> read) {
+        readViaSDF = read;
+      }
+
+      @Override
+      public PCollection<KV<K, V>> expand(PCollection<Row> input) {
+        return input
+            .apply(Convert.fromRows(KafkaSourceDescription.class))
+            .apply(readViaSDF)
+            .apply(
+                ParDo.of(
+                    new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+                      @ProcessElement
+                      public void processElement(
+                          @Element KafkaRecord<K, V> element,
+                          OutputReceiver<KV<K, V>> outputReceiver) {
+                        outputReceiver.output(element.getKV());
+                      }
+                    }))
+            .setCoder(KvCoder.<K, V>of(readViaSDF.getKeyCoder(), readViaSDF.getValueCoder()));
+      }
+    }
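
For context, a minimal usage sketch of the `ReadAll` transform added above. Nothing in it is part of the PR: the broker address and deserializer choices are placeholders, and how the `KafkaSourceDescription` elements are produced upstream is not shown in this hunk, so it is left as an input parameter.

```java
// Sketch under assumptions; `descriptions` is assumed to be built upstream.
// Assumed imports: org.apache.kafka.common.serialization.{LongDeserializer, StringDeserializer}.
static PCollection<KafkaRecord<Long, String>> readDescriptions(
    PCollection<KafkaSourceDescription> descriptions) {
  return descriptions.apply(
      KafkaIO.ReadAll.<Long, String>read()
          // Fallback servers; per the comment in the diff, servers populated
          // on the element override this config value.
          .withBootstrapServers("broker-1:9092")
          .withKeyDeserializer(LongDeserializer.class)
          .withValueDeserializer(StringDeserializer.class)
          .withReadCommitted()
          .commitOffsets());
}
```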

Review comment:
       I plan to open a separate PR to introduce an external transform builder for ReadAll(). The `buildExternal` will look like:
   `return build().forExternalBuild()`
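
   As a rough illustration of that plan, a hypothetical sketch of the wiring, assuming Beam's existing `ExternalTransformBuilder` interface, byte[] keys/values, and an invented `Configuration` POJO; none of this is the actual follow-up PR. Assumed imports: `java.util.HashMap`, `java.util.Map`, `org.apache.beam.sdk.coders.ByteArrayCoder`, and Kafka's `ByteArrayDeserializer`.

```java
// Hypothetical sketch only; Configuration and its fields are assumptions.
// Lives in org.apache.beam.sdk.io.kafka so it can call the package-private
// forExternalBuild().
static class ReadAllBuilder
    implements ExternalTransformBuilder<
        ReadAllBuilder.Configuration, PCollection<Row>, PCollection<KV<byte[], byte[]>>> {

  // Invented config POJO populated from the cross-language expansion request.
  static class Configuration {
    Map<String, String> consumerConfig;
  }

  @Override
  public PTransform<PCollection<Row>, PCollection<KV<byte[], byte[]>>> buildExternal(
      Configuration config) {
    return KafkaIO.ReadAll.<byte[], byte[]>read()
        .withConsumerConfigUpdates(new HashMap<>(config.consumerConfig))
        .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
        .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
        // i.e. `return build().forExternalBuild()` as described in the comment.
        .forExternalBuild();
  }
}
```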




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

