This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9b5a4b22ca61dd4c2418c93145c1e4f0eed86255 Author: Yunze Xu <[email protected]> AuthorDate: Mon Jan 4 11:25:14 2021 -0600 Fix NPE when MultiTopicsConsumerImpl receives null value messages (#9113) ### Motivation [#6379](https://github.com/apache/pulsar/pull/6379) introduced support for handling null value messages, but the null check is only performed in `ConsumerImpl` when `INCOMING_MESSAGES_SIZE_UPDATER` is updated. Therefore, if a partitioned topic with at least 2 partitions is consumed and a null value message is received, an NPE is thrown. ### Modifications - Check for null value messages in `MultiTopicsConsumerImpl` as well as `ConsumerImpl`. To reduce repeated code, two protected methods are added to `ConsumerBase` and `INCOMING_MESSAGES_SIZE_UPDATER` becomes private; the derived consumer classes use these two methods to update or reset `INCOMING_MESSAGES_SIZE_UPDATER`. - Add tests for partitioned topics in `NullValueTest`. Since the existing tests rely on the message send order, messages are sent to a single partition only. 
(cherry picked from commit dd3b9d8115ccaa409957a089ab04ea16b731e5a2) --- .../pulsar/broker/service/NullValueTest.java | 52 +++++++++++++++++----- .../apache/pulsar/client/impl/ConsumerBase.java | 11 ++++- .../apache/pulsar/client/impl/ConsumerImpl.java | 14 +++--- .../client/impl/MultiTopicsConsumerImpl.java | 16 +++---- 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java index 553fe01..6473b2b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java @@ -21,17 +21,22 @@ package org.apache.pulsar.broker.service; import java.util.concurrent.CompletableFuture; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TopicMetadata; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -52,13 +57,23 @@ public class NullValueTest extends BrokerTestBase { super.internalCleanup(); } - @Test - public void nullValueBytesSchemaTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/null-value-bytes-test"; + 
@DataProvider(name = "topics") + public static Object[][] topics() { + return new Object[][]{ + {"persistent://prop/ns-abc/null-value-test-0", 1}, + {"persistent://prop/ns-abc/null-value-test-1", 3}, + }; + } + + @Test(dataProvider = "topics") + public void nullValueBytesSchemaTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -120,13 +135,15 @@ public class NullValueTest extends BrokerTestBase { } - @Test - public void nullValueBooleanSchemaTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/null-value-bool-test"; + @Test(dataProvider = "topics") + public void nullValueBooleanSchemaTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer<Boolean> producer = pulsarClient.newProducer(Schema.BOOL) .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -148,14 +165,16 @@ public class NullValueTest extends BrokerTestBase { } - @Test - public void keyValueNullInlineTest() throws PulsarClientException { - String topic = "persistent://prop/ns-abc/kv-null-value-test"; + @Test(dataProvider = "topics") + public void keyValueNullInlineTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer<KeyValue<String, String>> producer = pulsarClient .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING)) .topic(topic) + .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); @Cleanup @@ -193,14 +212,23 @@ public class NullValueTest extends BrokerTestBase { } - @Test - public void keyValueNullSeparatedTest() throws PulsarClientException { 
- String topic = "persistent://prop/ns-abc/kv-null-value-test"; + @Test(dataProvider = "topics") + public void keyValueNullSeparatedTest(String topic, int partitions) + throws PulsarClientException, PulsarAdminException { + admin.topics().createPartitionedTopic(topic, partitions); @Cleanup Producer<KeyValue<String, String>> producer = pulsarClient .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED)) .topic(topic) + // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is + // SEPARATED so we need to define a message router to guarantee the message order. + .messageRouter(new MessageRouter() { + @Override + public int choosePartition(Message<?> msg, TopicMetadata metadata) { + return 0; + } + }) .create(); @Cleanup diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 1db6aee..c184f9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -78,7 +78,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T protected final ConsumerInterceptors<T> interceptors; protected final BatchReceivePolicy batchReceivePolicy; protected ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives; - protected static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater + private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater .newUpdater(ConsumerBase.class, "incomingMessagesSize"); protected volatile long incomingMessagesSize = 0; protected volatile Timeout batchReceiveTimeout = null; @@ -851,6 +851,15 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T return pendingBatchReceives != null && 
peekNextBatchReceive() != null; } + protected void resetIncomingMessageSize() { + INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + } + + protected void updateIncomingMessageSize(final Message<?> message) { + INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, + (message.getData() != null) ? message.getData().length : 0); + } + protected abstract void completeOpBatchReceive(OpBatchReceive<T> op); private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2062671..b5c3c01 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -941,7 +941,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle private BatchMessageIdImpl clearReceiverQueue() { List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); if (duringSeek.compareAndSet(true, false)) { return seekMessageId; @@ -1528,7 +1528,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle stats.updateNumMsgsReceived(msg); trackMessage(msg); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, msg.getData() == null ? 
0 : -msg.getData().length); + updateIncomingMessageSize(msg); } protected void trackMessage(Message<?> msg) { @@ -1738,7 +1738,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle synchronized (this) { currentSize = incomingMessages.size(); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); } cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(consumerId), cnx.ctx().voidPromise()); @@ -1762,7 +1762,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle public int clearIncomingMessagesAndGetMessageNumber() { int messagesNumber = incomingMessages.size(); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); return messagesNumber; } @@ -1916,7 +1916,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); seekFuture.complete(null); }).exceptionally(e -> { log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); @@ -1977,7 +1977,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle lastDequeuedMessageId = MessageId.earliest; incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); seekFuture.complete(null); }).exceptionally(e -> { log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage()); @@ -2222,7 +2222,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle // try not to remove elements that are added while we remove Message<T> message = incomingMessages.poll(); while (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
-message.getData().length); + updateIncomingMessageSize(message); messagesFromQueue++; MessageIdImpl id = getMessageIdImpl(message); if (!messageIds.contains(id)) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 49e31f4..f59c2c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -311,7 +311,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { @Override protected synchronized void messageProcessed(Message<?> msg) { unAckedMessageTracker.add(msg.getMessageId()); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -msg.getData().length); + updateIncomingMessageSize(msg); } private void resumeReceivingFromPausedConsumersIfNeeded() { @@ -334,7 +334,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { Message<T> message; try { message = incomingMessages.take(); - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -350,7 +350,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { try { message = incomingMessages.poll(timeout, unit); if (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); } @@ -391,7 +391,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message<T> msg = incomingMessages.poll(); if (msg != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, 
-msg.getData().length); + updateIncomingMessageSize(msg); Message<T> interceptMsg = beforeConsume(msg); messages.add(interceptMsg); } @@ -419,7 +419,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); checkState(message instanceof TopicMessageImpl); unAckedMessageTracker.add(message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); @@ -625,7 +625,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { consumer.unAckedChunckedMessageIdSequenceMap.clear(); }); incomingMessages.clear(); - INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); unAckedMessageTracker.clear(); } finally { lock.writeLock().unlock(); @@ -694,7 +694,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { unAckedMessageTracker.clear(); incomingMessages.clear(); - MultiTopicsConsumerImpl.INCOMING_MESSAGES_SIZE_UPDATER.set(this, 0); + resetIncomingMessageSize(); FutureUtil.waitForAll(futures).whenComplete((result, exception) -> { if (exception != null) { @@ -784,7 +784,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> { Message<T> message = incomingMessages.poll(); checkState(message instanceof TopicMessageImpl); while (message != null) { - INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.getData().length); + updateIncomingMessageSize(message); MessageId messageId = message.getMessageId(); if (!messageIds.contains(messageId)) { messageIds.add(messageId);
