sijie closed pull request #2347: Fix: function with multi-topic not acking on
effectively-once
URL: https://github.com/apache/incubator-pulsar/pull/2347
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532bda..ae7757d09e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public void testSillyUser() throws Exception {
}
try {
- producer = pulsarClient.newProducer().topic(topicName.toString())
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ producer =
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
consumer =
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
producer.send("message1".getBytes());
producer.send("message2".getBytes());
/* Message<byte[]> msg1 = */ consumer.receive();
Message<byte[]> msg2 = consumer.receive();
consumer.acknowledgeCumulative(msg2);
- Assert.fail("should fail since ack cumulative is not supported for
partitioned topic");
- } catch (PulsarClientException e) {
- Assert.assertTrue(e instanceof
PulsarClientException.NotSupportedException);
} finally {
producer.close();
consumer.unsubscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index abc4735ea1..5db9a0ac04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public void testPulsarSinkStats() throws Exception {
retryStrategically((test) -> {
try {
SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
- return subStats.unackedMessages == 0;
+ return subStats.unackedMessages == 0 &&
subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
- }, 5, 500);
+ }, 5, 200);
FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
@@ -399,11 +400,12 @@ protected static FunctionDetails createSinkConfig(String
jarFile, String tenant,
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
// set source spec
// source spec classname should be empty so that the default pulsar
source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName);
@@ -484,7 +486,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs()
throws Exception {
// set source spec
// source spec classname should be empty so that the default pulsar
source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic,
DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eedc09..4a0c449bf1 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -362,22 +362,27 @@ private void resumeReceivingFromPausedConsumersIfNeeded()
{
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String,Long>
properties) {
checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
}
if (ackType == AckType.Cumulative) {
- return FutureUtil.failedFuture(new
PulsarClientException.NotSupportedException(
- "Cumulative acknowledge not supported for topics
consumer"));
+ Consumer individualConsumer =
consumers.get(topicMessageId.getTopicName());
+ if (individualConsumer != null) {
+ MessageId innerId = topicMessageId.getInnerMessageId();
+ return individualConsumer.acknowledgeCumulativeAsync(innerId);
+ } else {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
+ }
} else {
- ConsumerImpl<T> consumer =
consumers.get(messageId1.getTopicName());
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicName());
- MessageId innerId = messageId1.getInnerMessageId();
+ MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
.thenRun(() ->
- unAckedMessageTracker.remove(messageId1));
+ unAckedMessageTracker.remove(topicMessageId));
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ec71450813..1b3c177a56 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -20,25 +20,17 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
@@ -49,6 +41,10 @@
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@Slf4j
@@ -142,7 +138,8 @@ public void open(Map<String, Object> config, SourceContext
sourceContext) throws
.message(message)
.topicName(topicName)
.ackFunction(() -> {
- if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ if (pulsarSourceConfig
+ .getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
} else {
inputConsumer.acknowledgeAsync(message);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services