This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new a46acef36c4 [fix][client] Cache empty schema version in ProducerImpl schemaCache. (#19929)
a46acef36c4 is described below

commit a46acef36c48f3972836b333a665e7c522edc5bd
Author: lifepuzzlefun <[email protected]>
AuthorDate: Wed Apr 12 11:00:14 2023 +0800

    [fix][client] Cache empty schema version in ProducerImpl schemaCache. (#19929)
    
    Co-authored-by: wangjinlong <[email protected]>
    (cherry picked from commit cff3f9baa03f166697d31ff517f234ecf0fc702d)
---
 .../pulsar/client/api/MockBrokerService.java       |  21 +++++
 .../pulsar/client/api/MockBrokerServiceHooks.java  |   5 ++
 .../client/impl/ProducerEmptySchemaCacheTest.java  | 100 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  23 +++--
 4 files changed, 144 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
index b16fac427aa..e240bd77399 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerService.java
@@ -44,6 +44,7 @@ import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseConsumerH
 import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandCloseProducerHook;
 import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandConnectHook;
 import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandFlowHook;
+import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandGetOrCreateSchemaHook;
 import 
org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandPartitionLookupHook;
 import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandProducerHook;
 import org.apache.pulsar.client.api.MockBrokerServiceHooks.CommandSendHook;
@@ -55,6 +56,7 @@ import 
org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnect;
 import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import 
org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
@@ -77,6 +79,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ *
  */
 public class MockBrokerService {
     private LookupData lookupData;
@@ -244,6 +247,19 @@ public class MockBrokerService {
             
ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
         }
 
+        @Override
+        protected void handleGetOrCreateSchema(CommandGetOrCreateSchema 
commandGetOrCreateSchema) {
+            if (handleGetOrCreateSchema != null) {
+                handleGetOrCreateSchema.apply(ctx, commandGetOrCreateSchema);
+                return;
+            }
+
+            // default
+            ctx.writeAndFlush(
+                    
Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(),
+                            SchemaVersion.Empty));
+        }
+
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
             log.warn("Got exception", cause);
@@ -276,6 +292,7 @@ public class MockBrokerService {
     private CommandUnsubscribeHook handleUnsubscribe = null;
     private CommandCloseProducerHook handleCloseProducer = null;
     private CommandCloseConsumerHook handleCloseConsumer = null;
+    private CommandGetOrCreateSchemaHook handleGetOrCreateSchema = null;
 
     public MockBrokerService() {
         server = new Server(0);
@@ -416,6 +433,10 @@ public class MockBrokerService {
         handleCloseConsumer = hook;
     }
 
+    public void setHandleGetOrCreateSchema(CommandGetOrCreateSchemaHook hook) {
+        handleGetOrCreateSchema = hook;
+    }
+
     public void resetHandleCloseConsumer() {
         handleCloseConsumer = null;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java
index a68e2d5c32c..3f9f535e796 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MockBrokerServiceHooks.java
@@ -26,6 +26,7 @@ import 
org.apache.pulsar.common.api.proto.CommandCloseConsumer;
 import org.apache.pulsar.common.api.proto.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.CommandConnect;
 import org.apache.pulsar.common.api.proto.CommandFlow;
+import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
 import org.apache.pulsar.common.api.proto.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.CommandProducer;
@@ -77,4 +78,8 @@ public interface MockBrokerServiceHooks {
     interface CommandCloseConsumerHook {
         void apply(ChannelHandlerContext ctx, CommandCloseConsumer 
closeConsumer);
     }
+
+    interface CommandGetOrCreateSchemaHook {
+        void apply(ChannelHandlerContext ctx, CommandGetOrCreateSchema 
closeConsumer);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java
new file mode 100644
index 00000000000..079b8bd8ecc
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerEmptySchemaCacheTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.MockBrokerService;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.testng.Assert.assertEquals;
+
+@Test(groups = "broker-impl")
+public class ProducerEmptySchemaCacheTest {
+
+    MockBrokerService mockBrokerService;
+
+    @BeforeClass(alwaysRun = true)
+    public void setup() {
+        mockBrokerService = new MockBrokerService();
+        mockBrokerService.start();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void teardown() {
+        mockBrokerService.stop();
+    }
+
+    @Test
+    public void testProducerShouldCacheEmptySchema() throws Exception {
+        @Cleanup
+        PulsarClientImpl client = (PulsarClientImpl) PulsarClient.builder()
+                .serviceUrl(mockBrokerService.getBrokerAddress())
+                .build();
+
+        AtomicLong counter = new AtomicLong(0);
+
+        mockBrokerService.setHandleGetOrCreateSchema((ctx, 
commandGetOrCreateSchema) -> {
+            counter.incrementAndGet();
+            ctx.writeAndFlush(
+                    
Commands.newGetOrCreateSchemaResponse(commandGetOrCreateSchema.getRequestId(),
+                            SchemaVersion.Empty));
+        });
+
+        // this schema mode is used in consumer retry and dlq Producer
+        // when the origin consumer has Schema.BYTES schema
+        // and when retry message or dlq message is send
+        // will use typed message builder set Schema.Bytes to send message.
+
+        Schema<byte[]> schema = Schema.BYTES;
+        Schema<byte[]> readerSchema = Schema.BYTES;
+
+        @Cleanup
+        Producer<byte[]> dlqProducer = 
client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
+                .topic("testAutoProduceBytesSchemaShouldCache")
+                .sendTimeout(5, TimeUnit.SECONDS)
+                .maxPendingMessages(0)
+                .enableBatching(false)
+                .create();
+
+        for (int i = 10; i > 0; i--) {
+            TypedMessageBuilder<byte[]> typedMessageBuilderNew =
+                    
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(readerSchema))
+                            .value("hello".getBytes());
+
+            typedMessageBuilderNew.send();
+        }
+
+        // schema should only be requested once.
+        // and if the schemaVersion is empty (e.g. Schema.BYTES)
+        // it should be cached by the client
+        // to avoid continuously send `CommandGetOrCreateSchema` rpc
+        assertEquals(counter.get(), 1);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 486891bbd0e..f2334650ad8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -87,6 +87,7 @@ import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.protocol.schema.SchemaHash;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.DateFormatter;
@@ -670,11 +671,16 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), 
callback, e);
             return false;
         }
+
         byte[] schemaVersion = schemaCache.get(msg.getSchemaHash());
         if (schemaVersion != null) {
-            msgMetadataBuilder.setSchemaVersion(schemaVersion);
+            if (schemaVersion != SchemaVersion.Empty.bytes()) {
+                msgMetadataBuilder.setSchemaVersion(schemaVersion);
+            }
+
             msg.setSchemaState(MessageImpl.SchemaState.Ready);
         }
+
         return true;
     }
 
@@ -683,7 +689,11 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         if (schemaVersion == null) {
             return false;
         }
-        msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+
+        if (schemaVersion != SchemaVersion.Empty.bytes()) {
+            msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+        }
+
         msg.setSchemaState(MessageImpl.SchemaState.Ready);
         return true;
     }
@@ -706,12 +716,15 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 }
             } else {
                 log.info("[{}] [{}] GetOrCreateSchema succeed", topic, 
producerName);
-                // In broker, if schema version is an empty byte array, it 
means the topic doesn't have schema. In this
-                // case, we should not cache the schema version so that the 
schema version of the message metadata will
-                // be null, instead of an empty array.
+                // In broker, if schema version is an empty byte array, it 
means the topic doesn't have schema.
+                // In this case, we cache the schema version to 
`SchemaVersion.Empty.bytes()`.
+                // When we need to set the schema version of the message 
metadata,
+                // we should check if the cached schema version is 
`SchemaVersion.Empty.bytes()`
                 if (v.length != 0) {
                     schemaCache.putIfAbsent(msg.getSchemaHash(), v);
                     msg.getMessageBuilder().setSchemaVersion(v);
+                } else {
+                    schemaCache.putIfAbsent(msg.getSchemaHash(), 
SchemaVersion.Empty.bytes());
                 }
                 msg.setSchemaState(MessageImpl.SchemaState.Ready);
             }

Reply via email to