This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 399a0b10f4a [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256)
399a0b10f4a is described below

commit 399a0b10f4a9854cb1af2875b74a5ce6726e0fb8
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Nov 16 09:31:36 2022 +0800

    [refactor][java] Improve docs and code quality about KeyValueSchema usages (#17256)
    
    (cherry picked from commit 5d6a88efa78969073c9dad015b4727b59e8a76d8)
---
 .../java/org/apache/pulsar/client/api/Schema.java  |  6 +-
 .../PulsarClientImplementationBinding.java         |  2 -
 .../PulsarClientImplementationBindingImpl.java     |  4 -
 .../client/impl/TypedMessageBuilderImpl.java       | 98 +++++++++++++---------
 4 files changed, 62 insertions(+), 48 deletions(-)

diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 6803187521a..d23daa5defa 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -367,10 +367,12 @@ public interface Schema<T> extends Cloneable{
     }
 
     /**
-     * Key Value Schema using passed in key and value schemas.
+     * Key Value Schema using passed in key and value schemas with {@link KeyValueEncodingType#INLINE} encoding type.
+     *
+     * @see Schema#KeyValue(Schema, Schema, KeyValueEncodingType)
      */
     static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> 
value) {
-        return 
DefaultImplementation.getDefaultImplementation().newKeyValueSchema(key, value);
+        return KeyValue(key, value, KeyValueEncodingType.INLINE);
     }
 
     /**
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 75cd7dc1fee..acafec37a40 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -131,8 +131,6 @@ public interface PulsarClientImplementationBinding {
 
     Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema();
 
-    <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, 
Schema<V> valueSchema);
-
     <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, 
Schema<V> valueSchema,
                                                            
KeyValueEncodingType keyValueEncodingType);
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 2747d39a735..da68d503789 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -213,10 +213,6 @@ public final class PulsarClientImplementationBindingImpl 
implements PulsarClient
         return KeyValueSchemaImpl.kvBytes();
     }
 
-    public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> 
keySchema, Schema<V> valueSchema) {
-        return KeyValueSchemaImpl.of(keySchema, valueSchema);
-    }
-
     public <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> 
keySchema, Schema<V> valueSchema,
                                                     KeyValueEncodingType 
keyValueEncodingType) {
         return KeyValueSchemaImpl.of(keySchema, valueSchema, 
keyValueEncodingType);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 1bf4cd2eb1b..acc60cfcfe7 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -27,6 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -35,7 +36,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -107,14 +108,12 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> key(String key) {
-        if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
-            checkArgument(!(kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED),
-                    "This method is not allowed to set keys when in encoding 
type is SEPARATED");
-            if (key == null) {
-                msgMetadata.setNullPartitionKey(true);
-                return this;
-            }
+        getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
+                keyValueSchema.getKeyValueEncodingType() != 
KeyValueEncodingType.SEPARATED,
+                "This method is not allowed to set keys when in encoding type 
is SEPARATED"));
+        if (key == null) {
+            msgMetadata.setNullPartitionKey(true);
+            return this;
         }
         msgMetadata.setPartitionKey(key);
         msgMetadata.setPartitionKeyB64Encoded(false);
@@ -123,14 +122,12 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> keyBytes(byte[] key) {
-        if (schema instanceof KeyValueSchemaImpl && 
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
-            checkArgument(!(kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED),
-                    "This method is not allowed to set keys when in encoding 
type is SEPARATED");
-            if (key == null) {
-                msgMetadata.setNullPartitionKey(true);
-                return this;
-            }
+        getKeyValueSchema().ifPresent(keyValueSchema -> checkArgument(
+                keyValueSchema.getKeyValueEncodingType() != 
KeyValueEncodingType.SEPARATED,
+                "This method is not allowed to set keys when in encoding type 
is SEPARATED"));
+        if (key == null) {
+            msgMetadata.setNullPartitionKey(true);
+            return this;
         }
         msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(key));
         msgMetadata.setPartitionKeyB64Encoded(true);
@@ -149,31 +146,18 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
             msgMetadata.setNullValue(true);
             return this;
         }
-        if (value instanceof org.apache.pulsar.common.schema.KeyValue
-                && schema.getSchemaInfo() != null && 
schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
-            org.apache.pulsar.common.schema.KeyValue kv = 
(org.apache.pulsar.common.schema.KeyValue) value;
-            if (kvSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
-                // set key as the message key
-                if (kv.getKey() != null) {
-                    msgMetadata.setPartitionKey(
-                            
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
-                    msgMetadata.setPartitionKeyB64Encoded(true);
-                } else {
-                    this.msgMetadata.setNullPartitionKey(true);
-                }
-
-                // set value as the payload
-                if (kv.getValue() != null) {
-                    this.content = 
ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
-                } else {
-                    this.msgMetadata.setNullValue(true);
-                }
+
+        return getKeyValueSchema().map(keyValueSchema -> {
+            if (keyValueSchema.getKeyValueEncodingType() == 
KeyValueEncodingType.SEPARATED) {
+                setSeparateKeyValue(value, keyValueSchema);
                 return this;
+            } else {
+                return null;
             }
-        }
-        this.content = ByteBuffer.wrap(schema.encode(value));
-        return this;
+        }).orElseGet(() -> {
+            content = ByteBuffer.wrap(schema.encode(value));
+            return this;
+        });
     }
 
     @Override
@@ -302,4 +286,38 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
     public ByteBuffer getContent() {
         return content;
     }
+
+    private Optional<KeyValueSchema<?, ?>> getKeyValueSchema() {
+        if (schema.getSchemaInfo() != null
+                && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE
+                // The schema's class could also be AutoProduceBytesSchema when its type is KEY_VALUE
+                && schema instanceof KeyValueSchema) {
+            return Optional.of((KeyValueSchema<?, ?>) schema);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private <K, V> void setSeparateKeyValue(T value, KeyValueSchema<K, V> 
keyValueSchema) {
+        checkArgument(value instanceof 
org.apache.pulsar.common.schema.KeyValue);
+        org.apache.pulsar.common.schema.KeyValue<K, V> keyValue =
+                (org.apache.pulsar.common.schema.KeyValue<K, V>) value;
+
+        // set key as the message key
+        if (keyValue.getKey() != null) {
+            msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(
+                    keyValueSchema.getKeySchema().encode(keyValue.getKey())));
+            msgMetadata.setPartitionKeyB64Encoded(true);
+        } else {
+            msgMetadata.setNullPartitionKey(true);
+        }
+
+        // set value as the payload
+        if (keyValue.getValue() != null) {
+            content = 
ByteBuffer.wrap(keyValueSchema.getValueSchema().encode(keyValue.getValue()));
+        } else {
+            msgMetadata.setNullValue(true);
+        }
+    }
 }

Reply via email to