This is an automated email from the ASF dual-hosted git repository.

massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f8945a  [PIP 79][client] Add lazy-loading feature to 
PartitionedProducer (#10279)
1f8945a is described below

commit 1f8945addb18bf1f94366d2c7f5286ee6f62c675
Author: Yuri Mizushima <[email protected]>
AuthorDate: Fri Oct 15 14:12:21 2021 +0900

    [PIP 79][client] Add lazy-loading feature to PartitionedProducer (#10279)
    
    * feat: add lazy loading feature to PartitionedProducerImpl
    
    * feat: add partial round robin routing mode
    
    * test: add tests for lazy-loading
    
    * fix: fix producer closing code at lazy-loading
    
    * test: remove unnecessary handling, fail from test codes
    
    * feat: add enableLazyStartPartitionedProducers config
    
    * test: fix test for lazy-loading config
    
    * fix: address comments
    
    * fix: add partition-change interceptor
    
    * fix: address comments
---
 .../apache/pulsar/client/api/ClientErrorsTest.java |  97 +++++++-
 .../impl/PartialPartitionedProducerTest.java       | 255 +++++++++++++++++++++
 .../apache/pulsar/client/api/ProducerBuilder.java  |  19 ++
 .../client/impl/PartitionedProducerImpl.java       | 182 +++++++++------
 .../pulsar/client/impl/ProducerBuilderImpl.java    |   6 +
 .../impl/conf/ProducerConfigurationData.java       |   2 +
 .../PartialRoundRobinMessageRouterImpl.java        |  85 +++++++
 .../client/impl/PartitionedProducerImplTest.java   |  49 ++++
 8 files changed, 628 insertions(+), 67 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index a38b711..f66b198 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import lombok.Cleanup;
 import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
 import org.apache.pulsar.client.impl.ProducerBase;
 import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.ServerError;
@@ -462,8 +463,100 @@ public class ClientErrorsTest {
         mockBrokerService.resetHandleSubscribe();
     }
 
-    // if a producer fails to connect while creating partitioned producer, it 
should close all successful connections of
-    // other producers and fail
+    // a lazy-loading producer must fail at the initialization step when it cannot connect to its first partition
+    @Test
+    public void testPartitionedProducerFailOnInitialization() throws Throwable 
{
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 1) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
+        });
+
+        try {
+            client.newProducer()
+                    .enableLazyStartPartitionedProducers(true)
+                    .accessMode(ProducerAccessMode.Shared)
+                    .topic("persistent://prop/use/ns/multi-part-t1").create();
+            fail("Should have failed with an authorization error");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.AuthorizationException);
+        }
+        assertEquals(producerCounter.get(), 1);
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // a lazy-loading producer must fail at the sending step when it cannot connect to the partition chosen for the message
+    @Test
+    public void testPartitionedProducerFailOnSending() throws Throwable {
+        @Cleanup
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+        final AtomicInteger producerCounter = new AtomicInteger(0);
+        final AtomicInteger closeCounter = new AtomicInteger(0);
+        final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+        mockBrokerService.setHandleProducer((ctx, producer) -> {
+            if (producerCounter.incrementAndGet() == 2) {
+                ctx.writeAndFlush(Commands.newError(producer.getRequestId(), 
ServerError.AuthorizationError, "msg"));
+                return;
+            }
+            
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), 
"default-producer", SchemaVersion.Empty));
+        });
+
+        mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+            ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(), 
send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+        );
+
+        mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+            
ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+            closeCounter.incrementAndGet();
+        });
+
+        final PartitionedProducerImpl<byte[]> producer = 
(PartitionedProducerImpl<byte[]>) client.newProducer()
+                .enableLazyStartPartitionedProducers(true)
+                .accessMode(ProducerAccessMode.Shared)
+                .topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        try {
+            producer.send("msg".getBytes());
+            fail("Should have failed with an not connected exception");
+        } catch (Exception e) {
+            assertTrue(e instanceof 
PulsarClientException.NotConnectedException);
+            assertEquals(producer.getProducers().size(), 1);
+        }
+
+        try {
+            // recreate failed producer
+            for (int i = 0; i < 
client.getPartitionsForTopic(topicName).get().size(); i++) {
+                producer.send("msg".getBytes());
+            }
+            assertEquals(producer.getProducers().size(), 
client.getPartitionsForTopic(topicName).get().size());
+            assertEquals(producerCounter.get(), 5);
+        } catch (Exception e) {
+            fail();
+        }
+
+        // should not call close
+        assertEquals(closeCounter.get(), 0);
+
+        mockBrokerService.resetHandleProducer();
+        mockBrokerService.resetHandleCloseProducer();
+        client.close();
+    }
+
+    // if a producer that is not in lazy-loading mode fails to connect while creating a partitioned producer,
+    // it should close all successful connections of other producers and fail
     @Test
     public void 
