This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 38285385e89235de41087b691ae35f52764d006e Author: Yunze Xu <[email protected]> AuthorDate: Tue Nov 25 18:24:38 2025 +0800 [improve][client] Test no exception could be thrown for invalid epoch in message (#25013) (cherry picked from commit 67dafa19399fd5be28b18b4d320d99cfd93409e1) --- .../apache/pulsar/client/impl/MockMessageTest.java | 113 +++++++++++++++++++++ .../pulsar/client/util/ExecutorProvider.java | 16 ++- 2 files changed, 127 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java new file mode 100644 index 00000000000..92e598e2446 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.Commands; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class MockMessageTest extends ProducerConsumerBase { + + private final Map<Thread, List<Throwable>> threadFailures = new ConcurrentHashMap<>(); + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testMessageWithWrongEpoch() throws Exception { + threadFailures.clear(); + final var conf = new ClientConfigurationData(); + conf.setServiceUrl(pulsar.getBrokerServiceUrl()); + @Cleanup final var client = PulsarClientImpl.builder().conf(conf) + .internalExecutorProvider(new ExecutorProvider(1, "internal", false, + this::newThreadFactory)) + .externalExecutorProvider(new ExecutorProvider(1, "external", false)) + .build(); + + final var topic = "test-message-with-wrong-epoch"; + @Cleanup final var consumer = (ConsumerImpl<byte[]>) client.newConsumer() + .topic(topic).subscriptionName("sub").poolMessages(true).subscribe(); + + final var cnx = consumer.cnx(); + consumer.redeliverUnacknowledgedMessages(); // increase the consumer epoch + Assert.assertEquals(consumer.consumerEpoch, 1L); + final BiConsumer<Long, String> sendMessage = (epoch, value) -> { + cnx.ctx().executor().execute(() -> { + final var cmd = new BaseCommand(); + cmd.copyFrom(Commands.newMessageCommand(consumer.consumerId, 0L, 0L, 0, 0, null, epoch)); + final var metadata = new MessageMetadata().setPublishTime(System.currentTimeMillis()) + .setProducerName("producer").setSequenceId(0).clearNumMessagesInBatch(); + final var buffer = Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, metadata, + Unpooled.wrappedBuffer(value.getBytes())); + cnx.handleMessage(cmd.getMessage(), buffer); + }); + }; + sendMessage.accept(0L, "msg-0"); // 0 is an old epoch that will be rejected + sendMessage.accept(1L, "msg-1"); + + final var msg = consumer.receive(3, TimeUnit.SECONDS); + Assert.assertNotNull(msg); + Assert.assertEquals(msg.getValue(), "msg-1".getBytes(StandardCharsets.UTF_8)); + Assert.assertTrue(threadFailures.isEmpty()); + } + + private ExecutorProvider.ExtendedThreadFactory newThreadFactory(String poolName, boolean daemon) { + return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon) { + + @Override + public Thread newThread(Runnable r) { + final var thread = super.newThread(r); + thread.setUncaughtExceptionHandler((t, e) -> { + log.error("Unexpected exception in {}", t.getName(), e); + threadFailures.computeIfAbsent(t, __ -> new CopyOnWriteArrayList<>()).add(e); + }); + return thread; + } + }; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index 88654c51300..1fa0c166707 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.util; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.List; @@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; @@ -61,13 +63,23 @@ public class ExecutorProvider { } public ExecutorProvider(int numThreads, String poolName) { + this(numThreads, poolName, Thread.currentThread().isDaemon()); + } + + public ExecutorProvider(int numThreads, String poolName, boolean daemon) { + this(numThreads, poolName, daemon, ExtendedThreadFactory::new); + } + + @VisibleForTesting + public ExecutorProvider( + int numThreads, String poolName, boolean daemon, + BiFunction<String/* poolName */, Boolean/* daemon */, ExtendedThreadFactory> threadFactoryCreator) { checkArgument(numThreads > 0); this.numThreads = numThreads; Objects.requireNonNull(poolName); executors = new ArrayList<>(numThreads); for (int i = 0; i < numThreads; i++) { - ExtendedThreadFactory threadFactory = new ExtendedThreadFactory( - poolName, Thread.currentThread().isDaemon()); + ExtendedThreadFactory threadFactory = threadFactoryCreator.apply(poolName, daemon); ExecutorService executor = createExecutor(threadFactory); executors.add(Pair.of(executor, threadFactory)); }
