This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 068db598adb529d1ae441084c280f0e776d2cbbc
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Mon Aug 13 14:19:46 2018 -0700

    Fix: function with multi-topic not acking on effectively-once (#2347)
    
     ### Motivation
    
    `MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync`, so 
a function with multi-topic input and `EFFECTIVELY_ONCE` processing does not 
ack its messages, which breaks the `EFFECTIVELY_ONCE` guarantee.
    
     ### Modifications
    
    The function should ack a message through the specific topic's consumer if 
`inputTopicConsumer` is a multi-topic consumer.
    
     ### Result
    
    A function should be able to ack messages for a multi-topic consumer when 
the processing guarantee is `EFFECTIVELY_ONCE`.
---
 .../api/PartitionedProducerConsumerTest.java       |  9 ++------
 .../org/apache/pulsar/io/PulsarSinkE2ETest.java    | 10 +++++----
 .../client/impl/MultiTopicsConsumerImpl.java       | 17 +++++++++------
 .../pulsar/functions/source/PulsarSource.java      | 24 ++++++++++++----------
 4 files changed, 32 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532..ae7757d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         }
 
         try {
-            producer = pulsarClient.newProducer().topic(topicName.toString())
-                .enableBatching(false)
-                .messageRoutingMode(MessageRoutingMode.SinglePartition)
-                .create();
+            producer = 
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+                    
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
             consumer = 
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
             producer.send("message1".getBytes());
             producer.send("message2".getBytes());
             /* Message<byte[]> msg1 = */ consumer.receive();
             Message<byte[]> msg2 = consumer.receive();
             consumer.acknowledgeCumulative(msg2);
-            Assert.fail("should fail since ack cumulative is not supported for 
partitioned topic");
-        } catch (PulsarClientException e) {
-            Assert.assertTrue(e instanceof 
PulsarClientException.NotSupportedException);
         } finally {
             producer.close();
             consumer.unsubscribe();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 5398bc9..2001981 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public class PulsarSinkE2ETest {
         retryStrategically((test) -> {
             try {
                 SubscriptionStats subStats = 
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
-                return subStats.unackedMessages == 0;
+                return subStats.unackedMessages == 0 && 
subStats.msgThroughputOut == totalMsgs;
             } catch (PulsarAdminException e) {
                 return false;
             }
-        }, 5, 500);
+        }, 5, 200);
 
         FunctionRuntimeManager functionRuntimeManager = 
functionsWorkerService.getFunctionRuntimeManager();
         functionRuntimeManager.updateRates();
@@ -400,11 +401,12 @@ public class PulsarSinkE2ETest {
         functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
         functionDetailsBuilder.setParallelism(1);
         functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+        
functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
 
         // set source spec
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
         sourceSpecBuilder.setTypeClassName(typeArg.getName());
         sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
         sourceSpecBuilder.setSubscriptionName(subscriptionName);
@@ -485,7 +487,7 @@ public class PulsarSinkE2ETest {
         // set source spec
         // source spec classname should be empty so that the default pulsar 
source will be used
         SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+        
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
         sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic, 
DefaultSerDe.class.getName());
         functionDetailsBuilder.setAutoAck(true);
         functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eed..4a0c449 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -362,22 +362,27 @@ public class MultiTopicsConsumerImpl<T> extends 
ConsumerBase<T> {
     protected CompletableFuture<Void> doAcknowledge(MessageId messageId, 
AckType ackType,
                                                     Map<String,Long> 
properties) {
         checkArgument(messageId instanceof TopicMessageIdImpl);
-        TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
 
         if (getState() != State.Ready) {
             return FutureUtil.failedFuture(new PulsarClientException("Consumer 
already closed"));
         }
 
         if (ackType == AckType.Cumulative) {
-            return FutureUtil.failedFuture(new 
PulsarClientException.NotSupportedException(
-                    "Cumulative acknowledge not supported for topics 
consumer"));
+            Consumer individualConsumer = 
consumers.get(topicMessageId.getTopicName());
+            if (individualConsumer != null) {
+                MessageId innerId = topicMessageId.getInnerMessageId();
+                return individualConsumer.acknowledgeCumulativeAsync(innerId);
+            } else {
+                return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
+            }
         } else {
-            ConsumerImpl<T> consumer = 
consumers.get(messageId1.getTopicName());
+            ConsumerImpl<T> consumer = 
consumers.get(topicMessageId.getTopicName());
 
-            MessageId innerId = messageId1.getInnerMessageId();
+            MessageId innerId = topicMessageId.getInnerMessageId();
             return consumer.doAcknowledge(innerId, ackType, properties)
                 .thenRun(() ->
-                    unAckedMessageTracker.remove(messageId1));
+                    unAckedMessageTracker.remove(topicMessageId));
         }
     }
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index f70100d..dd2f05d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.functions.source;
 
-import com.google.common.annotations.VisibleForTesting;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
-
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -36,14 +37,14 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class PulsarSource<T> implements Source<T> {
@@ -135,7 +136,8 @@ public class PulsarSource<T> implements Source<T> {
                 .message(message)
                 .topicName(topicName)
                 .ackFunction(() -> {
-                    if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig
+                            .getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
                     } else {
                         inputConsumer.acknowledgeAsync(message);

Reply via email to