testOneProducerFailShouldCloseAllProducersInPartitionedProducer() throws 
Exception {
         @Cleanup
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
new file mode 100644
index 0000000..7fd992a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.TopicMetadata;
+import 
org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PartialPartitionedProducerTest extends ProducerConsumerBase {
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testPtWithSinglePartition() throws Throwable {
+        final String topic = 
BrokerTestUtil.newUniqueName("pt-with-single-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImpl = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 1);
+    }
+
+    @Test
+    public void testPtWithPartialPartition() throws Throwable {
+        final String topic = 
BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImpl = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .create();
+
+        for (int i = 0; i < 10; i++) {
+            producerImpl.newMessage().value("msg".getBytes()).send();
+        }
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    // AddPartitionTest
+    @Test
+    public void testPtLazyLoading() throws Throwable {
+        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImpl = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        final Supplier<Boolean> send = () -> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 10);
+    }
+
+    @Test
+    public void testPtLoadingNotSharedMode() throws Throwable {
+        final String topic = 
BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+        admin.topics().createPartitionedTopic(topic, 10);
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        // lazy start applies to Shared access mode only, so all partition producers are created at initialization
+        assertEquals(producerImplExclusive.getProducers().size(), 10);
+
+        producerImplExclusive.close();
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
+    }
+
+    // AddPartitionAndLimitTest
+    @Test
+    public void testPtUpdateWithPartialPartition() throws Throwable {
+        final String topic = 
BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = 
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImpl = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+                .accessMode(ProducerAccessMode.Shared)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        final Supplier<Boolean> send = ()-> {
+            for (int i = 0; i < 10; i++) {
+                try {
+                    producerImpl.newMessage().value("msg".getBytes()).send();
+                } catch (Throwable e) {
+                    return false;
+                }
+            }
+            return true;
+        };
+
+        // create first producer at initialization step
+        assertEquals(producerImpl.getProducers().size(), 1);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) 
field.get(producerImpl)).numPartitions(), 3));
+        assertEquals(producerImpl.getProducers().size(), 2);
+
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) 
field.get(producerImpl)).numPartitions(), 4));
+        assertTrue(send.get());
+        assertEquals(producerImpl.getProducers().size(), 3);
+    }
+
+    @Test
+    public void testPtUpdateNotSharedMode() throws Throwable {
+        final String topic = 
BrokerTestUtil.newUniqueName("pt-update-not-shared");
+        admin.topics().createPartitionedTopic(topic, 2);
+
+        final Field field = 
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+        field.setAccessible(true);
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImplExclusive = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.Exclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplExclusive.getProducers().size(), 2);
+
+        admin.topics().updatePartitionedTopic(topic, 3);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) 
field.get(producerImplExclusive)).numPartitions(), 3));
+        assertEquals(producerImplExclusive.getProducers().size(), 3);
+
+        producerImplExclusive.close();
+
+        @Cleanup
+        final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive = 
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableLazyStartPartitionedProducers(true)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+                .create();
+
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);
+
+        admin.topics().updatePartitionedTopic(topic, 4);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(((TopicMetadata) 
field.get(producerImplWaitForExclusive)).numPartitions(), 4));
+        assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
+    }
+}
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index ecac0b6..fd91101 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -576,4 +576,23 @@ public interface ProducerBuilder<T> extends Cloneable {
      * @since 2.5.0
      */
     ProducerBuilder<T> enableMultiSchema(boolean multiSchema);
+
+    /**
+     * This config affects Shared mode producers of partitioned topics only. 
It controls whether
+     * producers register and connect immediately to the owner broker of each 
partition
+     * or start lazily on demand. The internal producer of one partition is 
always
+     * started eagerly, chosen by the routing policy, but the internal 
producers of
+     * any additional partitions are started on demand, upon receiving their 
first
+     * message.
+     * Using this mode can reduce the strain on brokers for topics with large 
numbers of
+     * partitions and when the SinglePartition or some custom partial 
partition routing policy
+     * like PartialRoundRobinMessageRouterImpl is used without keyed messages.
+     * Because producer connection can be on demand, this can produce extra 
send latency
+     * for the first messages of a given partition.
+     *
+     * @param lazyStartPartitionedProducers
+     *            true/false as to whether to start partition producers lazily
+     * @return the producer builder instance
+     */
+    ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean 
lazyStartPartitionedProducers);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5702563..4525531 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -22,10 +22,10 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRouter;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
 import org.apache.pulsar.client.api.Schema;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,10 +58,11 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
 
     private static final Logger log = 
