boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r444434591
########## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ########## @@ -295,21 +301,32 @@ /** * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka configuration * should set with {@link Read#withBootstrapServers(String)} and {@link Read#withTopics(List)}. - * Other optional settings include key and value {@link Deserializer}s, custom timestamp and + * Other optional settings include key and value {@link Deserializer}s, custom timestamp, * watermark functions. */ public static <K, V> Read<K, V> read() { return new AutoValue_KafkaIO_Read.Builder<K, V>() .setTopics(new ArrayList<>()) .setTopicPartitions(new ArrayList<>()) - .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN) - .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES) + .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) + .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) .setMaxNumRecords(Long.MAX_VALUE) .setCommitOffsetsInFinalizeEnabled(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) .build(); } + /** + * Creates an uninitialized {@link ReadViaSDF} {@link PTransform}. Different from {@link Read}, + * setting up {@code topics} and {@code bootstrapServers} is not required during construction + * time. But the {@code bootstrapServers} still can be configured {@link + * ReadViaSDF#withBootstrapServers(String)}. Please refer to {@link ReadViaSDF} for more details. + */ + public static <K, V, WatermarkEstimatorT extends WatermarkEstimator<Instant>> + ReadViaSDF<K, V, WatermarkEstimatorT> readAll() { + return ReadViaSDF.<K, V, WatermarkEstimatorT>read(); Review comment: The `WatermarkEstimatorT` is used when defining `createWatermarkEstimatorFn` and when `@NewWatermarkEstimator` is called. I understand that we can always use `WatermarkEstimator` as the type, I thought it would be better to make the type explicitly, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org