This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0992634  add getPartitionIndex() to the Record<> (#9947)
0992634 is described below

commit 0992634d9f1dd77543bfa89540b330f162de8215
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Mar 25 21:28:46 2021 -0700

    add getPartitionIndex() to the Record<> (#9947)
    
    ### Motivation
    
    As discussed in
    https://github.com/apache/pulsar/pull/9927#discussion_r595714308,
    there is currently no way to reliably get the partition number of a record.
    There is `Optional<String> getPartitionId()`, which returns:
    - "{topic}-{partition}" in some Record implementations
    - String(partition) in other implementations (parsing it back into an
      integer is a waste)
    - an empty Optional (ok) in others
    
    ### Modifications
    
    Added `default Optional<Integer> getPartitionIndex()` to the `Record`
    interface.
    Implementations return the partition number where appropriate.
---
 .../src/main/java/org/apache/pulsar/functions/api/Record.java    | 9 +++++++++
 .../java/org/apache/pulsar/functions/instance/SinkRecord.java    | 5 +++++
 .../java/org/apache/pulsar/functions/source/PulsarRecord.java    | 5 +++++
 .../java/org/apache/pulsar/functions/sink/PulsarSinkTest.java    | 7 +++++++
 .../pulsar/io/kafka/connect/AbstractKafkaConnectSource.java      | 3 +++
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java   | 1 +
 .../apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java   | 3 +++
 .../java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java     | 5 +++++
 8 files changed, 38 insertions(+)

diff --git 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index bfaf6dc..79aab6c 100644
--- 
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ 
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -78,6 +78,15 @@ public interface Record<T> {
     }
 
     /**
+     * Retrieves the partition index if any of the record.
+     *
+     * @return The partition index
+     */
+    default Optional<Integer> getPartitionIndex() {
+        return Optional.empty();
+    }
+
+    /**
      * Retrieves the sequence of the record from a source partition.
      *
      * @return Sequence Id associated with the record
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index dc1bfd1..9626c770341 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -64,6 +64,11 @@ public class SinkRecord<T> implements Record<T> {
     }
 
     @Override
+    public Optional<Integer> getPartitionIndex() {
+        return sourceRecord.getPartitionIndex();
+    }
+
+    @Override
     public Optional<Long> getRecordSequence() {
         return sourceRecord.getRecordSequence();
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 46a58a8..35ad82e 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -62,6 +62,11 @@ public class PulsarRecord<T> implements 
RecordWithEncryptionContext<T> {
     }
 
     @Override
+    public Optional<Integer> getPartitionIndex() {
+        return Optional.of(partition);
+    }
+
+    @Override
     public Optional<String> getPartitionId() {
         return Optional.of(String.format("%s-%s", topicName, partition));
     }
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 8848d41..5e52c89 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -452,12 +453,18 @@ public class PulsarSinkTest {
                 }
 
                 @Override
+                public Optional<Integer> getPartitionIndex() {
+                    return Optional.of(1);
+                }
+
+                @Override
                 public Optional<Long> getRecordSequence() {
                     return Optional.of(1L);
                 }
             }, "out1");
 
 
+            assertEquals(1, record.getPartitionIndex().get().intValue());
             pulsarSink.write(record);
 
             Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof 
PulsarSink.PulsarSinkEffectivelyOnceProcessor);
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 5a6a1a6..eab0bca 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -183,6 +183,8 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
         Optional<String> partitionId;
         @Getter
         Optional<String> destinationTopic;
+        @Getter
+        Optional<Integer> partitionIndex;
 
         KafkaSchemaWrappedSchema keySchema;
 
@@ -190,6 +192,7 @@ public abstract class AbstractKafkaConnectSource<T> 
implements Source<T> {
 
         AbstractKafkaSourceRecord(SourceRecord srcRecord) {
             this.destinationTopic = Optional.of("persistent://"+topicNamespace 
+ "/" + srcRecord.topic());
+            this.partitionIndex = 
Optional.ofNullable(srcRecord.kafkaPartition());
         }
 
         @Override
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 47214ae..e8d3a11 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -114,6 +114,7 @@ public class KafkaConnectSource extends 
AbstractKafkaConnectSource<KeyValue<byte
                 .stream()
                 .map(e -> e.getKey() + "=" + e.getValue())
                 .collect(Collectors.joining(",")));
+            this.partitionIndex = 
Optional.ofNullable(srcRecord.kafkaPartition());
         }
 
         @Override
diff --git 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 5915dcd..98b2618 100644
--- 
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++ 
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.kafka.connect;
 
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 
@@ -116,6 +117,8 @@ public class KafkaConnectSourceTest extends 
ProducerConsumerBase  {
         String readBack2 = new String(record.getValue().getValue());
         assertTrue(line2.contains(readBack2));
         assertNull(record.getValue().getKey());
+        assertTrue(record.getPartitionId().isPresent());
+        assertFalse(record.getPartitionIndex().isPresent());
         log.info("read line2: {}", readBack2);
         record.ack();
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index f0b9362..4daafb1 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -179,6 +179,11 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         }
 
         @Override
+        public Optional<Integer> getPartitionIndex() {
+            return Optional.of(record.partition());
+        }
+
+        @Override
         public Optional<Long> getRecordSequence() {
             return Optional.of(record.offset());
         }

Reply via email to