This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9de17f51d76a232261b2ae9c10ccc98f494d54ae Author: sbourkeostk <[email protected]> AuthorDate: Mon Feb 15 05:06:16 2021 +0000 [Issue 9480][pulsar-io] add option for auto.offset.reset to kafka source (#9482) Fixes #9480 ### Motivation The kafka source sets auto.offset.reset to "earliest". This means all old messages from kafka are produced to pulsar. Often it is desirable to start from the present location ("latest"). The option is set after the user config has been loaded, so it cannot be changed: [source code link](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java#L87) ### Modifications Added an autoOffsetReset option to KafkaSourceConfig (cherry picked from commit 9c2b081fac0d48039820a32c3811864921339912) --- .../main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 2 +- .../src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java | 5 +++++ .../org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java | 4 ++++ pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml | 3 ++- site2/docs/adaptors-kafka.md | 2 +- site2/docs/io-kafka-source.md | 1 + 6 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index c828ef5..de2e0c2 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -84,7 +84,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> { props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs())); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, String.valueOf(kafkaSourceConfig.getSessionTimeoutMs())); props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
String.valueOf(kafkaSourceConfig.getHeartbeatIntervalMs())); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaSourceConfig.getAutoOffsetReset()); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getKeyDeserializationClass()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaSourceConfig.getValueDeserializationClass()); try { diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java index 9e97d04..3fa687e 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java @@ -94,6 +94,11 @@ public class KafkaSourceConfig implements Serializable { + "Since the deserializer will be set by a specific implementation of `KafkaAbstractSource`.") private String valueDeserializationClass = "org.apache.kafka.common.serialization.ByteArrayDeserializer"; @FieldDoc( + defaultValue = "earliest", + help = + "The default offset reset policy.") + private String autoOffsetReset = "earliest"; + @FieldDoc( defaultValue = "", help = "The consumer config properties to be passed to Consumer. 
Note that other properties specified " diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index b84d1b5..aa909ef 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -203,6 +203,9 @@ public class KafkaAbstractSourceTest { config.put("heartbeatIntervalMs", 20000); expectThrows(IllegalArgumentException.class, "Unable to instantiate Kafka consumer", openAndClose); config.put("heartbeatIntervalMs", 5000); + config.put("autoOffsetReset", "some-value"); + expectThrows(IllegalArgumentException.class, "Unable to instantiate Kafka consumer", openAndClose); + config.put("autoOffsetReset", "earliest"); source.open(config, ctx); source.close(); } @@ -216,6 +219,7 @@ public class KafkaAbstractSourceTest { assertEquals("test", config.getTopic()); assertEquals(Long.parseLong("10000"), config.getSessionTimeoutMs()); assertEquals(Boolean.parseBoolean("false"), config.isAutoCommitEnabled()); + assertEquals("latest", config.getAutoOffsetReset()); assertNotNull(config.getConsumerConfigProperties()); Properties props = new Properties(); props.putAll(config.getConsumerConfigProperties()); diff --git a/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml b/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml index 0e5c13e..d466f92 100644 --- a/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml +++ b/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml @@ -22,9 +22,10 @@ "topic": "test" "sessionTimeoutMs": "10000" "autoCommitEnabled": "false" +"autoOffsetReset": "latest" "consumerConfigProperties": "client.id": "test-pulsar-consumer" "security.protocol": "SASL_PLAINTEXT" "sasl.mechanism": "GSSAPI" "sasl.kerberos.service.name": "kafka" - "group.id": 
"test-pulsar-io-group" \ No newline at end of file + "group.id": "test-pulsar-io-group" diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md index 4ec0f15..7731cff 100644 --- a/site2/docs/adaptors-kafka.md +++ b/site2/docs/adaptors-kafka.md @@ -128,7 +128,7 @@ Properties: | Config property | Supported | Notes | |:----------------------------------------|:----------|:------------------------------------------------------------------------------| | `acks` | Ignored | Durability and quorum writes are configured at the namespace level | -| `auto.offset.reset` | Yes | Will have a default value of `latest` if user does not give specific setting. | +| `auto.offset.reset` | Yes | It uses a default value of `earliest` if you do not give a specific setting. | | `batch.size` | Ignored | | | `bootstrap.servers` | Yes | | | `buffer.memory` | Ignored | | diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md index 484cf9d..d5e1ed8 100644 --- a/site2/docs/io-kafka-source.md +++ b/site2/docs/io-kafka-source.md @@ -28,6 +28,7 @@ The configuration of the Kafka source connector has the following properties. | `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. | | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java). | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values. 
+| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. | ### Example
