sijie closed pull request #2347: Fix: function with multi-topic not acking on effectively-once
URL: https://github.com/apache/incubator-pulsar/pull/2347
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign pull request (one from a fork) once it is merged,
the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532bda..ae7757d09e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public void testSillyUser() throws Exception {
         }
 
         try {
-            producer = pulsarClient.newProducer().topic(topicName.toString())
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
+            producer = 
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+                    
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
             consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
             producer.send("message1".getBytes());
             producer.send("message2".getBytes());
             /* Message<byte[]> msg1 = */ consumer.receive();
             Message<byte[]> msg2 = consumer.receive();
             consumer.acknowledgeCumulative(msg2);
-            Assert.fail("should fail since ack cumulative is not supported for 
partitioned topic");
-        } catch (PulsarClientException e) {
-            Assert.assertTrue(e instanceof 
PulsarClientException.NotSupportedException);
         } finally {
             producer.close();
             consumer.unsubscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index abc4735ea1..5db9a0ac04 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public void testPulsarSinkStats() throws Exception {
         retryStrategically((test) -> {
             try {
                 SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
-                return subStats.unackedMessages == 0;
+                return subStats.unackedMessages == 0 && 
subStats.msgThroughputOut == totalMsgs;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 5, 500);
+        }, 5, 200);
 
         FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
         functionRuntimeManager.updateRates();
@@ -399,11 +400,12 @@ protected static FunctionDetails createSinkConfig(String 
jarFile, String tenant,
         functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
         functionDetailsBuilder.setParallelism(1);
         functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        
functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
 
         // set source spec
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
         sourceSpecBuilder.setTypeClassName(typeArg.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
         sourceSpecBuilder.setSubscriptionName(subscriptionName);
@@ -484,7 +486,7 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() 
throws Exception {
         // set source spec
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
         sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, 
DefaultSerDe.class.getName());
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eedc09..4a0c449bf1 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -362,22 +362,27 @@ private void resumeReceivingFromPausedConsumersIfNeeded() 
{
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String,Long> 
properties) {
         checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
 
         if (getState() != State.Ready) {
             return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
         }
 
         if (ackType == AckType.Cumulative) {
-            return FutureUtil.failedFuture(new 
PulsarClientException.NotSupportedException(
-                    "Cumulative acknowledge not supported for topics 
consumer"));
+            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicName());
+            if (individualConsumer != null) {
+                MessageId innerId = topicMessageId.getInnerMessageId();
+                return individualConsumer.acknowledgeCumulativeAsync(innerId);
+            } else {
+                return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
+            }
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(messageId1.getTopicName());
+            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicName());
 
-            MessageId innerId = messageId1.getInnerMessageId();
+            MessageId innerId = topicMessageId.getInnerMessageId();
             return consumer.doAcknowledge(innerId, ackType, properties)
                 .thenRun(() ->
-                    unAckedMessageTracker.remove(messageId1));
+                    unAckedMessageTracker.remove(topicMessageId));
         }
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ec71450813..1b3c177a56 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -20,25 +20,17 @@
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
@@ -49,6 +41,10 @@
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 
 @Slf4j
@@ -142,7 +138,8 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
                 .message(message)
                 .topicName(topicName)
                 .ackFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig
+                            .getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
                     } else {
                         inputConsumer.acknowledgeAsync(message);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to