rdhabalia closed pull request #2057: Forward user-properties to sink
URL: https://github.com/apache/incubator-pulsar/pull/2057
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it is merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 7b08f6b05b..5d70525a75 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -32,10 +32,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
@@ -49,9 +45,12 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -80,7 +79,6 @@
import com.google.common.collect.Sets;
import com.google.gson.Gson;
-import io.netty.util.concurrent.DefaultThreadFactory;
import jersey.repackaged.com.google.common.collect.Lists;
/**
@@ -240,22 +238,26 @@ private WorkerService
createPulsarFunctionWorker(ServiceConfiguration config) {
@Test(timeOut = 20000)
public void testE2EPulsarSink() throws Exception {
- final String namespacePortion = "myReplNs";
+ final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace +
"/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace,
clusters);
// create a producer that creates a topic at broker
- ProducerBuilder<byte[]> producerBuilder =
pulsarClient.newProducer().topic(sourceTopic);
- Producer<byte[]> producer = producerBuilder.create();
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(sourceTopic).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(sinkTopic).subscriptionName("sub").subscribe();
String jarFilePathUrl = Utils.FILE + ":"
+
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, "PulsarSink-test");
+ FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl,
tenant, namespacePortion, "PulsarSink-test",
+ sinkTopic);
admin.functions().createFunctionWithUrl(functionDetails,
jarFilePathUrl);
-
+
// try to update function to test: update-function functionality
admin.functions().updateFunctionWithUrl(functionDetails,
jarFilePathUrl);
@@ -271,8 +273,8 @@ public void testE2EPulsarSink() throws Exception {
int totalMsgs = 5;
for (int i = 0; i < totalMsgs; i++) {
- String message = "my-message-" + i;
- producer.send(message.getBytes());
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey,
propertyValue).value(data.getBytes()).send();
}
retryStrategically((test) -> {
try {
@@ -283,14 +285,20 @@ public void testE2EPulsarSink() throws Exception {
return false;
}
}, 5, 150);
+
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedPropertyValue = msg.getProperty(propertyKey);
+ Assert.assertEquals(propertyValue, receivedPropertyValue);
+
// validate pulsar-sink consumer has consumed all messages and
delivered to Pulsar sink but unacked messages
// due to publish failure
Assert.assertNotEquals(
-
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);
+
admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
}
- protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String sinkName) {
+ protected FunctionDetails createSinkConfig(String jarFile, String tenant,
String namespace, String sinkName, String sinkTopic) {
File file = new File(jarFile);
try {
@@ -322,7 +330,7 @@ protected FunctionDetails createSinkConfig(String jarFile,
String tenant, String
// set up sink spec
SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
// sinkSpecBuilder.setClassName(PulsarSink.class.getName());
- sinkSpecBuilder.setTopic(String.format("persistent://%s/%s/%s",
tenant, namespace, "output"));
+ sinkSpecBuilder.setTopic(sinkTopic);
Map<String, Object> sinkConfigMap = Maps.newHashMap();
sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfigMap));
sinkSpecBuilder.setTypeClassName(typeArg.getName());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index d538c8be7f..bd1433cae8 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -184,8 +184,8 @@ public void run() {
if (currentRecord instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
- messageId = pulsarRecord.getMessageId();
- topicName = pulsarRecord.getTopicName();
+ messageId = pulsarRecord.getMessageId();
+ topicName = pulsarRecord.getTopicName();
}
result = javaInstance.handleMessage(messageId, topicName,
currentRecord.getValue());
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 590a586ad0..a5610960d7 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -221,10 +221,13 @@ public void write(RecordContext recordContext, T value)
throws Exception {
msgBuilder.setContent(output);
if (recordContext instanceof PulsarRecord) {
PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
- msgBuilder
- .setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName())
- .setProperty("__pfn_input_msg_id__", new String(
-
Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+ // forward user properties to sink-topic
+ if (pulsarRecord.getProperties() != null) {
+ msgBuilder.setProperties(pulsarRecord.getProperties());
+ }
+ msgBuilder.setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName()).setProperty(
+ "__pfn_input_msg_id__",
+ new
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
}
this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 3684e865db..78a7c86ddf 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -23,6 +23,9 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
+
+import java.util.Map;
+
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.io.core.Record;
@@ -38,6 +41,7 @@
private T value;
private MessageId messageId;
private String topicName;
+ private Map<String, String> properties;
private Runnable failFunction;
private Runnable ackFunction;
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ae5d0d6421..2190be1d00 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -132,6 +132,7 @@ public void open(Map<String, Object> config) throws
Exception {
.partitionId(String.format("%s-%s", topicName, partitionId))
.recordSequence(Utils.getSequenceId(message.getMessageId()))
.topicName(topicName)
+ .properties(message.getProperties())
.ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services