This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8fbb57d8ac4944a3221d71d52dfcd8907d95f1d3 Author: wenbingshen <[email protected]> AuthorDate: Thu Mar 17 19:50:19 2022 +0800 Fix partitionsAutoUpdateFuture never complete (#14625) (cherry picked from commit b06dac68700dfcdf701dfbccb98126db7a7b7ef3) --- .../client/api/SimpleProducerConsumerTest.java | 49 ++++++++++++++++++++++ .../client/impl/PartitionedProducerImpl.java | 13 +++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 6302cbd..32b320cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -39,6 +39,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.Timeout; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -4200,4 +4201,52 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { assertEquals(resultSet.size(), total); }); } + + @Test + public void testPartitionsAutoUpdate() throws Exception { + log.info("-- Starting {} test --", methodName); + + int numPartitions = 3; + TopicName topicName = TopicName.get("persistent://my-property/my-ns/partitionsAutoUpdate-1"); + admin.topics().createPartitionedTopic(topicName.toString(), numPartitions); + + int operationTimeout = 2000; // MILLISECONDS + @Cleanup final PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .operationTimeout(operationTimeout, TimeUnit.MILLISECONDS) + .build(); + + ProducerBuilder<byte[]> producerBuilder = client.newProducer() + .topic(topicName.toString()).sendTimeout(1, TimeUnit.SECONDS); + + @Cleanup + PartitionedProducerImpl<byte[]> partitionedProducer = + (PartitionedProducerImpl<byte[]>) producerBuilder.autoUpdatePartitions(true).create(); + + // Trigger the Connection refused exception + stopBroker(); + + log.info("trigger partitionsAutoUpdateTimerTask run failed for producer"); + Timeout timeout = partitionedProducer.getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Awaitility.await().untilAsserted(() -> { + assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture()); + assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally()); + assertTrue(FutureUtil.getException(partitionedProducer.getPartitionsAutoUpdateFuture()).get().getMessage() + .contains("Connection refused:")); + }); + + startBroker(); + + log.info("trigger partitionsAutoUpdateTimerTask run successful for producer"); + timeout = partitionedProducer.getPartitionsAutoUpdateTimeout(); + timeout.task().run(timeout); + Awaitility.await().untilAsserted(() -> { + assertNotNull(partitionedProducer.getPartitionsAutoUpdateFuture()); + assertTrue(partitionedProducer.getPartitionsAutoUpdateFuture().isDone()); + assertFalse(partitionedProducer.getPartitionsAutoUpdateFuture().isCompletedExceptionally()); + }); + + log.info("-- Exiting {} test --", methodName); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 9ad0f14..c47eca3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -300,7 +300,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { if (log.isDebugEnabled()) { log.debug("[{}] partitions number. old: {}, new: {}", - topic, oldPartitionNumber, currentPartitionNumber); + topic, oldPartitionNumber, currentPartitionNumber); } if (oldPartitionNumber == currentPartitionNumber) { @@ -343,10 +343,14 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { return null; } else { log.error("[{}] not support shrink topic partitions. old: {}, new: {}", - topic, oldPartitionNumber, currentPartitionNumber); + topic, oldPartitionNumber, currentPartitionNumber); future.completeExceptionally(new NotSupportedException("not support shrink topic partitions")); } return future; + }).exceptionally(throwable -> { + log.error("[{}] Auto getting partitions failed", topic, throwable); + future.completeExceptionally(throwable); + return null; }); return future; @@ -380,6 +384,11 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> { }; @VisibleForTesting + public CompletableFuture<Void> getPartitionsAutoUpdateFuture() { + return partitionsAutoUpdateFuture; + } + + @VisibleForTesting public Timeout getPartitionsAutoUpdateTimeout() { return partitionsAutoUpdateTimeout; }
