This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 31f1ff801fc [improve][client] Test no exception could be thrown for 
invalid epoch in message (#25013)
31f1ff801fc is described below

commit 31f1ff801fc6d82ae704350dd47043793ebabc23
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 25 18:24:38 2025 +0800

    [improve][client] Test no exception could be thrown for invalid epoch in 
message (#25013)
    
    (cherry picked from commit 67dafa19399fd5be28b18b4d320d99cfd93409e1)
---
 .../apache/pulsar/client/impl/MockMessageTest.java | 113 +++++++++++++++++++++
 .../pulsar/client/util/ExecutorProvider.java       |  16 ++-
 2 files changed, 127 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java
new file mode 100644
index 00000000000..92e598e2446
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MockMessageTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.buffer.Unpooled;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-impl")
+public class MockMessageTest extends ProducerConsumerBase {
+
+    private final Map<Thread, List<Throwable>> threadFailures = new 
ConcurrentHashMap<>();
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMessageWithWrongEpoch() throws Exception {
+        threadFailures.clear();
+        final var conf = new ClientConfigurationData();
+        conf.setServiceUrl(pulsar.getBrokerServiceUrl());
+        @Cleanup final var client = PulsarClientImpl.builder().conf(conf)
+                .internalExecutorProvider(new ExecutorProvider(1, "internal", 
false,
+                        this::newThreadFactory))
+                .externalExecutorProvider(new ExecutorProvider(1, "external", 
false))
+                .build();
+
+        final var topic = "test-message-with-wrong-epoch";
+        @Cleanup final var consumer = (ConsumerImpl<byte[]>) 
client.newConsumer()
+                
.topic(topic).subscriptionName("sub").poolMessages(true).subscribe();
+
+        final var cnx = consumer.cnx();
+        consumer.redeliverUnacknowledgedMessages(); // increase the consumer 
epoch
+        Assert.assertEquals(consumer.consumerEpoch, 1L);
+        final BiConsumer<Long, String> sendMessage = (epoch, value) -> {
+            cnx.ctx().executor().execute(() -> {
+                final var cmd = new BaseCommand();
+                cmd.copyFrom(Commands.newMessageCommand(consumer.consumerId, 
0L, 0L, 0, 0, null, epoch));
+                final var metadata = new 
MessageMetadata().setPublishTime(System.currentTimeMillis())
+                        
.setProducerName("producer").setSequenceId(0).clearNumMessagesInBatch();
+                final var buffer = 
Commands.serializeMetadataAndPayload(Commands.ChecksumType.None, metadata,
+                        Unpooled.wrappedBuffer(value.getBytes()));
+                cnx.handleMessage(cmd.getMessage(), buffer);
+            });
+        };
+        sendMessage.accept(0L, "msg-0"); // 0 is an old epoch that will be 
rejected
+        sendMessage.accept(1L, "msg-1");
+
+        final var msg = consumer.receive(3, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        Assert.assertEquals(msg.getValue(), 
"msg-1".getBytes(StandardCharsets.UTF_8));
+        Assert.assertTrue(threadFailures.isEmpty());
+    }
+
+    private ExecutorProvider.ExtendedThreadFactory newThreadFactory(String 
poolName, boolean daemon) {
+        return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon) {
+
+            @Override
+            public Thread newThread(Runnable r) {
+                final var thread = super.newThread(r);
+                thread.setUncaughtExceptionHandler((t, e) -> {
+                    log.error("Unexpected exception in {}", t.getName(), e);
+                    threadFailures.computeIfAbsent(t, __ -> new 
CopyOnWriteArrayList<>()).add(e);
+                });
+                return thread;
+            }
+        };
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 88654c51300..1fa0c166707 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -61,13 +63,23 @@ public class ExecutorProvider {
     }
 
     public ExecutorProvider(int numThreads, String poolName) {
+        this(numThreads, poolName, Thread.currentThread().isDaemon());
+    }
+
+    public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
+        this(numThreads, poolName, daemon, ExtendedThreadFactory::new);
+    }
+
+    @VisibleForTesting
+    public ExecutorProvider(
+            int numThreads, String poolName, boolean daemon,
+            BiFunction<String/* poolName */, Boolean/* daemon */, 
ExtendedThreadFactory> threadFactoryCreator) {
         checkArgument(numThreads > 0);
         this.numThreads = numThreads;
         Objects.requireNonNull(poolName);
         executors = new ArrayList<>(numThreads);
         for (int i = 0; i < numThreads; i++) {
-            ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
-                    poolName, Thread.currentThread().isDaemon());
+            ExtendedThreadFactory threadFactory = 
threadFactoryCreator.apply(poolName, daemon);
             ExecutorService executor = createExecutor(threadFactory);
             executors.add(Pair.of(executor, threadFactory));
         }

Reply via email to