LoggerFactory.getLogger(PartitionedProducerImpl.class);
 
-    private List<ProducerImpl<T>> producers;
-    private MessageRouter routerPolicy;
+    private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
+    private final MessageRouter routerPolicy;
     private final ProducerStatsRecorderImpl stats;
     private TopicMetadata topicMetadata;
+    private final int firstPartitionIndex;
 
     // timeout related to auto check and subscribe partition increasement
     private volatile Timeout partitionsAutoUpdateTimeout = null;
@@ -68,15 +72,29 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
     public PartitionedProducerImpl(PulsarClientImpl client, String topic, 
ProducerConfigurationData conf, int numPartitions,
             CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> 
schema, ProducerInterceptors interceptors) {
         super(client, topic, conf, producerCreatedFuture, schema, 
interceptors);
-        this.producers = Lists.newArrayListWithCapacity(numPartitions);
+        this.producers = new ConcurrentOpenHashMap<>();
         this.topicMetadata = new TopicMetadataImpl(numPartitions);
         this.routerPolicy = getMessageRouter();
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new 
ProducerStatsRecorderImpl() : null;
 
+        // MaxPendingMessagesAcrossPartitions doesn't support partial 
partition such as SinglePartition correctly
         int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
                 conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
         conf.setMaxPendingMessages(maxPendingMessages);
-        start();
+
+        final List<Integer> indexList;
+        if (conf.isLazyStartPartitionedProducers() &&
+                conf.getAccessMode() == ProducerAccessMode.Shared) {
+            // try to create a producer for at least one partition
+            indexList = Collections.singletonList(routerPolicy
+                    .choosePartition(((TypedMessageBuilderImpl<T>) 
newMessage()).getMessage(), topicMetadata));
+        } else {
+            // try to create producer for all partitions
+            indexList = IntStream.range(0, 
topicMetadata.numPartitions()).boxed().collect(Collectors.toList());
+        }
+
+        firstPartitionIndex = indexList.get(0);
+        start(indexList);
 
         // start track and auto subscribe partition increasement
         if (conf.isAutoUpdatePartitions()) {
@@ -113,25 +131,22 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
 
     @Override
     public String getProducerName() {
-        return producers.get(0).getProducerName();
+        return producers.get(firstPartitionIndex).getProducerName();
     }
 
     @Override
     public long getLastSequenceId() {
         // Return the highest sequence id across all partitions. This will be 
correct,
         // since there is a single id generator across all partitions for the 
same producer
-        return 
producers.stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1);
+        return 
producers.values().stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1);
     }
 
