This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7bf8fd1f64d [improve][cli] Allow pulsar-client produce to specify
KeyValue schema key (Avro Key support) (#20447)
7bf8fd1f64d is described below
commit 7bf8fd1f64dd0ad5e74271763248cc3404c39782
Author: Nicolò Boschi <[email protected]>
AuthorDate: Thu Jun 29 06:25:51 2023 +0200
[improve][cli] Allow pulsar-client produce to specify KeyValue schema key
(Avro Key support) (#20447)
---
.../pulsar/client/cli/PulsarClientToolTest.java | 157 +++++++++++++++++++++
.../org/apache/pulsar/client/cli/CmdProduce.java | 30 +++-
2 files changed, 184 insertions(+), 3 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index c401f3d0bea..8b32ad906ea 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -20,8 +20,11 @@ package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import java.io.File;
+import java.nio.file.Files;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
@@ -30,18 +33,24 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
import lombok.Cleanup;
+import lombok.NoArgsConstructor;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProxyProtocol;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class PulsarClientToolTest extends BrokerTestBase {
@@ -395,4 +404,152 @@ public class PulsarClientToolTest extends BrokerTestBase {
return String.format("persistent://prop/ns-abc/test/%s-%s",
localNameBase, UUID.randomUUID().toString());
}
+
+ @Test(timeOut = 20000)
+ public void testProducePartitioningKey() throws Exception {
+
+ Properties properties = initializeToolProperties();
+
+ final String topicName = getTopicWithRandomSuffix("key-topic");
+
+ @Cleanup
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ executor.execute(() -> {
+ try {
+ PulsarClientTool pulsarClientToolConsumer = new
PulsarClientTool(properties);
+ String[] args = {"produce", "-m", "test", "-k",
"partition-key1", topicName};
+ Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+ final Message<byte[]> message = consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertTrue(message.hasKey());
+ Assert.assertEquals(message.getKey(), "partition-key1");
+ }
+
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class TestKey {
+ public String key_a;
+ public int key_b;
+
+ }
+
+ @Test
+ public void testProduceKeyValueSchemaInlineValue() throws Exception {
+
+ Properties properties = initializeToolProperties();
+
+ final String topicName = getTopicWithRandomSuffix("key-topic");
+
+
+ @Cleanup
+ Consumer<KeyValue<TestKey, String>> consumer =
pulsarClient.newConsumer(Schema.KeyValue(Schema.JSON(
+ TestKey.class),
Schema.STRING)).topic(topicName).subscriptionName("sub").subscribe();
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ final Schema<TestKey> keySchema = Schema.JSON(TestKey.class);
+
+ executor.execute(() -> {
+ try {
+ PulsarClientTool pulsarClientToolConsumer = new
PulsarClientTool(properties);
+ String[] args = {"produce",
+ "-kvet", "inline",
+ "-ks", String.format("json:%s",
keySchema.getSchemaInfo().getSchemaDefinition()),
+ "-kvk",
ObjectMapperFactory.getMapper().writer().writeValueAsString(new
TestKey("my-key", Integer.MAX_VALUE)),
+ "-vs", "string",
+ "-m", "test",
+ topicName};
+ Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+ final Message<KeyValue<TestKey, String>> message =
consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertFalse(message.hasKey());
+ Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
+ Assert.assertEquals(message.getValue().getKey().key_b,
Integer.MAX_VALUE);
+ Assert.assertEquals(message.getValue().getValue(), "test");
+ }
+
+ @DataProvider(name = "keyValueKeySchema")
+ public static Object[][] keyValueKeySchema() {
+ return new Object[][]{
+ {"json"},
+ {"avro"}
+ };
+ }
+
+ @Test(dataProvider = "keyValueKeySchema")
+ public void testProduceKeyValueSchemaFileValue(String schema) throws
Exception {
+
+ Properties properties = initializeToolProperties();
+
+ final String topicName = getTopicWithRandomSuffix("key-topic");
+
+
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ File file = Files.createTempFile("", "").toFile();
+ final Schema<TestKey> keySchema;
+ if (schema.equals("json")) {
+ keySchema = Schema.JSON(TestKey.class);
+ } else if (schema.equals("avro")) {
+ keySchema = Schema.AVRO(TestKey.class);
+ } else {
+ throw new IllegalStateException();
+ }
+
+
+ Files.write(file.toPath(), keySchema.encode(new TestKey("my-key",
Integer.MAX_VALUE)));
+
+ @Cleanup
+ Consumer<KeyValue<TestKey, String>> consumer =
pulsarClient.newConsumer(Schema.KeyValue(keySchema, Schema.STRING))
+ .topic(topicName).subscriptionName("sub").subscribe();
+
+ executor.execute(() -> {
+ try {
+ PulsarClientTool pulsarClientToolConsumer = new
PulsarClientTool(properties);
+ String[] args = {"produce",
+ "-k", "partitioning-key",
+ "-kvet", "inline",
+ "-ks", String.format("%s:%s", schema,
keySchema.getSchemaInfo().getSchemaDefinition()),
+ "-kvkf", file.getAbsolutePath(),
+ "-vs", "string",
+ "-m", "test",
+ topicName};
+ Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
+ future.complete(null);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ });
+ final Message<KeyValue<TestKey, String>> message =
consumer.receive(10, TimeUnit.SECONDS);
+ assertNotNull(message);
+ // -k should not be considered
+ assertFalse(message.hasKey());
+ Assert.assertEquals(message.getValue().getKey().key_a, "my-key");
+ Assert.assertEquals(message.getValue().getKey().key_b,
Integer.MAX_VALUE);
+ }
+
+ private Properties initializeToolProperties() {
+ Properties properties = new Properties();
+ properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("useTls", "false");
+ return properties;
+ }
+
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index 04d557d2a93..74e4884856d 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -118,8 +118,14 @@ public class CmdProduce {
+ "key=value string, like k1=v1,k2=v2.")
private List<String> properties = new ArrayList<>();
- @Parameter(names = { "-k", "--key"}, description = "message key to add ")
+ @Parameter(names = { "-k", "--key"}, description = "Partitioning key to
add to each message")
private String key;
+ @Parameter(names = { "-kvk", "--key-value-key"}, description = "Value to
add as message key in KeyValue schema")
+ private String keyValueKey;
+ @Parameter(names = { "-kvkf", "--key-value-key-file"},
+ description = "Path to file containing the value to add as message
key in KeyValue schema. "
+ + "JSON and AVRO files are supported.")
+ private String keyValueKeyFile;
@Parameter(names = { "-vs", "--value-schema"}, description = "Schema type
(can be bytes,avro,json,string...)")
private String valueSchema = "bytes";
@@ -268,6 +274,25 @@ public class CmdProduce {
kvMap.put(kv[0], kv[1]);
}
+ final byte[] keyValueKeyBytes;
+ if (this.keyValueKey != null) {
+ if (keyValueEncodingType ==
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
+ throw new ParameterException(
+ "Key value encoding type must be set when using
--key-value-key");
+ }
+ keyValueKeyBytes =
this.keyValueKey.getBytes(StandardCharsets.UTF_8);
+ } else if (this.keyValueKeyFile != null) {
+ if (keyValueEncodingType ==
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
+ throw new ParameterException(
+ "Key value encoding type must be set when using
--key-value-key-file");
+ }
+ keyValueKeyBytes =
Files.readAllBytes(Paths.get(this.keyValueKeyFile));
+ } else if (this.key != null) {
+ keyValueKeyBytes =
this.key.getBytes(StandardCharsets.UTF_8);
+ } else {
+ keyValueKeyBytes = null;
+ }
+
for (int i = 0; i < this.numTimesProduce; i++) {
for (byte[] content : messageBodies) {
if (limiter != null) {
@@ -290,8 +315,7 @@ public class CmdProduce {
case KEY_VALUE_ENCODING_TYPE_SEPARATED:
case KEY_VALUE_ENCODING_TYPE_INLINE:
KeyValue kv = new KeyValue<>(
- // TODO: support AVRO encoded key
- key != null ?
key.getBytes(StandardCharsets.UTF_8) : null,
+ keyValueKeyBytes,
content);
message.value(kv);
break;