This is an automated email from the ASF dual-hosted git repository.
massakam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1f8945a [PIP 79][client] Add lazy-loading feature to
PartitionedProducer (#10279)
1f8945a is described below
commit 1f8945addb18bf1f94366d2c7f5286ee6f62c675
Author: Yuri Mizushima <[email protected]>
AuthorDate: Fri Oct 15 14:12:21 2021 +0900
[PIP 79][client] Add lazy-loading feature to PartitionedProducer (#10279)
* feat: add lazy loading feature to PartitionedProducerImpl
* feat: add partial round robin routing mode
* test: add tests for lazy-loading
* fix: fix producer closing code at lazy-loading
* test: remove unnecessary handling, fail from test codes
* feat: add enableLazyStartPartitionedProducers config
* test: fix test for lazy-loading config
* fix: address comments
* fix: add partition-change interceptor
* fix: address comments
---
.../apache/pulsar/client/api/ClientErrorsTest.java | 97 +++++++-
.../impl/PartialPartitionedProducerTest.java | 255 +++++++++++++++++++++
.../apache/pulsar/client/api/ProducerBuilder.java | 19 ++
.../client/impl/PartitionedProducerImpl.java | 182 +++++++++------
.../pulsar/client/impl/ProducerBuilderImpl.java | 6 +
.../impl/conf/ProducerConfigurationData.java | 2 +
.../PartialRoundRobinMessageRouterImpl.java | 85 +++++++
.../client/impl/PartitionedProducerImplTest.java | 49 ++++
8 files changed, 628 insertions(+), 67 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index a38b711..f66b198 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.pulsar.client.impl.ConsumerBase;
+import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
import org.apache.pulsar.common.api.proto.ServerError;
@@ -462,8 +463,100 @@ public class ClientErrorsTest {
mockBrokerService.resetHandleSubscribe();
}
- // if a producer fails to connect while creating partitioned producer, it
should close all successful connections of
- // other producers and fail
+ // failed to connect to partition at initialization step if a producer
which connects to broker as lazy-loading mode
+ @Test
+ public void testPartitionedProducerFailOnInitialization() throws Throwable
{
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+ final AtomicInteger producerCounter = new AtomicInteger(0);
+
+ mockBrokerService.setHandleProducer((ctx, producer) -> {
+ if (producerCounter.incrementAndGet() == 1) {
+ ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.AuthorizationError, "msg"));
+ return;
+ }
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
+ });
+
+ try {
+ client.newProducer()
+ .enableLazyStartPartitionedProducers(true)
+ .accessMode(ProducerAccessMode.Shared)
+ .topic("persistent://prop/use/ns/multi-part-t1").create();
+ fail("Should have failed with an authorization error");
+ } catch (Exception e) {
+ assertTrue(e instanceof
PulsarClientException.AuthorizationException);
+ }
+ assertEquals(producerCounter.get(), 1);
+
+ mockBrokerService.resetHandleProducer();
+ mockBrokerService.resetHandleCloseProducer();
+ client.close();
+ }
+
+ // failed to connect to partition at sending step if a producer which
connects to broker as lazy-loading mode
+ @Test
+ public void testPartitionedProducerFailOnSending() throws Throwable {
+ @Cleanup
+ PulsarClient client =
PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build();
+ final AtomicInteger producerCounter = new AtomicInteger(0);
+ final AtomicInteger closeCounter = new AtomicInteger(0);
+ final String topicName = "persistent://prop/use/ns/multi-part-t1";
+
+ mockBrokerService.setHandleProducer((ctx, producer) -> {
+ if (producerCounter.incrementAndGet() == 2) {
+ ctx.writeAndFlush(Commands.newError(producer.getRequestId(),
ServerError.AuthorizationError, "msg"));
+ return;
+ }
+
ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(),
"default-producer", SchemaVersion.Empty));
+ });
+
+ mockBrokerService.setHandleSend((ctx, send, headersAndPayload) ->
+ ctx.writeAndFlush(Commands.newSendReceipt(send.getProducerId(),
send.getSequenceId(), send.getHighestSequenceId(), 0L, 0L))
+ );
+
+ mockBrokerService.setHandleCloseProducer((ctx, closeProducer) -> {
+
ctx.writeAndFlush(Commands.newSuccess(closeProducer.getRequestId()));
+ closeCounter.incrementAndGet();
+ });
+
+ final PartitionedProducerImpl<byte[]> producer =
(PartitionedProducerImpl<byte[]>) client.newProducer()
+ .enableLazyStartPartitionedProducers(true)
+ .accessMode(ProducerAccessMode.Shared)
+ .topic(topicName)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ try {
+ producer.send("msg".getBytes());
+ fail("Should have failed with an not connected exception");
+ } catch (Exception e) {
+ assertTrue(e instanceof
PulsarClientException.NotConnectedException);
+ assertEquals(producer.getProducers().size(), 1);
+ }
+
+ try {
+ // recreate failed producer
+ for (int i = 0; i <
client.getPartitionsForTopic(topicName).get().size(); i++) {
+ producer.send("msg".getBytes());
+ }
+ assertEquals(producer.getProducers().size(),
client.getPartitionsForTopic(topicName).get().size());
+ assertEquals(producerCounter.get(), 5);
+ } catch (Exception e) {
+ fail();
+ }
+
+ // should not call close
+ assertEquals(closeCounter.get(), 0);
+
+ mockBrokerService.resetHandleProducer();
+ mockBrokerService.resetHandleCloseProducer();
+ client.close();
+ }
+
+ // if a producer which doesn't connect as lazy-loading mode fails to
connect while creating partitioned producer,
+ // it should close all successful connections of other producers and fail
@Test
public void
testOneProducerFailShouldCloseAllProducersInPartitionedProducer() throws
Exception {
@Cleanup
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
new file mode 100644
index 0000000..7fd992a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PartialPartitionedProducerTest.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.TopicMetadata;
+import
org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class PartialPartitionedProducerTest extends ProducerConsumerBase {
+ @Override
+ @BeforeClass
+ public void setup() throws Exception {
+ super.internalSetup();
+ super.producerBaseSetup();
+ }
+
+ @Override
+ @AfterClass(alwaysRun = true)
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testPtWithSinglePartition() throws Throwable {
+ final String topic =
BrokerTestUtil.newUniqueName("pt-with-single-routing");
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImpl =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.SinglePartition)
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producerImpl.newMessage().value("msg".getBytes()).send();
+ }
+ assertEquals(producerImpl.getProducers().size(), 1);
+ }
+
+ @Test
+ public void testPtWithPartialPartition() throws Throwable {
+ final String topic =
BrokerTestUtil.newUniqueName("pt-with-partial-routing");
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImpl =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+ .create();
+
+ for (int i = 0; i < 10; i++) {
+ producerImpl.newMessage().value("msg".getBytes()).send();
+ }
+ assertEquals(producerImpl.getProducers().size(), 3);
+ }
+
+ // AddPartitionTest
+ @Test
+ public void testPtLazyLoading() throws Throwable {
+ final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImpl =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ final Supplier<Boolean> send = () -> {
+ for (int i = 0; i < 10; i++) {
+ try {
+ producerImpl.newMessage().value("msg".getBytes()).send();
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ // create first producer at initialization step
+ assertEquals(producerImpl.getProducers().size(), 1);
+
+ assertTrue(send.get());
+ assertEquals(producerImpl.getProducers().size(), 10);
+ }
+
+ @Test
+ public void testPtLoadingNotSharedMode() throws Throwable {
+ final String topic =
BrokerTestUtil.newUniqueName("pt-not-shared-mode");
+ admin.topics().createPartitionedTopic(topic, 10);
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImplExclusive =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ // create first producer at initialization step
+ assertEquals(producerImplExclusive.getProducers().size(), 10);
+
+ producerImplExclusive.close();
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ assertEquals(producerImplWaitForExclusive.getProducers().size(), 10);
+ }
+
+ // AddPartitionAndLimitTest
+ @Test
+ public void testPtUpdateWithPartialPartition() throws Throwable {
+ final String topic =
BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ final Field field =
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+ field.setAccessible(true);
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImpl =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.CustomPartition)
+ .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
+ .accessMode(ProducerAccessMode.Shared)
+ .autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .create();
+
+ final Supplier<Boolean> send = ()-> {
+ for (int i = 0; i < 10; i++) {
+ try {
+ producerImpl.newMessage().value("msg".getBytes()).send();
+ } catch (Throwable e) {
+ return false;
+ }
+ }
+ return true;
+ };
+
+ // create first producer at initialization step
+ assertEquals(producerImpl.getProducers().size(), 1);
+
+ assertTrue(send.get());
+ assertEquals(producerImpl.getProducers().size(), 2);
+
+ admin.topics().updatePartitionedTopic(topic, 3);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(((TopicMetadata)
field.get(producerImpl)).numPartitions(), 3));
+ assertEquals(producerImpl.getProducers().size(), 2);
+
+ assertTrue(send.get());
+ assertEquals(producerImpl.getProducers().size(), 3);
+
+ admin.topics().updatePartitionedTopic(topic, 4);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(((TopicMetadata)
field.get(producerImpl)).numPartitions(), 4));
+ assertTrue(send.get());
+ assertEquals(producerImpl.getProducers().size(), 3);
+ }
+
+ @Test
+ public void testPtUpdateNotSharedMode() throws Throwable {
+ final String topic =
BrokerTestUtil.newUniqueName("pt-update-not-shared");
+ admin.topics().createPartitionedTopic(topic, 2);
+
+ final Field field =
PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
+ field.setAccessible(true);
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImplExclusive =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .accessMode(ProducerAccessMode.Exclusive)
+ .autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .create();
+
+ assertEquals(producerImplExclusive.getProducers().size(), 2);
+
+ admin.topics().updatePartitionedTopic(topic, 3);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(((TopicMetadata)
field.get(producerImplExclusive)).numPartitions(), 3));
+ assertEquals(producerImplExclusive.getProducers().size(), 3);
+
+ producerImplExclusive.close();
+
+ @Cleanup
+ final PartitionedProducerImpl<byte[]> producerImplWaitForExclusive =
(PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic(topic)
+ .enableLazyStartPartitionedProducers(true)
+ .enableBatching(false)
+ .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
+ .accessMode(ProducerAccessMode.WaitForExclusive)
+ .autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
+ .create();
+
+ assertEquals(producerImplWaitForExclusive.getProducers().size(), 3);
+
+ admin.topics().updatePartitionedTopic(topic, 4);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(((TopicMetadata)
field.get(producerImplWaitForExclusive)).numPartitions(), 4));
+ assertEquals(producerImplWaitForExclusive.getProducers().size(), 4);
+ }
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index ecac0b6..fd91101 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -576,4 +576,23 @@ public interface ProducerBuilder<T> extends Cloneable {
* @since 2.5.0
*/
ProducerBuilder<T> enableMultiSchema(boolean multiSchema);
+
+ /**
+ * This config affects Shared mode producers of partitioned topics only.
It controls whether
+ * producers register and connect immediately to the owner broker of each
partition
+ * or start lazily on demand. The internal producer of one partition is
always
+ * started eagerly, chosen by the routing policy, but the internal
producers of
+ * any additional partitions are started on demand, upon receiving their
first
+ * message.
+ * Using this mode can reduce the strain on brokers for topics with large
numbers of
+ * partitions and when the SinglePartition or some custom partial
partition routing policy
+ * like PartialRoundRobinMessageRouterImpl is used without keyed messages.
+ * Because producer connection can be on demand, this can produce extra
send latency
+ * for the first messages of a given partition.
+ *
+ * @param lazyStartPartitionedProducers
+ * true/false as to whether to start partition producers lazily
+ * @return the producer builder instance
+ */
+ ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean
lazyStartPartitionedProducers);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 5702563..4525531 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -22,10 +22,10 @@ import static
com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -35,11 +35,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
@@ -48,6 +50,7 @@ import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,10 +58,11 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
private static final Logger log =
LoggerFactory.getLogger(PartitionedProducerImpl.class);
- private List<ProducerImpl<T>> producers;
- private MessageRouter routerPolicy;
+ private final ConcurrentOpenHashMap<Integer, ProducerImpl<T>> producers;
+ private final MessageRouter routerPolicy;
private final ProducerStatsRecorderImpl stats;
private TopicMetadata topicMetadata;
+ private final int firstPartitionIndex;
// timeout related to auto check and subscribe partition increasement
private volatile Timeout partitionsAutoUpdateTimeout = null;
@@ -68,15 +72,29 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
public PartitionedProducerImpl(PulsarClientImpl client, String topic,
ProducerConfigurationData conf, int numPartitions,
CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T>
schema, ProducerInterceptors interceptors) {
super(client, topic, conf, producerCreatedFuture, schema,
interceptors);
- this.producers = Lists.newArrayListWithCapacity(numPartitions);
+ this.producers = new ConcurrentOpenHashMap<>();
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new
ProducerStatsRecorderImpl() : null;
+ // MaxPendingMessagesAcrossPartitions doesn't support partial
partition such as SinglePartition correctly
int maxPendingMessages = Math.min(conf.getMaxPendingMessages(),
conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
conf.setMaxPendingMessages(maxPendingMessages);
- start();
+
+ final List<Integer> indexList;
+ if (conf.isLazyStartPartitionedProducers() &&
+ conf.getAccessMode() == ProducerAccessMode.Shared) {
+ // try to create producer at least one partition
+ indexList = Collections.singletonList(routerPolicy
+ .choosePartition(((TypedMessageBuilderImpl<T>)
newMessage()).getMessage(), topicMetadata));
+ } else {
+ // try to create producer for all partitions
+ indexList = IntStream.range(0,
topicMetadata.numPartitions()).boxed().collect(Collectors.toList());
+ }
+
+ firstPartitionIndex = indexList.get(0);
+ start(indexList);
// start track and auto subscribe partition increasement
if (conf.isAutoUpdatePartitions()) {
@@ -113,25 +131,22 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
@Override
public String getProducerName() {
- return producers.get(0).getProducerName();
+ return producers.get(firstPartitionIndex).getProducerName();
}
@Override
public long getLastSequenceId() {
// Return the highest sequence id across all partitions. This will be
correct,
// since there is a single id generator across all partitions for the
same producer
- return
producers.stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1);
+ return
producers.values().stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1);
}
- private void start() {
+ private void start(List<Integer> indexList) {
AtomicReference<Throwable> createFail = new
AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger();
- for (int partitionIndex = 0; partitionIndex <
topicMetadata.numPartitions(); partitionIndex++) {
- String partitionName =
TopicName.get(topic).getPartition(partitionIndex).toString();
- ProducerImpl<T> producer = client.newProducerImpl(partitionName,
partitionIndex,
- conf, schema, interceptors, new CompletableFuture<>());
- producers.add(producer);
- producer.producerCreatedFuture().handle((prod, createException) ->
{
+
+ for (int partitionIndex : indexList) {
+
createProducer(partitionIndex).producerCreatedFuture().handle((prod,
createException) -> {
if (createException != null) {
setState(State.Failed);
createFail.compareAndSet(null, createException);
@@ -141,7 +156,7 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
// due to any
// failure in one of the partitions and close the successfully
// created partitions
- if (completed.incrementAndGet() ==
topicMetadata.numPartitions()) {
+ if (completed.incrementAndGet() == indexList.size()) {
if (createFail.get() == null) {
setState(State.Ready);
log.info("[{}] Created partitioned producer", topic);
@@ -159,7 +174,14 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
return null;
});
}
+ }
+ private ProducerImpl<T> createProducer(final int partitionIndex) {
+ return producers.computeIfAbsent(partitionIndex, (idx) -> {
+ String partitionName =
TopicName.get(topic).getPartition(idx).toString();
+ return client.newProducerImpl(partitionName, idx,
+ conf, schema, interceptors, new CompletableFuture<>());
+ });
}
@Override
@@ -169,6 +191,34 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message,
Transaction txn) {
+ int partition = routerPolicy.choosePartition(message, topicMetadata);
+ checkArgument(partition >= 0 && partition <
topicMetadata.numPartitions(),
+ "Illegal partition index chosen by the message routing policy:
" + partition);
+
+ if (conf.isLazyStartPartitionedProducers() &&
!producers.containsKey(partition)) {
+ final ProducerImpl<T> newProducer = createProducer(partition);
+ final State createState =
newProducer.producerCreatedFuture().handle((prod, createException) -> {
+ if (createException != null) {
+ log.error("[{}] Could not create internal producer.
partitionIndex: {}", topic, partition,
+ createException);
+ try {
+ producers.remove(partition, newProducer);
+ newProducer.close();
+ } catch (PulsarClientException e) {
+ log.error("[{}] Could not close internal producer.
partitionIndex: {}", topic, partition, e);
+ }
+ return State.Failed;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Created internal producer. partitionIndex:
{}", topic, partition);
+ }
+ return State.Ready;
+ }).join();
+ if (createState == State.Failed) {
+ return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
+ }
+ }
+
switch (getState()) {
case Ready:
case Connecting:
@@ -185,34 +235,30 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
return FutureUtil.failedFuture(new
PulsarClientException.NotConnectedException());
}
- int partition = routerPolicy.choosePartition(message, topicMetadata);
- checkArgument(partition >= 0 && partition <
topicMetadata.numPartitions(),
- "Illegal partition index chosen by the message routing policy:
" + partition);
return producers.get(partition).internalSendWithTxnAsync(message, txn);
}
@Override
public CompletableFuture<Void> flushAsync() {
- List<CompletableFuture<Void>> flushFutures =
-
producers.stream().map(ProducerImpl::flushAsync).collect(Collectors.toList());
- return CompletableFuture.allOf(flushFutures.toArray(new
CompletableFuture[flushFutures.size()]));
+ return CompletableFuture.allOf(
+
producers.values().stream().map(ProducerImpl::flushAsync).toArray(CompletableFuture[]::new));
}
@Override
void triggerFlush() {
- producers.forEach(ProducerImpl::triggerFlush);
+ producers.values().forEach(ProducerImpl::triggerFlush);
}
@Override
public boolean isConnected() {
// returns false if any of the partition is not connected
- return producers.stream().allMatch(ProducerImpl::isConnected);
+ return producers.values().stream().allMatch(ProducerImpl::isConnected);
}
@Override
public long getLastDisconnectedTimestamp() {
long lastDisconnectedTimestamp = 0;
- Optional<ProducerImpl<T>> p =
producers.stream().max(Comparator.comparingLong(ProducerImpl::getLastDisconnectedTimestamp));
+ Optional<ProducerImpl<T>> p =
producers.values().stream().max(Comparator.comparingLong(ProducerImpl::getLastDisconnectedTimestamp));
if (p.isPresent()) {
lastDisconnectedTimestamp = p.get().getLastDisconnectedTimestamp();
}
@@ -232,9 +278,9 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
}
AtomicReference<Throwable> closeFail = new
AtomicReference<Throwable>();
- AtomicInteger completed = new
AtomicInteger(topicMetadata.numPartitions());
+ AtomicInteger completed = new AtomicInteger((int) producers.size());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
- for (Producer<T> producer : producers) {
+ for (Producer<T> producer : producers.values()) {
if (producer != null) {
producer.closeAsync().handle((closed, ex) -> {
if (ex != null) {
@@ -268,14 +314,14 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
return null;
}
stats.reset();
- for (int i = 0; i < topicMetadata.numPartitions(); i++) {
- stats.updateCumulativeStats(producers.get(i).getStats());
- }
+ producers.values().forEach(p ->
stats.updateCumulativeStats(p.getStats()));
return stats;
}
public List<ProducerImpl<T>> getProducers() {
- return producers.stream().collect(Collectors.toList());
+ return producers.values().stream()
+ .sorted(Comparator.comparingInt(e ->
TopicName.getPartitionIndex(e.getTopic())))
+ .collect(Collectors.toList());
}
@Override
@@ -308,41 +354,47 @@ public class PartitionedProducerImpl<T> extends
ProducerBase<T> {
future.complete(null);
return future;
} else if (oldPartitionNumber < currentPartitionNumber) {
- List<CompletableFuture<Producer<T>>> futureList = list
- .subList(oldPartitionNumber, currentPartitionNumber)
- .stream()
- .map(partitionName -> {
- int partitionIndex =
TopicName.getPartitionIndex(partitionName);
- ProducerImpl<T> producer =
- new ProducerImpl<>(client,
- partitionName, conf, new
CompletableFuture<>(),
- partitionIndex, schema, interceptors);
- producers.add(producer);
- return producer.producerCreatedFuture();
- }).collect(Collectors.toList());
-
- FutureUtil.waitForAll(futureList)
- .thenAccept(finalFuture -> {
- if (log.isDebugEnabled()) {
- log.debug("[{}] success create producers for
extended partitions. old: {}, new: {}",
- topic, oldPartitionNumber,
currentPartitionNumber);
- }
- topicMetadata = new
TopicMetadataImpl(currentPartitionNumber);
- future.complete(null);
- })
- .exceptionally(ex -> {
- // error happened, remove
- log.warn("[{}] fail create producers for extended
partitions. old: {}, new: {}",
- topic, oldPartitionNumber,
currentPartitionNumber);
- List<ProducerImpl<T>> sublist =
producers.subList(oldPartitionNumber, producers.size());
- sublist.forEach(newProducer ->
newProducer.closeAsync());
- sublist.clear();
- future.completeExceptionally(ex);
- return null;
- });
- // call interceptor with the metadata change
- onPartitionsChange(topic, currentPartitionNumber);
- return null;
+ if (conf.isLazyStartPartitionedProducers() &&
conf.getAccessMode() == ProducerAccessMode.Shared) {
+ topicMetadata = new
TopicMetadataImpl(currentPartitionNumber);
+ future.complete(null);
+ // call interceptor with the metadata change
+ onPartitionsChange(topic, currentPartitionNumber);
+ return future;
+ } else {
+ List<CompletableFuture<Producer<T>>> futureList = list
+ .subList(oldPartitionNumber,
currentPartitionNumber)
+ .stream()
+ .map(partitionName -> {
+ int partitionIndex =
TopicName.getPartitionIndex(partitionName);
+ return
producers.computeIfAbsent(partitionIndex, (idx) -> new ProducerImpl<>(
+ client, partitionName, conf, new
CompletableFuture<>(),
+ idx, schema,
interceptors)).producerCreatedFuture();
+ }).collect(Collectors.toList());
+
+ FutureUtil.waitForAll(futureList)
+ .thenAccept(finalFuture -> {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}] success create producers
for extended partitions."
+ + " old: {}, new: {}",
+ topic, oldPartitionNumber,
currentPartitionNumber);
+ }
+ topicMetadata = new
TopicMetadataImpl(currentPartitionNumber);
+ future.complete(null);
+ })
+ .exceptionally(ex -> {
+ // error happened, remove
+ log.warn("[{}] fail create producers for
extended partitions. old: {}, new: {}",
+ topic, oldPartitionNumber,
currentPartitionNumber);
+ IntStream.range(oldPartitionNumber, (int)
producers.size())
+ .forEach(i ->
producers.remove(i).closeAsync());
+ future.completeExceptionally(ex);
+ return null;
+ });
+ // call interceptor with the metadata change
+ onPartitionsChange(topic, currentPartitionNumber);
+ return null;
+ }
} else {
log.error("[{}] not support shrink topic partitions. old:
{}, new: {}",
topic, oldPartitionNumber, currentPartitionNumber);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index d3a1bb1..497b82a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -322,6 +322,12 @@ public class ProducerBuilderImpl<T> implements
ProducerBuilder<T> {
return this;
}
+ @Override
+ public ProducerBuilder<T> enableLazyStartPartitionedProducers(boolean
lazyStartPartitionedProducers) {
+ conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
+ return this;
+ }
+
private void setMessageRoutingMode() throws PulsarClientException {
if(conf.getMessageRoutingMode() == null &&
conf.getCustomMessageRouter() == null) {
messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
index 4b96767..12fb4ab 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -102,6 +102,8 @@ public class ProducerConfigurationData implements
Serializable, Cloneable {
private ProducerAccessMode accessMode = ProducerAccessMode.Shared;
+ private boolean lazyStartPartitionedProducers = false;
+
private SortedMap<String, String> properties = new TreeMap<>();
/**
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
new file mode 100644
index 0000000..fe244a6
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.customroute;
+
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.TopicMetadata;
+
+public class PartialRoundRobinMessageRouterImpl implements MessageRouter {
+ private final int numPartitionsLimit;
+ private final List<Integer> partialList = new CopyOnWriteArrayList<>();
+ private static final
AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl>
PARTITION_INDEX_UPDATER =
+
AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class,
"partitionIndex");
+ @SuppressWarnings("unused")
+ private volatile int partitionIndex = 0;
+
+ public PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) {
+ if (numPartitionsLimit < 1) {
+ throw new IllegalArgumentException("numPartitionsLimit should be
greater than or equal to 1");
+ }
+ this.numPartitionsLimit = numPartitionsLimit;
+ }
+
+ /**
+ * Choose a partition based on the topic metadata.
+ * Key hash routing isn't supported.
+ *
+ * @param msg message
+ * @param metadata topic metadata
+ * @return the partition to route the message.
+ */
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ final List<Integer> newPartialList = new
ArrayList<>(getOrCreatePartialList(metadata));
+ return newPartialList
+
.get(signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this),
newPartialList.size()));
+ }
+
+ private List<Integer> getOrCreatePartialList(TopicMetadata metadata) {
+ if (partialList.isEmpty()
+ || partialList.size() < numPartitionsLimit &&
partialList.size() < metadata.numPartitions()) {
+ synchronized (this) {
+ if (partialList.isEmpty()) {
+ partialList.addAll(IntStream.range(0,
metadata.numPartitions()).boxed()
+
.collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+ Collections.shuffle(list);
+ return list.stream();
+
})).limit(numPartitionsLimit).collect(Collectors.toList()));
+ } else if (partialList.size() < numPartitionsLimit &&
partialList.size() < metadata.numPartitions()) {
+ partialList.addAll(IntStream.range(0,
metadata.numPartitions()).boxed()
+ .filter(e -> !partialList.contains(e))
+
.collect(Collectors.collectingAndThen(Collectors.toList(), list -> {
+ Collections.shuffle(list);
+ return list.stream();
+ })).limit(numPartitionsLimit -
partialList.size()).collect(Collectors.toList()));
+ }
+ }
+ }
+ return partialList;
+ }
+}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
index d4c6aa4..1f9496b 100644
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PartitionedProducerImplTest.java
@@ -18,18 +18,25 @@
*/
package org.apache.pulsar.client.impl;
+import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
+import com.google.api.client.util.Lists;
import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import org.apache.pulsar.client.api.Message;
@@ -40,7 +47,10 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import
org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.assertj.core.util.Sets;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -107,6 +117,45 @@ public class PartitionedProducerImplTest {
assertTrue(messageRouter instanceof CustomMessageRouter);
}
+ @Test
+ public void testPartialPartition() {
+ final MessageRouter router = new PartialRoundRobinMessageRouterImpl(3);
+ final Set<Integer> actualSet = Sets.newHashSet();
+ final Message<byte[]> msg = MessageImpl
+ .create(new MessageMetadata(), ByteBuffer.wrap(new byte[0]),
Schema.BYTES, null);
+
+ for (int i = 0; i < 10; i++) {
+ final TopicMetadata metadata = new TopicMetadataImpl(10);
+ actualSet.add(router.choosePartition(msg, metadata));
+ }
+ assertEquals(actualSet.size(), 3);
+
+ try {
+ new PartialRoundRobinMessageRouterImpl(0);
+ fail();
+ } catch (Exception e) {
+ assertEquals(e.getClass(), IllegalArgumentException.class);
+ }
+ }
+
+ @Test
+ public void testPartialPartitionWithKey() {
+ final MessageRouter router = new PartialRoundRobinMessageRouterImpl(3);
+ final Hash hash = Murmur3_32Hash.getInstance();
+ final List<Integer> expectedHashList = Lists.newArrayList();
+ final List<Integer> actualHashList = Lists.newArrayList();
+
+ for (int i = 0; i < 10; i++) {
+ final String key = String.valueOf(i);
+ final Message<byte[]> msg = MessageImpl
+ .create(new MessageMetadata().setPartitionKey(key),
ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
+ final TopicMetadata metadata = new TopicMetadataImpl(10);
+ expectedHashList.add(signSafeMod(hash.makeHash(key), 10));
+ actualHashList.add(router.choosePartition(msg, metadata));
+ }
+ assertNotEquals(actualHashList, expectedHashList);
+ }
+
private MessageRouter getMessageRouter(ProducerConfigurationData
producerConfigurationData)
throws NoSuchFieldException, IllegalAccessException {
PartitionedProducerImpl impl = new PartitionedProducerImpl(