This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf8fd1f64d [improve][cli] Allow pulsar-client produce to specify 
KeyValue schema key (Avro Key support) (#20447)
7bf8fd1f64d is described below

commit 7bf8fd1f64dd0ad5e74271763248cc3404c39782
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Jun 29 06:25:51 2023 +0200

    [improve][cli] Allow pulsar-client produce to specify KeyValue schema key 
(Avro Key support) (#20447)
---
 .../pulsar/client/cli/PulsarClientToolTest.java    | 157 +++++++++++++++++++++
 .../org/apache/pulsar/client/cli/CmdProduce.java   |  30 +++-
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index c401f3d0bea..8b32ad906ea 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.client.cli;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Properties;
 import java.util.UUID;
@@ -30,18 +33,24 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
 import lombok.Cleanup;
+import lombok.NoArgsConstructor;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.ProxyProtocol;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class PulsarClientToolTest extends BrokerTestBase {
@@ -395,4 +404,152 @@ public class PulsarClientToolTest extends BrokerTestBase {
         return String.format("persistent://prop/ns-abc/test/%s-%s", 
localNameBase, UUID.randomUUID().toString());
     }
 
+
+    @Test(timeOut = 20000)
+    public void testProducePartitioningKey() throws Exception {
+
+        Properties properties = initializeToolProperties();
+
+        final String topicName = getTopicWithRandomSuffix("key-topic");
+
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        executor.execute(() -> {
+            try {
+                PulsarClientTool pulsarClientToolConsumer = new 
PulsarClientTool(properties);
+                String[] args = {"produce", "-m", "test", "-k", 
"partition-key1", topicName};
+                Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+                future.complete(null);
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+        final Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
+        assertNotNull(message);
+        assertTrue(message.hasKey());
+        Assert.assertEquals(message.getKey(), "partition-key1");
+    }
+
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class TestKey {
+        public String key_a;
+        public int key_b;
+
+    }
+
+    @Test
+    public void testProduceKeyValueSchemaInlineValue() throws Exception {
+
+        Properties properties = initializeToolProperties();
+
+        final String topicName = getTopicWithRandomSuffix("key-topic");
+
+
+        @Cleanup
+        Consumer<KeyValue<TestKey, String>> consumer = 
pulsarClient.newConsumer(Schema.KeyValue(Schema.JSON(
+                TestKey.class), 
Schema.STRING)).topic(topicName).subscriptionName("sub").subscribe();
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        final Schema<TestKey> keySchema = Schema.JSON(TestKey.class);
+
+        executor.execute(() -> {
+            try {
+                PulsarClientTool pulsarClientToolConsumer = new 
PulsarClientTool(properties);
+                String[] args = {"produce",
+                        "-kvet", "inline",
+                        "-ks", String.format("json:%s", 
keySchema.getSchemaInfo().getSchemaDefinition()),
+                        "-kvk", 
ObjectMapperFactory.getMapper().writer().writeValueAsString(new 
TestKey("my-key", Integer.MAX_VALUE)),
+                        "-vs", "string",
+                        "-m", "test",
+                        topicName};
+                Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+                future.complete(null);
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+        final Message<KeyValue<TestKey, String>> message = 
consumer.receive(10, TimeUnit.SECONDS);
+        assertNotNull(message);
+        assertFalse(message.hasKey());
+        Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
+        Assert.assertEquals(message.getValue().getKey().key_b, 
Integer.MAX_VALUE);
+        Assert.assertEquals(message.getValue().getValue(), "test");
+    }
+
+    @DataProvider(name = "keyValueKeySchema")
+    public static Object[][] keyValueKeySchema() {
+        return new Object[][]{
+                {"json"},
+                {"avro"}
+        };
+    }
+
+    @Test(dataProvider = "keyValueKeySchema")
+    public void testProduceKeyValueSchemaFileValue(String schema) throws 
Exception {
+
+        Properties properties = initializeToolProperties();
+
+        final String topicName = getTopicWithRandomSuffix("key-topic");
+
+
+
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        File file = Files.createTempFile("", "").toFile();
+        final Schema<TestKey> keySchema;
+        if (schema.equals("json")) {
+           keySchema = Schema.JSON(TestKey.class);
+        } else if (schema.equals("avro")) {
+            keySchema = Schema.AVRO(TestKey.class);
+        } else {
+            throw new IllegalStateException();
+        }
+
+
+        Files.write(file.toPath(), keySchema.encode(new TestKey("my-key", 
Integer.MAX_VALUE)));
+
+        @Cleanup
+        Consumer<KeyValue<TestKey, String>> consumer = 
pulsarClient.newConsumer(Schema.KeyValue(keySchema, Schema.STRING))
+                .topic(topicName).subscriptionName("sub").subscribe();
+
+        executor.execute(() -> {
+            try {
+                PulsarClientTool pulsarClientToolConsumer = new 
PulsarClientTool(properties);
+                String[] args = {"produce",
+                        "-k", "partitioning-key",
+                        "-kvet", "inline",
+                        "-ks", String.format("%s:%s", schema, 
keySchema.getSchemaInfo().getSchemaDefinition()),
+                        "-kvkf", file.getAbsolutePath(),
+                        "-vs", "string",
+                        "-m", "test",
+                        topicName};
+                Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+                future.complete(null);
+            } catch (Throwable t) {
+                future.completeExceptionally(t);
+            }
+        });
+        final Message<KeyValue<TestKey, String>> message = 
consumer.receive(10, TimeUnit.SECONDS);
+        assertNotNull(message);
+        // -k should not be considered
+        assertFalse(message.hasKey());
+        Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
+        Assert.assertEquals(message.getValue().getKey().key_b, 
Integer.MAX_VALUE);
+    }
+
+    private Properties initializeToolProperties() {
+        Properties properties = new Properties();
+        properties.setProperty("serviceUrl", brokerUrl.toString());
+        properties.setProperty("useTls", "false");
+        return properties;
+    }
+
 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 04d557d2a93..74e4884856d 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -118,8 +118,14 @@ public class CmdProduce {
             + "key=value string, like k1=v1,k2=v2.")
     private List<String> properties = new ArrayList<>();
 
-    @Parameter(names = { "-k", "--key"}, description = "message key to add ")
+    @Parameter(names = { "-k", "--key"}, description = "Partitioning key to 
add to each message")
     private String key;
+    @Parameter(names = { "-kvk", "--key-value-key"}, description = "Value to 
add as message key in KeyValue schema")
+    private String keyValueKey;
+    @Parameter(names = { "-kvkf", "--key-value-key-file"},
+            description = "Path to file containing the value to add as message 
key in KeyValue schema. "
+            + "JSON and AVRO files are supported.")
+    private String keyValueKeyFile;
 
     @Parameter(names = { "-vs", "--value-schema"}, description = "Schema type 
(can be bytes,avro,json,string...)")
     private String valueSchema = "bytes";
@@ -268,6 +274,25 @@ public class CmdProduce {
                     kvMap.put(kv[0], kv[1]);
                 }
 
+                final byte[] keyValueKeyBytes;
+                if (this.keyValueKey != null) {
+                    if (keyValueEncodingType == 
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
+                        throw new ParameterException(
+                            "Key value encoding type must be set when using 
--key-value-key");
+                    }
+                    keyValueKeyBytes = 
this.keyValueKey.getBytes(StandardCharsets.UTF_8);
+                } else if (this.keyValueKeyFile != null) {
+                    if (keyValueEncodingType == 
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
+                        throw new ParameterException(
+                            "Key value encoding type must be set when using 
--key-value-key-file");
+                    }
+                    keyValueKeyBytes = 
Files.readAllBytes(Paths.get(this.keyValueKeyFile));
+                } else if (this.key != null) {
+                    keyValueKeyBytes = 
this.key.getBytes(StandardCharsets.UTF_8);
+                } else {
+                    keyValueKeyBytes = null;
+                }
+
                 for (int i = 0; i < this.numTimesProduce; i++) {
                     for (byte[] content : messageBodies) {
                         if (limiter != null) {
@@ -290,8 +315,7 @@ public class CmdProduce {
                             case KEY_VALUE_ENCODING_TYPE_SEPARATED:
                             case KEY_VALUE_ENCODING_TYPE_INLINE:
                                 KeyValue kv = new KeyValue<>(
-                                        // TODO: support AVRO encoded key
-                                        key != null ? 
key.getBytes(StandardCharsets.UTF_8) : null,
+                                        keyValueKeyBytes,
                                         content);
                                 message.value(kv);
                                 break;

Reply via email to