This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b92dc2 [Java client] Fix behaviour of Schema.AUTO_CONSUME() with
KeyValueSchema and multi versions (#10492)
9b92dc2 is described below
commit 9b92dc26a6b7fad9e1406ecfb010069119fab7bc
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri May 7 22:56:37 2021 +0200
[Java client] Fix behaviour of Schema.AUTO_CONSUME() with KeyValueSchema
and multi versions (#10492)
Co-authored-by: Enrico Olivelli <[email protected]>
---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 379 ++++++++++++++++-----
.../client/impl/schema/AutoConsumeSchema.java | 2 +-
2 files changed, 291 insertions(+), 90 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 9690fe6..3b976e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -30,9 +30,11 @@ import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
@@ -60,17 +62,19 @@ import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Test(groups = "broker-api")
+@Slf4j
public class SimpleSchemaTest extends ProducerConsumerBase {
@DataProvider(name = "batchingModes")
public static Object[][] batchingModes() {
return new Object[][] {
- { true },
- { false }
+ { true },
+ { false }
};
}
@@ -113,7 +117,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
.topic("persistent://my-property/my-ns/my-topic1")
.subscriptionName("my-subscriber-name").subscribe();
Producer<String> producer =
pulsarClient.newProducer(Schema.STRING)
- .topic("persistent://my-property/my-ns/my-topic1").create()) {
+
.topic("persistent://my-property/my-ns/my-topic1").create()) {
int N = 10;
for (int i = 0; i < N; i++) {
@@ -205,14 +209,14 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
p.send("junkdata".getBytes(UTF_8));
} else {
Assert.fail("Shouldn't be able to connect to a schema'd topic with no schema"
- + " if SchemaValidationEnabled is enabled");
+ + " if SchemaValidationEnabled is enabled");
}
} catch (PulsarClientException e) {
if (schemaValidationEnforced) {
Assert.assertTrue(e instanceof IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
- + " if SchemaValidationEnforced is disabled");
+ + " if SchemaValidationEnforced is disabled");
}
}
@@ -236,10 +240,10 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<V1Data> ignored = pulsarClient.newProducer(v1Schema)
- .topic(topic).create()) {
+ .topic(topic).create()) {
}
try (Producer<V2Data> p =
pulsarClient.newProducer(Schema.AVRO(V2Data.class))
- .topic(topic).create()) {
+ .topic(topic).create()) {
p.send(new V2Data(-1, -1));
}
V1Data dataV1 = new V1Data(2);
@@ -247,14 +251,14 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
byte[] contentV1 = v1Writer.write(dataV1);
byte[] contentV2 = v2Writer.write(dataV2);
try (Producer<byte[]> p =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
- .topic(topic).create();
- Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
- .topic(topic)
-
.subscriptionName("sub1").subscribe()) {
+ .topic(topic).create();
+ Consumer<V2Data> c = pulsarClient.newConsumer(v2Schema)
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
Assert.expectThrows(SchemaSerializationException.class, () ->
p.send(contentV1));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(contentV1).send();
+ .value(contentV1).send();
p.send(contentV2);
Message<V2Data> msg1 = c.receive();
V2Data msg1Value = msg1.getValue();
@@ -270,7 +274,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
p.newMessage(Schema.BYTES).value(contentV1).send();
if (schemaValidationEnforced) {
Assert.fail("Shouldn't be able to send to a schema'd topic with no schema"
- + " if SchemaValidationEnabled is enabled");
+ + " if SchemaValidationEnabled is enabled");
}
Message<V2Data> msg3 = c.receive();
Assert.assertEquals(msg3.getSchemaVersion(),
SchemaVersion.Empty.bytes());
@@ -279,7 +283,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Assert.assertTrue(e instanceof
IncompatibleSchemaException);
} else {
Assert.fail("Shouldn't throw IncompatibleSchemaException"
- + " if SchemaValidationEnforced is disabled");
+ + " if SchemaValidationEnforced is disabled");
}
}
}
@@ -292,10 +296,10 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
V2Data data2 = new V2Data(3, 5);
V1Data data3 = new V1Data(8);
try (Producer<V1Data> p =
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic).create();
+ .topic(topic).create();
Consumer<V2Data> c =
pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
- .topic(topic)
-
.subscriptionName("sub1").subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
p.newMessage().value(data1).send();
p.newMessage(Schema.AVRO(V2Data.class)).value(data2).send();
p.newMessage(Schema.AVRO(V1Data.class)).value(data3).send();
@@ -329,10 +333,10 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
AvroWriter<V2Data> v2Writer = new AvroWriter<>(
new Parser().parse(new ByteArrayInputStream(v2SchemaBytes)));
try (Producer<byte[]> p = pulsarClient.newProducer()
- .topic(topic).create();
+ .topic(topic).create();
Consumer<byte[]> c = pulsarClient.newConsumer()
- .topic(topic)
-
.subscriptionName("sub1").subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1").subscribe()) {
for (int i = 0; i < 2; ++i) {
V1Data dataV1 = new V1Data(i);
V2Data dataV2 = new V2Data(i, -i);
@@ -351,19 +355,19 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
List<SchemaInfo> allSchemas = admin.schemas().getAllSchemas(topic);
Assert.assertEquals(allSchemas, Arrays.asList(v1Schema.getSchemaInfo(),
-
v2Schema.getSchemaInfo()));
+ v2Schema.getSchemaInfo()));
}
@Test
public void newProducerForMessageSchemaWithBatch() throws Exception {
String topic = "my-property/my-ns/schema-test";
Consumer<V2Data> c =
pulsarClient.newConsumer(Schema.AVRO(V2Data.class))
- .topic(topic)
- .subscriptionName("sub1").subscribe();
+ .topic(topic)
+ .subscriptionName("sub1").subscribe();
Producer<byte[]> p =
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
- .topic(topic)
- .enableBatching(true)
- .batchingMaxPublishDelay(10,
TimeUnit.SECONDS).create();
+ .topic(topic)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(10, TimeUnit.SECONDS).create();
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
AvroWriter<V2Data> v2DataAvroWriter = new AvroWriter<>(
@@ -377,17 +381,17 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
if (i / batch % 2 == 0) {
byte[] content = v1DataAvroWriter.write(new V1Data(i));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(content).sendAsync();
+ .value(content).sendAsync();
} else {
byte[] content = v2DataAvroWriter.write(new V2Data(i, i +
total));
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V2Data.class)))
- .value(content).sendAsync();
+ .value(content).sendAsync();
}
if ((i + 1) % incompatible == 0) {
byte[] content = incompatibleDataAvroWriter.write(new
IncompatibleData(-i, -i));
try {
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(IncompatibleData.class)))
- .value(content).send();
+ .value(content).send();
} catch (Exception e) {
Assert.assertTrue(e instanceof
IncompatibleSchemaException, e.getMessage());
}
@@ -412,11 +416,11 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
AvroWriter<V1Data> v1DataAvroWriter = new AvroWriter<>(
ReflectData.AllowNull.get().getSchema(V1Data.class));
try (Producer<byte[]> p = pulsarClient.newProducer()
- .topic(topic)
-
.enableMultiSchema(false).create()) {
+ .topic(topic)
+ .enableMultiSchema(false).create()) {
Assert.assertThrows(InvalidMessageException.class,
() ->
p.newMessage(Schema.AUTO_PRODUCE_BYTES(Schema.AVRO(V1Data.class)))
- .value(v1DataAvroWriter.write(new
V1Data(0))).send());
+ .value(v1DataAvroWriter.write(new
V1Data(0))).send());
}
}
@@ -452,7 +456,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
try (Producer<V1Data> p =
pulsarClient.newProducer(Schema.AVRO(V1Data.class)).topic(topic).create();
Consumer<V1Data> c =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
- .topic(topic).subscriptionName("sub1").subscribe()) {
+ .topic(topic).subscriptionName("sub1").subscribe()) {
V1Data toSend = new V1Data(1);
p.send(toSend);
Assert.assertEquals(toSend, c.receive().getValue());
@@ -484,14 +488,14 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test";
try (Producer<V1Data> p =
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
Consumer<V1Data> c =
pulsarClient.newConsumer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
p.send(new V1Data(1));
Message<V1Data> data = c.receive();
@@ -505,14 +509,14 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test-auto-consume-" +
batching;
try (Producer<V1Data> p =
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
Consumer<GenericRecord> c =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()) {
+ .topic(topic)
+ .subscriptionName("sub1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()) {
int numMessages = 10;
@@ -539,50 +543,55 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-"
+ batching;
Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED);
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED);
try (Producer<KeyValue<V1Data, V1Data>> p =
pulsarClient.newProducer(pojoSchema)
- .topic(topic)
- .enableBatching(batching)
- .create();
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
+ Consumer<GenericRecord> c0 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<GenericRecord, GenericRecord>> c1 =
pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AUTO_CONSUME(),
- Schema.AUTO_CONSUME(),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub1")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub1")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<V1Data, V1Data>> c2 = pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub2")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub2")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<GenericRecord, V1Data>> c3 =
pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AUTO_CONSUME(),
- Schema.AVRO(V1Data.class),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub3")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe();
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub3")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
Consumer<KeyValue<V1Data, GenericRecord>> c4 =
pulsarClient.newConsumer(
- Schema.KeyValue(
- Schema.AVRO(V1Data.class),
- Schema.AUTO_CONSUME(),
- KeyValueEncodingType.SEPARATED))
- .topic(topic)
- .subscriptionName("sub4")
- .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
- .subscribe()
+ Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub4")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe()
) {
int numMessages = 10;
@@ -592,12 +601,22 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
}
p.flush();
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data =
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ c0.acknowledge(wrapper);
+ }
// verify c1
for (int i = 0; i < numMessages; i++) {
Message<KeyValue<GenericRecord, GenericRecord>> data =
c1.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().getField("i"), i *
1000);
+ c1.acknowledge(data);
}
// verify c2
@@ -606,6 +625,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
+ c2.acknowledge(data);
}
// verify c3
@@ -614,6 +634,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().getField("i"), i * 100);
assertEquals(data.getValue().getValue().i, i * 1000);
+ c3.acknowledge(data);
}
// verify c4
@@ -622,6 +643,186 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getKey().i, i * 100);
assertEquals(data.getValue().getValue().getField("i"), i *
1000);
+ c4.acknowledge(data);
+ }
+ }
+
+ // new schema version
+
+ Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V2Data, V2Data>> p =
pulsarClient.newProducer(pojoSchemaV2)
+ .topic(topic)
+ .enableBatching(batching)
+ .create();
+ Consumer<GenericRecord> c0 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+ .subscribe();
+ Consumer<KeyValue<GenericRecord, GenericRecord>> c1 =
pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub1")
+ .subscribe();
+ Consumer<KeyValue<V2Data, V2Data>> c2 = pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub2")
+ .subscribe();
+ Consumer<KeyValue<GenericRecord, V2Data>> c3 =
pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AUTO_CONSUME(),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub3")
+ .subscribe();
+ Consumer<KeyValue<V2Data, GenericRecord>> c4 =
pulsarClient.newConsumer(
+ Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AUTO_CONSUME(),
+ KeyValueEncodingType.SEPARATED))
+ .topic(topic)
+ .subscriptionName("sub4")
+ .subscribe()
+ ) {
+
+ int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++) {
+ p.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new
V2Data(i * 1000, i * 20)));
+ }
+ p.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ KeyValue<GenericRecord, GenericRecord> data =
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ assertEquals(data.getKey().getField("j"), i);
+ assertEquals(data.getValue().getField("j"), i * 20);
+ }
+ // verify c1
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<GenericRecord, GenericRecord>> data =
c1.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getValue().getField("i"), i *
1000);
+ assertEquals(data.getValue().getKey().getField("j"), i);
+ assertEquals(data.getValue().getValue().getField("j"), i * 20);
+ }
+
+ // verify c2
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<V2Data, V2Data>> data = c2.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().i, i * 100);
+ assertEquals(data.getValue().getValue().i, i * 1000);
+ assertEquals(data.getValue().getKey().j, (Integer) i);
+ assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+ }
+
+ // verify c3
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<GenericRecord, V2Data>> data = c3.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getValue().i, i * 1000);
+ assertEquals(data.getValue().getKey().getField("j"), (Integer)
i);
+ assertEquals(data.getValue().getValue().j, (Integer) (i * 20));
+ }
+
+ // verify c4
+ for (int i = 0; i < numMessages; i++) {
+ Message<KeyValue<V2Data, GenericRecord>> data = c4.receive();
+ assertNotNull(data.getSchemaVersion());
+ assertEquals(data.getValue().getKey().i, i * 100);
+ assertEquals(data.getValue().getValue().getField("i"), i *
1000);
+ assertEquals(data.getValue().getKey().j, (Integer) i);
+ assertEquals(data.getValue().getValue().getField("j"), i * 20);
+ }
+ }
+
+ }
+
+ @Test
+ public void testAutoKeyValueConsumeGenericObject() throws Exception {
+ String topic = "my-property/my-ns/schema-test-auto-keyvalue-consume-"+
UUID.randomUUID();
+
+ Schema<KeyValue<V1Data, V1Data>> pojoSchema = Schema.KeyValue(
+ Schema.AVRO(V1Data.class),
+ Schema.AVRO(V1Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V1Data, V1Data>> p =
pulsarClient.newProducer(pojoSchema)
+ .topic(topic)
+ .create();
+ Consumer<GenericRecord> c0 =
pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+ .topic(topic)
+ .subscriptionName("sub0")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscribe();
+ ) {
+
+ int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++) {
+ p.sendAsync(new KeyValue<>(new V1Data(i * 100), new V1Data(i *
1000)));
+ }
+ p.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ log.info("schema version {}",
BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+ KeyValue<GenericRecord, GenericRecord> data =
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ c0.acknowledge(wrapper);
+ }
+
+
+ // new schema version
+
+ Schema<KeyValue<V2Data, V2Data>> pojoSchemaV2 = Schema.KeyValue(
+ Schema.AVRO(V2Data.class),
+ Schema.AVRO(V2Data.class),
+ KeyValueEncodingType.SEPARATED);
+
+ try (Producer<KeyValue<V2Data, V2Data>> p2 =
pulsarClient.newProducer(pojoSchemaV2)
+ .topic(topic)
+ .create();
+ ) {
+
+ for (int i = 0; i < numMessages; i++) {
+ p2.sendAsync(new KeyValue<>(new V2Data(i * 100, i), new
V2Data(i * 1000, i * 20)));
+ }
+ p2.flush();
+
+ // verify c0
+ for (int i = 0; i < numMessages; i++) {
+ Message<GenericRecord> wrapper = c0.receive();
+ log.info("schema version {}",
BytesSchemaVersion.of(wrapper.getSchemaVersion()));
+ KeyValue<GenericRecord, GenericRecord> data =
(KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
+ assertNotNull(wrapper.getSchemaVersion());
+ assertEquals(data.getKey().getField("i"), i * 100);
+ assertEquals(data.getValue().getField("i"), i * 1000);
+ assertEquals(data.getKey().getField("j"), i);
+ assertEquals(data.getValue().getField("j"), i * 20);
+ }
+
}
}
}
@@ -635,12 +836,12 @@ public class SimpleSchemaTest extends
ProducerConsumerBase {
PulsarClientImpl binaryProtocolClient = (PulsarClientImpl)
pulsarClient;
pulsarClient.newProducer(Schema.AVRO(V1Data.class))
- .topic(topic)
- .create();
+ .topic(topic)
+ .create();
pulsarClient.newProducer(Schema.AVRO(V2Data.class))
- .topic(topic)
- .create();
+ .topic(topic)
+ .create();
LookupService httpLookupService = httpProtocolClient.getLookup();
LookupService binaryLookupService = binaryProtocolClient.getLookup();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 62ca026..177e1aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -178,7 +178,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return LocalDateTimeSchema.of();
case JSON:
case AVRO:
- return GenericSchemaImpl.of(schemaInfo);
+ return GenericSchemaImpl.of(schemaInfo, false);
case PROTOBUF_NATIVE:
return GenericProtobufNativeSchema.of(schemaInfo);
case KEY_VALUE: