eolivelli commented on a change in pull request #10279:
URL: https://github.com/apache/pulsar/pull/10279#discussion_r700189337
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
##########
@@ -57,8 +58,17 @@ public void testPartitionedTopicAutoCreation() throws
PulsarAdminException, Puls
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
+ .enableBatching(false)
.create();
+ for (int i = 0; i < 3; i++) {
+ try {
+ producer.newMessage().value("msg".getBytes()).send();
+ } catch (Throwable e) {
Review comment:
the test will fail anyway, no need to catch the exception
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -924,13 +936,25 @@ public void testRemoveMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers,
persistenceTopic);
admin.topics().createPartitionedTopic(persistenceTopic, 2);
+ final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {
+ for (int i = 0; i < 2; i++) {
+ try {
+ p.newMessage().value("msg".getBytes()).send();
+ } catch (Throwable e) {
Review comment:
same here
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -113,25 +126,22 @@ private MessageRouter getMessageRouter() {
@Override
public String getProducerName() {
- return producers.get(0).getProducerName();
+ return producers.get(firstPartitionIndex).getProducerName();
Review comment:
shouldn't be the same producerName for all the producers ?
why not computing the name in the constructor and cache it ?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java
##########
@@ -418,7 +418,11 @@ public void testSeekTimeOnPartitionedTopic() throws
Exception {
long resetTimeInMillis = TimeUnit.SECONDS
.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(resetTimeStr));
admin.topics().createPartitionedTopic(topicName, partitions);
- Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
Review comment:
why do you need to change this test ? (`enableBatching(false)`)
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
##########
@@ -169,6 +186,30 @@ private void start() {
@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message,
Transaction txn) {
+ int partition = routerPolicy.choosePartition(message, topicMetadata);
+ checkArgument(partition >= 0 && partition <
topicMetadata.numPartitions(),
+ "Illegal partition index chosen by the message routing policy:
" + partition);
+
+ if (!producers.containsKey(partition)) {
+ final State createState = createProducer(partition).handle((prod,
createException) -> {
+ if (createException != null) {
+ log.error("[{}] Could not create internal producer.
partitionIndex: {}", topic, partition,
+ createException);
+ try {
+ producers.remove(partition).close();
Review comment:
I mean that we should test the result for "producers.remove(partition)"
for equality ('==') with `prod`
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -894,15 +895,26 @@ public void testGetMaxProducerApplied() throws Exception {
public void testSetMaxProducers() throws Exception {
Integer maxProducers = 2;
log.info("MaxProducers: {} will set to the topic: {}", maxProducers,
persistenceTopic);
+ final Function<Producer<byte[]>, Producer<byte[]>> send = (p) -> {
+ for (int i = 0; i < 2; i++) {
+ try {
+ p.newMessage().value("msg".getBytes()).send();
+ } catch (Throwable e) {
+ log.info("Exception: ", e);
+ fail();
Review comment:
what about
```
catch (Exception e) {
log.info("Exception: ", e);
throw new RuntimeException(e);
}
```
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java
##########
@@ -739,10 +740,14 @@ public void testConcurrentlyDeleteSchema() throws
Exception {
admin.namespaces().createNamespace("prop/ns-delete-schema", 3);
admin.topics().createPartitionedTopic(topic, partitions);
- Producer producer = pulsarClient
+ Producer<Schemas.BytesRecord> producer = pulsarClient
.newProducer(Schema.JSON(Schemas.BytesRecord.class))
.topic(topic)
+ .enableBatching(false)
Review comment:
why do you need to change this test ?
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
##########
@@ -863,7 +863,7 @@ public void
testConsumerDistributionInFailoverSubscriptionWhenUpdatePartitions()
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
- return Integer.parseInt(msg.getKey()) %
metadata.numPartitions();
+ return msg.hasKey() ? Integer.parseInt(msg.getKey()) %
metadata.numPartitions() : 0;
Review comment:
this change looks unrelated to this patch, please revert
##########
File path:
tests/integration/src/test/java/org/apache/pulsar/tests/integration/compaction/TestCompaction.java
##########
@@ -182,7 +182,7 @@ public void
testPublishCompactAndConsumePartitionedTopics(Supplier<String> servi
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata
metadata) {
- return Integer.parseInt(msg.getKey()) %
metadata.numPartitions();
+ return msg.hasKey() ? Integer.parseInt(msg.getKey()) %
metadata.numPartitions() : 0;
Review comment:
why do you need to change this test ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]