This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 337ce5a6ab81d497e4b4a986604ace30bc41c115
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Nov 26 07:38:57 2025 +0200

    [improve] Eliminate unnecessary duplicate schema lookups for partitioned topics in client and geo-replication (#25011)
    
    (cherry picked from commit 163f35fd77af5d2ac6191f837a9d726c172fb0bd)
---
 .../pulsar/broker/service/persistent/PersistentReplicator.java    | 8 ++++++--
 .../main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java | 5 +++--
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 4661ece815a..b23f17dfc16 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -30,6 +30,7 @@ import io.netty.util.Recycler.Handle;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -65,6 +66,7 @@ import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.SendCallback;
 import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.stats.Rate;
@@ -78,6 +80,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
 
     protected final PersistentTopic topic;
     protected final ManagedCursor cursor;
+    protected final String localSchemaTopicName;
 
     protected Optional<DispatchRateLimiter> dispatchRateLimiter = 
Optional.empty();
     private final Object dispatchRateLimiterLock = new Object();
@@ -120,7 +123,8 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         super(localCluster, localTopic, remoteCluster, remoteTopic, 
localTopic.getReplicatorPrefix(),
                 brokerService, replicationClient);
         this.topic = localTopic;
-        this.cursor = cursor;
+        this.localSchemaTopicName = 
TopicName.getPartitionedTopicName(localTopicName).toString();
+        this.cursor = Objects.requireNonNull(cursor);
         this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName,
                 Codec.decode(cursor.getName()), cursor, null);
 
@@ -373,7 +377,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
         if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 
0) {
             return CompletableFuture.completedFuture(null);
         }
-        return client.getSchemaProviderLoadingCache().get(localTopicName)
+        return client.getSchemaProviderLoadingCache().get(localSchemaTopicName)
                 .getSchemaByVersion(msg.getSchemaVersion());
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c25d2397229..e2c875f6faa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1297,10 +1297,11 @@ public class PulsarClientImpl implements PulsarClient {
                                                                       String 
topicName) {
         if (schema != null && schema.supportSchemaVersioning()) {
             final SchemaInfoProvider schemaInfoProvider;
+            String schemaTopicName = 
TopicName.getPartitionedTopicName(topicName).toString();
             try {
-                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(topicName);
+                schemaInfoProvider = 
pulsarClientImpl.getSchemaProviderLoadingCache().get(schemaTopicName);
             } catch (ExecutionException e) {
-                log.error("Failed to load schema info provider for topic {}", 
topicName, e);
+                log.error("Failed to load schema info provider for topic {}", 
schemaTopicName, e);
                 return FutureUtil.failedFuture(e.getCause());
             }
             schema = schema.clone();

Reply via email to