eolivelli commented on a change in pull request #10279: URL: https://github.com/apache/pulsar/pull/10279#discussion_r718160863
########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/customroute/PartialRoundRobinMessageRouterImpl.java ########## @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.customroute; + +import static org.apache.pulsar.client.util.MathUtils.signSafeMod; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageRouter; +import org.apache.pulsar.client.api.TopicMetadata; + +public class PartialRoundRobinMessageRouterImpl implements MessageRouter { + private final int numPartitionsLimit; + private final List<Integer> partialList = new CopyOnWriteArrayList<>(); + private static final AtomicIntegerFieldUpdater<PartialRoundRobinMessageRouterImpl> PARTITION_INDEX_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(PartialRoundRobinMessageRouterImpl.class, "partitionIndex"); + @SuppressWarnings("unused") + private volatile int partitionIndex = 0; + + public 
PartialRoundRobinMessageRouterImpl(final int numPartitionsLimit) { + this.numPartitionsLimit = numPartitionsLimit; + } + + /** + * Choose a partition based on the topic metadata. + * Key hash routing isn't supported. + * + * @param msg message + * @param metadata topic metadata + * @return the partition to route the message. + */ + public int choosePartition(Message<?> msg, TopicMetadata metadata) { + return getOrCreatePartialList(metadata) + .get(signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), partialList.size())); + } + + private List<Integer> getOrCreatePartialList(TopicMetadata metadata) { + if (partialList.isEmpty()) { + partialList.addAll(IntStream.range(0, metadata.numPartitions()) Review comment: Here we are not handling concurrency correctly. Two threads may enter this branch ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java ########## @@ -68,15 +72,25 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) { super(client, topic, conf, producerCreatedFuture, schema, interceptors); - this.producers = Lists.newArrayListWithCapacity(numPartitions); + this.producers = new ConcurrentOpenHashMap<>(); this.topicMetadata = new TopicMetadataImpl(numPartitions); this.routerPolicy = getMessageRouter(); stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ProducerStatsRecorderImpl() : null; + // MaxPendingMessagesAcrossPartitions doesn't support partial partition such as SinglePartition correctly int maxPendingMessages = Math.min(conf.getMaxPendingMessages(), conf.getMaxPendingMessagesAcrossPartitions() / numPartitions); conf.setMaxPendingMessages(maxPendingMessages); - start(); + + final List<Integer> indexList = conf.isLazyStartPartitionedProducers() && + conf.getAccessMode() == ProducerAccessMode.Shared ? 
Review comment: Can we convert this expression into an if/else block? Just to ease reading ########## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java ########## @@ -159,7 +170,14 @@ private void start() { return null; }); } + } + private ProducerImpl<T> createProducer(final int partitionIndex) { + return producers.computeIfAbsent(partitionIndex, (idx) -> { + String partitionName = TopicName.get(topic).getPartition(partitionIndex).toString(); Review comment: Use idx and not partitionIndex in order to create a non-capturing lambda ########## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java ########## @@ -462,8 +463,98 @@ private void subscribeFailDoesNotFailOtherConsumer(String topic1, String topic2) mockBrokerService.resetHandleSubscribe(); } - // if a producer fails to connect while creating partitioned producer, it should close all successful connections of - // other producers and fail + // failed to connect to partition at initialization step if a producer which connects to broker as lazy-loading mode + @Test + public void testPartitionedProducerFailOnInitialization() throws Throwable { + @Cleanup + PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getHttpAddress()).build(); + final AtomicInteger producerCounter = new AtomicInteger(0); + + mockBrokerService.setHandleProducer((ctx, producer) -> { + if (producerCounter.incrementAndGet() == 1) { + ctx.writeAndFlush(Commands.newError(producer.getRequestId(), ServerError.AuthorizationError, "msg")); + return; + } + ctx.writeAndFlush(Commands.newProducerSuccess(producer.getRequestId(), "default-producer", SchemaVersion.Empty)); + }); + + try { + client.newProducer() + .enableLazyStartPartitionedProducers(true) + .accessMode(ProducerAccessMode.Shared) + .topic("persistent://prop/use/ns/multi-part-t1").create(); + fail("Should have failed with an authorization error"); + } catch (Exception e) { + assertTrue(e 
instanceof PulsarClientException.AuthorizationException); + } + Review comment: Please assert that the counter is 1, in order to verify that the mock took effect -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