-    private void start() {
+    private void start(List<Integer> indexList) {
         AtomicReference<Throwable> createFail = new 
AtomicReference<Throwable>();
         AtomicInteger completed = new AtomicInteger();
-        for (int partitionIndex = 0; partitionIndex < 
topicMetadata.numPartitions(); partitionIndex++) {
-            String partitionName = 
TopicName.get(topic).getPartition(partitionIndex).toString();
-            ProducerImpl<T> producer = client.newProducerImpl(partitionName, 
partitionIndex,
-                    conf, schema, interceptors, new CompletableFuture<>());
-            producers.add(producer);
-            producer.producerCreatedFuture().handle((prod, createException) -> 
{
+
+        for (int partitionIndex : indexList) {
+            
createProducer(partitionIndex).producerCreatedFuture().handle((prod, 
createException) -> {
                 if (createException != null) {
                     setState(State.Failed);
                     createFail.compareAndSet(null, createException);
@@ -141,7 +156,7 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
                 // due to any
                 // failure in one of the partitions and close the successfully
                 // created partitions
-                if (completed.incrementAndGet() == 
topicMetadata.numPartitions()) {
+                if (completed.incrementAndGet() == indexList.size()) {
                     if (createFail.get() == null) {
                         setState(State.Ready);
                         log.info("[{}] Created partitioned producer", topic);
@@ -159,7 +174,14 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
                 return null;
             });
         }
+    }
 
+    private ProducerImpl<T> createProducer(final int partitionIndex) {
+        return producers.computeIfAbsent(partitionIndex, (idx) -> {
+            String partitionName = 
TopicName.get(topic).getPartition(idx).toString();
+            return client.newProducerImpl(partitionName, idx,
+                    conf, schema, interceptors, new CompletableFuture<>());
+        });
     }
 
     @Override
@@ -169,6 +191,34 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
 
     @Override
     CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, 
Transaction txn) {
+        int partition = routerPolicy.choosePartition(message, topicMetadata);
+        checkArgument(partition >= 0 && partition < 
topicMetadata.numPartitions(),
+                "Illegal partition index chosen by the message routing policy: 
" + partition);
+
+        if (conf.isLazyStartPartitionedProducers() && 
!producers.containsKey(partition)) {
+            final ProducerImpl<T> newProducer = createProducer(partition);
+            final State createState = 
newProducer.producerCreatedFuture().handle((prod, createException) -> {
+                if (createException != null) {
+                    log.error("[{}] Could not create internal producer. 
partitionIndex: {}", topic, partition,
+                            createException);
+                    try {
+                        producers.remove(partition, newProducer);
+                        newProducer.close();
+                    } catch (PulsarClientException e) {
+                        log.error("[{}] Could not close internal producer. 
partitionIndex: {}", topic, partition, e);
+                    }
+                    return State.Failed;
+                }
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Created internal producer. partitionIndex: 
{}", topic, partition);
+                }
+                return State.Ready;
+            }).join();
+            if (createState == State.Failed) {
+                return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
+            }
+        }
+
         switch (getState()) {
             case Ready:
             case Connecting:
@@ -185,34 +235,30 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
                 return FutureUtil.failedFuture(new 
PulsarClientException.NotConnectedException());
         }
 
-        int partition = routerPolicy.choosePartition(message, topicMetadata);
-        checkArgument(partition >= 0 && partition < 
topicMetadata.numPartitions(),
-                "Illegal partition index chosen by the message routing policy: 
" + partition);
         return producers.get(partition).internalSendWithTxnAsync(message, txn);
     }
 
     @Override
     public CompletableFuture<Void> flushAsync() {
-        List<CompletableFuture<Void>> flushFutures =
-            
producers.stream().map(ProducerImpl::flushAsync).collect(Collectors.toList());
-        return CompletableFuture.allOf(flushFutures.toArray(new 
CompletableFuture[flushFutures.size()]));
+        return CompletableFuture.allOf(
+                
producers.values().stream().map(ProducerImpl::flushAsync).toArray(CompletableFuture[]::new));
     }
 
     @Override
     void triggerFlush() {
-        producers.forEach(ProducerImpl::triggerFlush);
+        producers.values().forEach(ProducerImpl::triggerFlush);
     }
 
     @Override
     public boolean isConnected() {
         // returns false if any of the partition is not connected
-        return producers.stream().allMatch(ProducerImpl::isConnected);
+        return producers.values().stream().allMatch(ProducerImpl::isConnected);
     }
 
     @Override
     public long getLastDisconnectedTimestamp() {
         long lastDisconnectedTimestamp = 0;
-        Optional<ProducerImpl<T>> p = 
producers.stream().max(Comparator.comparingLong(ProducerImpl::getLastDisconnectedTimestamp));
+        Optional<ProducerImpl<T>> p = 
producers.values().stream().max(Comparator.comparingLong(ProducerImpl::getLastDisconnectedTimestamp));
         if (p.isPresent()) {
             lastDisconnectedTimestamp = p.get().getLastDisconnectedTimestamp();
         }
@@ -232,9 +278,9 @@ public class PartitionedProducerImpl<T> extends 
ProducerBase<T> {
         }
 
         AtomicReference<Throwable> closeFail = new 
AtomicReference<Throwable>();
-        AtomicInteger completed = new 
AtomicInteger(topicMetadata.numPartitions());
+        AtomicInteger completed = new AtomicInteger((int) producers.size());
         CompletableFuture<Void> closeFuture = new CompletableFuture<>();
-        for (Producer<T> producer : producers) {
+        for (Producer<T> producer : producers.values()) {
             if (producer != null) {
                 producer.closeAsync().handle((closed, ex) -> {
                     if (ex != null) {
@@ -268,14 +314,14 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
             return null;
         }
         stats.reset();
-        for (int i = 0; i < topicMetadata.numPartitions(); i++) {
-            stats.updateCumulativeStats(producers.get(i).getStats());
-        }
+        producers.values().forEach(p -> stats.updateCumulativeStats(p.getStats()));
         return stats;
     }
 
     public List<ProducerImpl<T>> getProducers() {
-        return producers.stream().collect(Collectors.toList());
+        return producers.values().stream()
+                .sorted(Comparator.comparingInt(e -> TopicName.getPartitionIndex(e.getTopic())))
+                .collect(Collectors.toList());
     }
 
     @Override
@@ -308,41 +354,47 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
                     future.complete(null);
                     return future;
                 } else if (oldPartitionNumber < currentPartitionNumber) {
-                    List<CompletableFuture<Producer<T>>> futureList = list
-                        .subList(oldPartitionNumber, currentPartitionNumber)
-                        .stream()
-                        .map(partitionName -> {
-                            int partitionIndex = TopicName.getPartitionIndex(partitionName);
-                            ProducerImpl<T> producer =
-                                new ProducerImpl<>(client,
-                                    partitionName, conf, new CompletableFuture<>(),
-                                    partitionIndex, schema, interceptors);
-                            producers.add(producer);
-                            return producer.producerCreatedFuture();
-                        }).collect(Collectors.toList());
-
-                    FutureUtil.waitForAll(futureList)
-                        .thenAccept(finalFuture -> {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] success create producers for extended partitions. old: {}, new: {}",
-                                    topic, oldPartitionNumber, currentPartitionNumber);
-                            }
-                            topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
-                            future.complete(null);
-                        })
-                        .exceptionally(ex -> {
-                            // error happened, remove
-                            log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}",
-                                topic, oldPartitionNumber, currentPartitionNumber);
-                            List<ProducerImpl<T>> sublist = producers.subList(oldPartitionNumber, producers.size());
-                            sublist.forEach(newProducer -> newProducer.closeAsync());
-                            sublist.clear();
-                            future.completeExceptionally(ex);
-                            return null;
-                        });
-                    // call interceptor with the metadata change
-                    onPartitionsChange(topic, currentPartitionNumber);
-                    return null;
+                    if (conf.isLazyStartPartitionedProducers() && conf.getAccessMode() == ProducerAccessMode.Shared) {
+                        topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
+                        future.complete(null);
+                        // call interceptor with the metadata change
+                        onPartitionsChange(topic, currentPartitionNumber);
+                        return future;
+                    } else {
+                        List<CompletableFuture<Producer<T>>> futureList = list
+                                .subList(oldPartitionNumber, currentPartitionNumber)
+                                .stream()
+                                .map(partitionName -> {
+                                    int partitionIndex = TopicName.getPartitionIndex(partitionName);
+                                    return producers.computeIfAbsent(partitionIndex, (idx) -> new ProducerImpl<>(
+                                            client, partitionName, conf, new CompletableFuture<>(),
+                                            idx, schema, interceptors)).producerCreatedFuture();
+                                }).collect(Collectors.toList());
+
+                        FutureUtil.waitForAll(futureList)
+                                .thenAccept(finalFuture -> {
+                                    if (log.isDebugEnabled()) {
+                                        log.debug(
+                                                "[{}] success create producers for extended partitions."
+                                                        + " old: {}, new: {}",
+                                                topic, oldPartitionNumber, currentPartitionNumber);
+                                    }
+                                    topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
+                                    future.complete(null);
+                                })
+                                .exceptionally(ex -> {
+                                    // error happened, remove
+                                    log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}",
+                                            topic, oldPartitionNumber, currentPartitionNumber);
+                                    IntStream.range(oldPartitionNumber, (int) producers.size())
+                                            .forEach(i -> producers.remove(i).closeAsync());
+                                    future.completeExceptionally(ex);
+                                    return null;
+                                });
+                        // call interceptor with the metadata change
+                        onPartitionsChange(topic, currentPartitionNumber);
+                        return null;
+                    }
                 } else {
                     log.error("[{}] not support shrink topic partitions. old: {}, new: {}",
                         topic, oldPartitionNumber, currentPartitionNumber);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index d3a1bb1..497b82a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -322,6 +322,12 @@ public class ProducerBuilderImpl<T> implements ProducerBuilder<T> {
         return this;
     }
 
+    @Override
+    public ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean lazyStartPartitionedProducers) {
+        conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
+        return this;
+    }
+
     private void setMessageRoutingMode() throws PulsarClientException {
         if(conf.getMessageRoutingMode() == null && conf.getCustomMessageRouter() == null) {
             messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 4b96767..12fb4ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -102,6 +102,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
 
     private ProducerAccessMode accessMode = ProducerAccessMode.Shared;
 
+    private boolean lazyStartPartitionedProducers = false;
+
     private SortedMap<String, String> properties = new TreeMap<>();
 
     /**
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
new file mode 100644
index 0000000..fe244a6
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.customroute;
+
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.TopicMetadata;
+
+public class PartialRoundRobinMessageRouterImpl implements MessageRouter {
+    private final int numPartitionsLimit;
+    private final List<Integer> partialList = new CopyOnWriteArrayList<>();
+    private static final AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl> PARTITION_INDEX_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class, "partitionIndex");
+    @SuppressWarnings("unused")
+    private volatile int partitionIndex = 0;
+
+    public PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) {
+        if (numPartitionsLimit < 1) {
+            throw new IllegalArgumentException("numPartitionsLimit should be greater than or equal to 1");
+        }
+        this.numPartitionsLimit = numPartitionsLimit;
+    }
+
+    /**
+     * Choose a partition based on the topic metadata.
+     * Key hash routing isn't supported.
+     *
+     * @param msg message
+     * @param metadata topic metadata
+     * @return the partition to route the message.
+     */
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        final List<Integer> newPartialList = new ArrayList<>(getOrCreatePartialList(metadata));
+        return newPartialList
+                .get(signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), newPartialList.size()));
+    }
+
+    private List<Integer> getOrCreatePartialList(TopicMetadata metadata) {
+        if (partialList.isEmpty()
+                || partialList.size() < numPartitionsLimit && partialList.size() < metadata.numPartitions()) {
+            synchronized (this) {
+                if (partialList.isEmpty()) {
+                    partialList.addAll(IntStream.range(0, metadata.numPartitions()).boxed()
+                            .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+                                Collections.shuffle(list);
+                                return list.stream();
+                            })).limit(numPartitionsLimit).collect(Collectors.toList()));
+                } else if (partialList.size() < numPartitionsLimit && partialList.size() < metadata.numPartitions()) {
+                    partialList.addAll(IntStream.range(0, metadata.numPartitions()).boxed()
+                            .filter(e -> !partialList.contains(e))
+                            .collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+                                Collections.shuffle(list);
+                                return list.stream();
+                            })).limit(numPartitionsLimit - partialList.size()).collect(Collectors.toList()));
+                }
+            }
+        }
+        return partialList;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index d4c6aa4..1f9496b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,18 +18,25 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import com.google.api.client.util.Lists;
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import org.apache.pulsar.client.api.Message;
@@ -40,7 +47,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.assertj.core.util.Sets;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
@@ -107,6 +117,45 @@ public class PartitionedProducerImplTest {
         assertTrue(messageRouter instanceof CustomMessageRouter);
     }
 
