gaoran10 commented on code in PR #24488:
URL: https://github.com/apache/pulsar/pull/24488#discussion_r2306786338


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java:
##########
@@ -508,4 +508,5 @@ public void setLookupProperties(Map<String, String> lookupProperties) {
     public Map<String, String> getLookupProperties() {
         return (lookupProperties == null) ? Collections.emptyMap() : Collections.unmodifiableMap(lookupProperties);
     }
+

Review Comment:
   fixed



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionSchemaInfoProvider.java:
##########
@@ -100,4 +100,5 @@ private CompletableFuture<SchemaInfo> loadSchema(byte[] schemaVersion) {
     public PulsarClientImpl getPulsarClient() {
         return pulsarClient;
     }
+

Review Comment:
   fixed



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java:
##########
@@ -158,26 +180,40 @@ public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {
         if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
             throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
         }
-        return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
+        return KeyValue.decode(bytes, (keyBytes, valueBytes) ->
+                decode(null, keyBytes, valueBytes, schemaVersion));
     }
 
     @Override
     public KeyValue<K, V> decode(ByteBuf byteBuf) {
         return decode(ByteBufUtil.getBytes(byteBuf));
     }
 
+    @Override
+    public KeyValue<K, V> decode(String topic, byte[] data, byte[] schemaId) {
+        if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
+            throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
+        }
+        return KeyValue.decode(data, (keyBytes, valueBytes) ->
+                decode(topic, keyBytes, valueBytes, schemaId));
+    }
+
     @Override
     public KeyValue<K, V> decode(ByteBuf byteBuf, byte[] schemaVersion) {
         return decode(ByteBufUtil.getBytes(byteBuf), schemaVersion);
     }
 
-    public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
+    public KeyValue<K, V> decode(String topic, byte[] keyBytes, byte[] valueBytes, byte[] schemaIdOrVersion) {
         K k;
         if (keyBytes == null) {
             k = null;
         } else {
-            if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
-                k = keySchema.decode(keyBytes, schemaVersion);
+            if (keySchema.supportSchemaVersioning() && schemaIdOrVersion != null) {
+                byte[] keySchemaIdOrVersion = schemaIdOrVersion;
+                if (SchemaType.EXTERNAL.equals(keySchema.getSchemaInfo().getType())) {
+                    keySchemaIdOrVersion = KeyValue.getSchemaId(schemaIdOrVersion, true);
+                }
+                k = keySchema.decode(topic, keyBytes, keySchemaIdOrVersion);

Review Comment:
   fixed



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java:
##########
@@ -505,4 +505,5 @@ public ClientBuilder lookupProperties(Map<String, String> properties) {
         conf.setLookupProperties(properties);
         return this;
     }
+

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to