This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0d2154e Fix: function with multi-topic not acking on effectively-once
(#2347)
0d2154e is described below
commit 0d2154ed3acd67bcb651644ae13e43cdd4045e90
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Mon Aug 13 14:19:46 2018 -0700
Fix: function with multi-topic not acking on effectively-once (#2347)
### Motivation
`MultiTopicsConsumerImpl` doesn't support `acknowledgeCumulativeAsync` and
therefore, function with multi-topic and `EFFECTIVELY_ONCE` processing is not
acking message and failing `EFFECTIVELY_ONCE` behavior.
### Modifications
Function should ack message for a specific topic consumer if
`inputTopicConsumer` is multi-topic consumer.
### Result
Function should able to ack messages for multi-topic consumer when
processing-guarantee is `EFFECTIVELY_ONCE`
---
.../client/api/PartitionedProducerConsumerTest.java | 9 ++-------
.../java/org/apache/pulsar/io/PulsarSinkE2ETest.java | 10 ++++++----
.../pulsar/client/impl/MultiTopicsConsumerImpl.java | 17 +++++++++++------
.../apache/pulsar/functions/source/PulsarSource.java | 15 ++++++---------
4 files changed, 25 insertions(+), 26 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index a599532..ae7757d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -290,19 +290,14 @@ public class PartitionedProducerConsumerTest extends
ProducerConsumerBase {
}
try {
- producer = pulsarClient.newProducer().topic(topicName.toString())
- .enableBatching(false)
- .messageRoutingMode(MessageRoutingMode.SinglePartition)
- .create();
+ producer =
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+
.messageRoutingMode(MessageRoutingMode.SinglePartition).create();
consumer =
pulsarClient.newConsumer().topic(topicName.toString()).subscriptionName("my-sub").subscribe();
producer.send("message1".getBytes());
producer.send("message2".getBytes());
/* Message<byte[]> msg1 = */ consumer.receive();
Message<byte[]> msg2 = consumer.receive();
consumer.acknowledgeCumulative(msg2);
- Assert.fail("should fail since ack cumulative is not supported for
partitioned topic");
- } catch (PulsarClientException e) {
- Assert.assertTrue(e instanceof
PulsarClientException.NotSupportedException);
} finally {
producer.close();
consumer.unsubscribe();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index abc4735..5db9a0a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -62,6 +62,7 @@ import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
@@ -356,11 +357,11 @@ public class PulsarSinkE2ETest {
retryStrategically((test) -> {
try {
SubscriptionStats subStats =
admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
- return subStats.unackedMessages == 0;
+ return subStats.unackedMessages == 0 &&
subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
- }, 5, 500);
+ }, 5, 200);
FunctionRuntimeManager functionRuntimeManager =
functionsWorkerService.getFunctionRuntimeManager();
functionRuntimeManager.updateRates();
@@ -399,11 +400,12 @@ public class PulsarSinkE2ETest {
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
functionDetailsBuilder.setParallelism(1);
functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+
functionDetailsBuilder.setProcessingGuarantees(ProcessingGuarantees.EFFECTIVELY_ONCE);
// set source spec
// source spec classname should be empty so that the default pulsar
source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.setTypeClassName(typeArg.getName());
sourceSpecBuilder.setTopicsPattern(sourceTopicPattern);
sourceSpecBuilder.setSubscriptionName(subscriptionName);
@@ -484,7 +486,7 @@ public class PulsarSinkE2ETest {
// set source spec
// source spec classname should be empty so that the default pulsar
source will be used
SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
+
sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.FAILOVER);
sourceSpecBuilder.putTopicsToSerDeClassName(sinkTopic,
DefaultSerDe.class.getName());
functionDetailsBuilder.setAutoAck(true);
functionDetailsBuilder.setSource(sourceSpecBuilder);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index fc91eed..4a0c449 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -362,22 +362,27 @@ public class MultiTopicsConsumerImpl<T> extends
ConsumerBase<T> {
protected CompletableFuture<Void> doAcknowledge(MessageId messageId,
AckType ackType,
Map<String,Long>
properties) {
checkArgument(messageId instanceof TopicMessageIdImpl);
- TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer
already closed"));
}
if (ackType == AckType.Cumulative) {
- return FutureUtil.failedFuture(new
PulsarClientException.NotSupportedException(
- "Cumulative acknowledge not supported for topics
consumer"));
+ Consumer individualConsumer =
consumers.get(topicMessageId.getTopicName());
+ if (individualConsumer != null) {
+ MessageId innerId = topicMessageId.getInnerMessageId();
+ return individualConsumer.acknowledgeCumulativeAsync(innerId);
+ } else {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
+ }
} else {
- ConsumerImpl<T> consumer =
consumers.get(messageId1.getTopicName());
+ ConsumerImpl<T> consumer =
consumers.get(topicMessageId.getTopicName());
- MessageId innerId = messageId1.getInnerMessageId();
+ MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledge(innerId, ackType, properties)
.thenRun(() ->
- unAckedMessageTracker.remove(messageId1));
+ unAckedMessageTracker.remove(topicMessageId));
}
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ec71450..1b3c177 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -20,25 +20,17 @@ package org.apache.pulsar.functions.source;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.api.SerDe;
@@ -49,6 +41,10 @@ import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
+import com.google.common.annotations.VisibleForTesting;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@Slf4j
@@ -142,7 +138,8 @@ public class PulsarSource<T> implements Source<T> {
.message(message)
.topicName(topicName)
.ackFunction(() -> {
- if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ if (pulsarSourceConfig
+ .getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
} else {
inputConsumer.acknowledgeAsync(message);