This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0992634 add getPartitionIndex() to the Record<> (#9947)
0992634 is described below
commit 0992634d9f1dd77543bfa89540b330f162de8215
Author: Andrey Yegorov <[email protected]>
AuthorDate: Thu Mar 25 21:28:46 2021 -0700
add getPartitionIndex() to the Record<> (#9947)
### Motivation
As discussed in https://github.com/apache/pulsar/pull/9927#discussion_r595714308,
there is currently no way to reliably get the partition number of a record.
There is `Optional<String> getPartitionId()`, which returns:
- "{topic}-{partition}" in some Record implementations
- String(partition) in other implementations (parsing it back into an integer
is wasteful)
- an empty Optional (which is fine) in others
### Modifications
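Added `default Optional<Integer> getPartitionIndex()` to the `Record`
interface. The default implementation returns an empty Optional.
Record implementations that know their partition override it to return the
partition number where appropriate.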
---
.../src/main/java/org/apache/pulsar/functions/api/Record.java | 9 +++++++++
.../java/org/apache/pulsar/functions/instance/SinkRecord.java | 5 +++++
.../java/org/apache/pulsar/functions/source/PulsarRecord.java | 5 +++++
.../java/org/apache/pulsar/functions/sink/PulsarSinkTest.java | 7 +++++++
.../pulsar/io/kafka/connect/AbstractKafkaConnectSource.java | 3 +++
.../org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java | 1 +
.../apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java | 3 +++
.../java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java | 5 +++++
8 files changed, 38 insertions(+)
diff --git
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index bfaf6dc..79aab6c 100644
---
a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++
b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -78,6 +78,15 @@ public interface Record<T> {
}
/**
+ * Retrieves the partition index if any of the record.
+ *
+ * @return The partition index
+ */
+ default Optional<Integer> getPartitionIndex() {
+ return Optional.empty();
+ }
+
+ /**
* Retrieves the sequence of the record from a source partition.
*
* @return Sequence Id associated with the record
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index dc1bfd1..9626c770341 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -64,6 +64,11 @@ public class SinkRecord<T> implements Record<T> {
}
@Override
+ public Optional<Integer> getPartitionIndex() {
+ return sourceRecord.getPartitionIndex();
+ }
+
+ @Override
public Optional<Long> getRecordSequence() {
return sourceRecord.getRecordSequence();
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 46a58a8..35ad82e 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -62,6 +62,11 @@ public class PulsarRecord<T> implements
RecordWithEncryptionContext<T> {
}
@Override
+ public Optional<Integer> getPartitionIndex() {
+ return Optional.of(partition);
+ }
+
+ @Override
public Optional<String> getPartitionId() {
return Optional.of(String.format("%s-%s", topicName, partition));
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 8848d41..5e52c89 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -452,12 +453,18 @@ public class PulsarSinkTest {
}
@Override
+ public Optional<Integer> getPartitionIndex() {
+ return Optional.of(1);
+ }
+
+ @Override
public Optional<Long> getRecordSequence() {
return Optional.of(1L);
}
}, "out1");
+ assertEquals(1, record.getPartitionIndex().get().intValue());
pulsarSink.write(record);
Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof
PulsarSink.PulsarSinkEffectivelyOnceProcessor);
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
index 5a6a1a6..eab0bca 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java
@@ -183,6 +183,8 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
Optional<String> partitionId;
@Getter
Optional<String> destinationTopic;
+ @Getter
+ Optional<Integer> partitionIndex;
KafkaSchemaWrappedSchema keySchema;
@@ -190,6 +192,7 @@ public abstract class AbstractKafkaConnectSource<T>
implements Source<T> {
AbstractKafkaSourceRecord(SourceRecord srcRecord) {
this.destinationTopic = Optional.of("persistent://"+topicNamespace
+ "/" + srcRecord.topic());
+ this.partitionIndex =
Optional.ofNullable(srcRecord.kafkaPartition());
}
@Override
diff --git
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index 47214ae..e8d3a11 100644
---
a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++
b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -114,6 +114,7 @@ public class KafkaConnectSource extends
AbstractKafkaConnectSource<KeyValue<byte
.stream()
.map(e -> e.getKey() + "=" + e.getValue())
.collect(Collectors.joining(",")));
+ this.partitionIndex =
Optional.ofNullable(srcRecord.kafkaPartition());
}
@Override
diff --git
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
index 5915dcd..98b2618 100644
---
a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
+++
b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.kafka.connect;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -116,6 +117,8 @@ public class KafkaConnectSourceTest extends
ProducerConsumerBase {
String readBack2 = new String(record.getValue().getValue());
assertTrue(line2.contains(readBack2));
assertNull(record.getValue().getKey());
+ assertTrue(record.getPartitionId().isPresent());
+ assertFalse(record.getPartitionIndex().isPresent());
log.info("read line2: {}", readBack2);
record.ack();
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index f0b9362..4daafb1 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -179,6 +179,11 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
}
@Override
+ public Optional<Integer> getPartitionIndex() {
+ return Optional.of(record.partition());
+ }
+
+ @Override
public Optional<Long> getRecordSequence() {
return Optional.of(record.offset());
}