boyuanzz commented on a change in pull request #11749:
URL: https://github.com/apache/beam/pull/11749#discussion_r466673488
##########
File path:
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1051,33 +1198,352 @@ public void populateDisplayData(DisplayData.Builder
builder) {
}
}
-
////////////////////////////////////////////////////////////////////////////////////////////////
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class);
-
/**
- * Returns a new config map which is merge of current config and updates.
Verifies the updates do
- * not include ignored properties.
+ * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more
information on usage and
+ * configuration.
*/
- private static Map<String, Object> updateKafkaProperties(
- Map<String, Object> currentConfig,
- Map<String, String> ignoredProperties,
- Map<String, Object> updates) {
+ @Experimental(Kind.PORTABILITY)
+ @AutoValue
+ public abstract static class ReadSourceDescriptors<K, V>
+ extends PTransform<PCollection<KafkaSourceDescriptor>,
PCollection<KafkaRecord<K, V>>> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ReadSourceDescriptors.class);
+
+ abstract Map<String, Object> getConsumerConfig();
+
+ @Nullable
+ abstract Map<String, Object> getOffsetConsumerConfig();
+
+ @Nullable
+ abstract DeserializerProvider getKeyDeserializerProvider();
+
+ @Nullable
+ abstract DeserializerProvider getValueDeserializerProvider();
+
+ @Nullable
+ abstract Coder<K> getKeyCoder();
+
+ @Nullable
+ abstract Coder<V> getValueCoder();
+
+ abstract SerializableFunction<Map<String, Object>, Consumer<byte[],
byte[]>>
+ getConsumerFactoryFn();
+
+ @Nullable
+ abstract SerializableFunction<KafkaRecord<K, V>, Instant>
getExtractOutputTimestampFn();
+
+ @Nullable
+ abstract SerializableFunction<Instant, WatermarkEstimator<Instant>>
+ getCreateWatermarkEstimatorFn();
+
+ abstract boolean isCommitOffsetEnabled();
+
+ @Nullable
+ abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+
+ abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder<K, V> {
+ abstract ReadSourceDescriptors.Builder<K, V>
setConsumerConfig(Map<String, Object> config);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setOffsetConsumerConfig(
+ Map<String, Object> offsetConsumerConfig);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setConsumerFactoryFn(
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setKeyDeserializerProvider(
+ DeserializerProvider deserializerProvider);
+
+ abstract ReadSourceDescriptors.Builder<K, V>
setValueDeserializerProvider(
+ DeserializerProvider deserializerProvider);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setKeyCoder(Coder<K>
keyCoder);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setValueCoder(Coder<V>
valueCoder);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setExtractOutputTimestampFn(
+ SerializableFunction<KafkaRecord<K, V>, Instant> fn);
+
+ abstract ReadSourceDescriptors.Builder<K, V>
setCreateWatermarkEstimatorFn(
+ SerializableFunction<Instant, WatermarkEstimator<Instant>> fn);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setCommitOffsetEnabled(
+ boolean commitOffsetEnabled);
+
+ abstract ReadSourceDescriptors.Builder<K, V> setTimestampPolicyFactory(
+ TimestampPolicyFactory<K, V> policy);
+
+ abstract ReadSourceDescriptors<K, V> build();
+ }
+
+ public static <K, V> ReadSourceDescriptors<K, V> read() {
+ return new AutoValue_KafkaIO_ReadSourceDescriptors.Builder<K, V>()
+ .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN)
+ .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES)
+ .setCommitOffsetEnabled(false)
+ .build()
+ .withProcessingTime()
+ .withMonotonicallyIncreasingWatermarkEstimator();
+ }
+
+ // Note that if the bootstrapServers is set here but also populated with
the element, the
+ // element
+ // will override the bootstrapServers from the config.
+ public ReadSourceDescriptors<K, V> withBootstrapServers(String
bootstrapServers) {
+ return withConsumerConfigUpdates(
+ ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers));
+ }
+
+ public ReadSourceDescriptors<K, V> withKeyDeserializerProvider(
+ DeserializerProvider<K> deserializerProvider) {
+ return
toBuilder().setKeyDeserializerProvider(deserializerProvider).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withValueDeserializerProvider(
+ DeserializerProvider<V> deserializerProvider) {
+ return
toBuilder().setValueDeserializerProvider(deserializerProvider).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withKeyDeserializer(
+ Class<? extends Deserializer<K>> keyDeserializer) {
+ return
withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer));
+ }
+
+ public ReadSourceDescriptors<K, V> withValueDeserializer(
+ Class<? extends Deserializer<V>> valueDeserializer) {
+ return
withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer));
+ }
+
+ public ReadSourceDescriptors<K, V> withKeyDeserializerAndCoder(
+ Class<? extends Deserializer<K>> keyDeserializer, Coder<K> keyCoder) {
+ return
withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withValueDeserializerAndCoder(
+ Class<? extends Deserializer<V>> valueDeserializer, Coder<V>
valueCoder) {
+ return
withValueDeserializer(valueDeserializer).toBuilder().setValueCoder(valueCoder).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withConsumerFactoryFn(
+ SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
consumerFactoryFn) {
+ return toBuilder().setConsumerFactoryFn(consumerFactoryFn).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withConsumerConfigUpdates(
+ Map<String, Object> configUpdates) {
+ Map<String, Object> config =
+ KafkaIOUtils.updateKafkaProperties(getConsumerConfig(),
configUpdates);
+ return toBuilder().setConsumerConfig(config).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withExtractOutputTimestampFn(
+ SerializableFunction<KafkaRecord<K, V>, Instant> fn) {
+ return toBuilder().setExtractOutputTimestampFn(fn).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withCreatWatermarkEstimatorFn(
+ SerializableFunction<Instant, WatermarkEstimator<Instant>> fn) {
+ return toBuilder().setCreateWatermarkEstimatorFn(fn).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withLogAppendTime() {
+ return withExtractOutputTimestampFn(
+ ReadSourceDescriptors.ExtractOutputTimestampFns.useLogAppendTime());
+ }
+
+ public ReadSourceDescriptors<K, V> withProcessingTime() {
+ return withExtractOutputTimestampFn(
+ ReadSourceDescriptors.ExtractOutputTimestampFns.useProcessingTime());
+ }
+
+ public ReadSourceDescriptors<K, V> withCreateTime() {
+ return withExtractOutputTimestampFn(
+ ReadSourceDescriptors.ExtractOutputTimestampFns.useCreateTime());
+ }
+
+ public ReadSourceDescriptors<K, V> withWallTimeWatermarkEstimator() {
+ return withCreatWatermarkEstimatorFn(
+ state -> {
+ return new WallTime(state);
+ });
+ }
+
+ public ReadSourceDescriptors<K, V>
withMonotonicallyIncreasingWatermarkEstimator() {
+ return withCreatWatermarkEstimatorFn(
+ state -> {
+ return new MonotonicallyIncreasing(state);
+ });
+ }
+
+ public ReadSourceDescriptors<K, V> withManualWatermarkEstimator() {
+ return withCreatWatermarkEstimatorFn(
+ state -> {
+ return new Manual(state);
+ });
+ }
+
+    // If a transactional producer is used and it's desired to only read
records from committed
+    // transactions, it's recommended to set read_committed. Otherwise,
read_uncommitted is the
+    // default value.
+ public ReadSourceDescriptors<K, V> withReadCommitted() {
+ return withConsumerConfigUpdates(ImmutableMap.of("isolation.level",
"read_committed"));
+ }
+
+ public ReadSourceDescriptors<K, V> commitOffsets() {
+ return toBuilder().setCommitOffsetEnabled(true).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withOffsetConsumerConfigOverrides(
+ Map<String, Object> offsetConsumerConfig) {
+ return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+ }
+
+ public ReadSourceDescriptors<K, V> withConsumerConfigOverrides(
+ Map<String, Object> consumerConfig) {
+ return toBuilder().setConsumerConfig(consumerConfig).build();
+ }
+
+ // TODO(BEAM-10320): Create external build transform for
ReadSourceDescriptors().
Review comment:
Yes you are right. Thanks!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]