+    @Test
+    public void testPartialPartition() {
+        final MessageRouter router = new PartialRoundRobinMessageRouterImpl(3);
+        final Set<Integer> actualSet = Sets.newHashSet();
+        final Message<byte[]> msg = MessageImpl
+                .create(new MessageMetadata(), ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
+
+        for (int i = 0; i < 10; i++) {
+            final TopicMetadata metadata = new TopicMetadataImpl(10);
+            actualSet.add(router.choosePartition(msg, metadata));
+        }
+        assertEquals(actualSet.size(), 3);
+
+        try {
+            new PartialRoundRobinMessageRouterImpl(0);
+            fail();
+        } catch (Exception e) {
+            assertEquals(e.getClass(), IllegalArgumentException.class);
+        }
+    }
+
+    @Test
+    public void testPartialPartitionWithKey() {
+        final MessageRouter router = new PartialRoundRobinMessageRouterImpl(3);
+        final Hash hash = Murmur3_32Hash.getInstance();
+        final List<Integer> expectedHashList = Lists.newArrayList();
+        final List<Integer> actualHashList = Lists.newArrayList();
+
+        for (int i = 0; i < 10; i++) {
+            final String key = String.valueOf(i);
+            final Message<byte[]> msg = MessageImpl
+                    .create(new MessageMetadata().setPartitionKey(key), ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
+            final TopicMetadata metadata = new TopicMetadataImpl(10);
+            expectedHashList.add(signSafeMod(hash.makeHash(key), 10));
+            actualHashList.add(router.choosePartition(msg, metadata));
+        }
+        assertNotEquals(actualHashList, expectedHashList);
+    }
+
     private MessageRouter getMessageRouter(ProducerConfigurationData producerConfigurationData)
             throws NoSuchFieldException, IllegalAccessException {
         PartitionedProducerImpl impl = new PartitionedProducerImpl(

Reply via email to