This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 9de17f51d76a232261b2ae9c10ccc98f494d54ae
Author: sbourkeostk <[email protected]>
AuthorDate: Mon Feb 15 05:06:16 2021 +0000

    [Issue 9480][pulsar-io] add option for auto.offset.reset to kafka source 
(#9482)
    
    Fixes #9480
    
    ### Motivation
    
    The Kafka source sets auto.offset.reset to "earliest". This means all old 
messages from Kafka are produced to Pulsar. Often it is desirable to start from 
the present location ("latest") instead.
    The option is set after the user config has been loaded, so any value the 
user supplies is overwritten and cannot take effect:
    [source code 
link](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java#L87)
    
    ### Modifications
    
    Added an autoOffsetReset option to KafkaSourceConfig
    
    (cherry picked from commit 9c2b081fac0d48039820a32c3811864921339912)
---
 .../main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java    | 2 +-
 .../src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java  | 5 +++++
 .../org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java   | 4 ++++
 pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml            | 3 ++-
 site2/docs/adaptors-kafka.md                                         | 2 +-
 site2/docs/io-kafka-source.md                                        | 1 +
 6 files changed, 14 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index c828ef5..de2e0c2 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -84,7 +84,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(kafkaSourceConfig.getAutoCommitIntervalMs()));
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
String.valueOf(kafkaSourceConfig.getSessionTimeoutMs()));
         props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 
String.valueOf(kafkaSourceConfig.getHeartbeatIntervalMs()));
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
kafkaSourceConfig.getAutoOffsetReset());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
kafkaSourceConfig.getKeyDeserializationClass());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
kafkaSourceConfig.getValueDeserializationClass());
         try {
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
index 9e97d04..3fa687e 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java
@@ -94,6 +94,11 @@ public class KafkaSourceConfig implements Serializable {
                 + "Since the deserializer will be set by a specific 
implementation of `KafkaAbstractSource`.")
     private String valueDeserializationClass = 
"org.apache.kafka.common.serialization.ByteArrayDeserializer";
     @FieldDoc(
+            defaultValue = "earliest",
+            help =
+                    "The default offset reset policy.")
+    private String autoOffsetReset = "earliest";
+    @FieldDoc(
         defaultValue = "",
         help =
             "The consumer config properties to be passed to Consumer. Note 
that other properties specified "
diff --git 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
index b84d1b5..aa909ef 100644
--- 
a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
+++ 
b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java
@@ -203,6 +203,9 @@ public class KafkaAbstractSourceTest {
         config.put("heartbeatIntervalMs", 20000);
         expectThrows(IllegalArgumentException.class, "Unable to instantiate 
Kafka consumer", openAndClose);
         config.put("heartbeatIntervalMs", 5000);
+        config.put("autoOffsetReset", "some-value");
+        expectThrows(IllegalArgumentException.class, "Unable to instantiate 
Kafka consumer", openAndClose);
+        config.put("autoOffsetReset", "earliest");
         source.open(config, ctx);
         source.close();
     }
@@ -216,6 +219,7 @@ public class KafkaAbstractSourceTest {
         assertEquals("test", config.getTopic());
         assertEquals(Long.parseLong("10000"), config.getSessionTimeoutMs());
         assertEquals(Boolean.parseBoolean("false"), 
config.isAutoCommitEnabled());
+        assertEquals("latest", config.getAutoOffsetReset());
         assertNotNull(config.getConsumerConfigProperties());
         Properties props = new Properties();
         props.putAll(config.getConsumerConfigProperties());
diff --git a/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml 
b/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml
index 0e5c13e..d466f92 100644
--- a/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml
+++ b/pulsar-io/kafka/src/test/resources/kafkaSourceConfig.yaml
@@ -22,9 +22,10 @@
 "topic": "test"
 "sessionTimeoutMs": "10000"
 "autoCommitEnabled": "false"
+"autoOffsetReset": "latest"
 "consumerConfigProperties":
     "client.id": "test-pulsar-consumer"
     "security.protocol": "SASL_PLAINTEXT"
     "sasl.mechanism": "GSSAPI"
     "sasl.kerberos.service.name": "kafka"
-    "group.id": "test-pulsar-io-group"
\ No newline at end of file
+    "group.id": "test-pulsar-io-group"
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 4ec0f15..7731cff 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -128,7 +128,7 @@ Properties:
 | Config property                         | Supported | Notes                  
                                                       |
 
|:----------------------------------------|:----------|:------------------------------------------------------------------------------|
 | `acks`                                  | Ignored   | Durability and quorum 
writes are configured at the namespace level            |
-| `auto.offset.reset`                     | Yes       | Will have a default 
value of `latest` if user does not give specific setting. |
+| `auto.offset.reset`                     | Yes       | It uses a default 
value of `earliest` if you do not give a specific setting. |
 | `batch.size`                            | Ignored   |                        
                                                       |
 | `bootstrap.servers`                     | Yes       |                        
         |
 | `buffer.memory`                         | Ignored   |                        
                                                       |
diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md
index 484cf9d..d5e1ed8 100644
--- a/site2/docs/io-kafka-source.md
+++ b/site2/docs/io-kafka-source.md
@@ -28,6 +28,7 @@ The configuration of the Kafka source connector has the 
following properties.
 |  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
+| `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
 
 ### Example

Reply via email to