This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8453d73 [schema] implement generic schema/record for Schema.JSON
(#2497)
8453d73 is described below
commit 8453d73e94f0a22de53a4a003c2084e1573ac855
Author: Sijie Guo <[email protected]>
AuthorDate: Tue Sep 4 15:56:07 2018 -0700
[schema] implement generic schema/record for Schema.JSON (#2497)
### Motivation
`GenericSchema` and `GenericRecord` are used for schema auto-detection.
Currently they only support Avro.
This PR adds support for JSON.
### Changes
Introduce generic schema and generic record for json schema
---
.../pulsar/client/api/schema/GenericRecord.java | 4 +-
.../schema/{ => generic}/GenericAvroRecord.java | 7 +-
.../schema/{ => generic}/GenericAvroSchema.java | 33 ++------
.../impl/schema/generic/GenericJsonRecord.java | 74 ++++++++++++++++
.../impl/schema/generic/GenericJsonSchema.java | 63 ++++++++++++++
.../client/impl/schema/generic/GenericSchema.java | 77 +++++++++++++++++
.../impl/schema/generic/GenericSchemaTest.java | 99 ++++++++++++++++++++++
.../pulsar/client/schema/AvroSchemaTest.java | 93 +-------------------
.../pulsar/client/schema/JSONSchemaTest.java | 38 ++-------
.../pulsar/client/schema/SchemaTestUtils.java | 61 +++++++++++++
.../pulsar/client/impl/PulsarClientImpl.java | 10 +--
11 files changed, 398 insertions(+), 161 deletions(-)
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
index 0a4fce4..46a49a1 100644
---
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -38,7 +38,9 @@ public interface GenericRecord {
* @param field the field to retrieve the value
* @return the value object
*/
- Object getField(Field field);
+ default Object getField(Field field) {
+ return getField(field.getName());
+ }
/**
* Retrieve the value of the provided <tt>fieldName</tt>.
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
similarity index 94%
rename from
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
rename to
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
index fb65c7a..c9dbeb7 100644
---
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
import java.util.List;
import java.util.stream.Collectors;
@@ -49,11 +49,6 @@ class GenericAvroRecord implements GenericRecord {
}
@Override
- public Object getField(Field field) {
- return getField(field.getName());
- }
-
- @Override
public Object getField(String fieldName) {
Object value = record.get(fieldName);
if (value instanceof Utf8) {
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
similarity index 74%
rename from
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
rename to
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 4ccfe55..5fe4459 100644
---
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -16,58 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
-import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.SchemaInfo;
/**
* A generic avro schema.
*/
-public class GenericAvroSchema implements Schema<GenericRecord> {
+class GenericAvroSchema extends GenericSchema {
- private final org.apache.avro.Schema schema;
- private final List<Field> fields;
- private final SchemaInfo schemaInfo;
private final GenericDatumWriter<org.apache.avro.generic.GenericRecord>
datumWriter;
private BinaryEncoder encoder;
private final ByteArrayOutputStream byteArrayOutputStream;
private final GenericDatumReader<org.apache.avro.generic.GenericRecord>
datumReader;
public GenericAvroSchema(SchemaInfo schemaInfo) {
- this.schemaInfo = schemaInfo;
- this.schema = new org.apache.avro.Schema.Parser().parse(
- new String(schemaInfo.getSchema(), UTF_8)
- );
- this.fields = schema.getFields()
- .stream()
- .map(f -> new Field(f.name(), f.pos()))
- .collect(Collectors.toList());
+ super(schemaInfo);
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder =
EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
this.datumWriter = new GenericDatumWriter(schema);
this.datumReader = new GenericDatumReader(schema);
}
- public org.apache.avro.Schema getAvroSchema() {
- return schema;
- }
-
@Override
public synchronized byte[] encode(GenericRecord message) {
checkArgument(message instanceof GenericAvroRecord);
@@ -86,17 +68,14 @@ public class GenericAvroSchema implements
Schema<GenericRecord> {
@Override
public GenericRecord decode(byte[] bytes) {
try {
+ Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
org.apache.avro.generic.GenericRecord avroRecord =
datumReader.read(
null,
- DecoderFactory.get().binaryDecoder(bytes, null));
+ decoder);
return new GenericAvroRecord(schema, fields, avroRecord);
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
}
- @Override
- public SchemaInfo getSchemaInfo() {
- return schemaInfo;
- }
}
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
new file mode 100644
index 0000000..7d13ebc
--- /dev/null
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+/**
+ * Generic json record.
+ *
+ * <p>Wraps a Jackson {@link JsonNode} and exposes its members through the
+ * {@link GenericRecord} interface.
+ */
+class GenericJsonRecord implements GenericRecord {
+
+    private final List<Field> fields;
+    private final JsonNode jn;
+
+    GenericJsonRecord(List<Field> fields,
+                      JsonNode jn) {
+        this.fields = fields;
+        this.jn = jn;
+    }
+
+    JsonNode getJsonNode() {
+        return jn;
+    }
+
+    @Override
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    /**
+     * Retrieve the value of the provided <tt>fieldName</tt>.
+     *
+     * <p>Container nodes are returned as nested {@link GenericJsonRecord}s,
+     * booleans as {@code Boolean}, ints as {@code Integer}, longs as {@code Long},
+     * floating point numbers as {@code Double} and everything else as text.
+     *
+     * @param fieldName the field name
+     * @return the value object, or {@code null} if the field is absent or is a JSON null
+     */
+    @Override
+    public Object getField(String fieldName) {
+        JsonNode fn = jn.get(fieldName);
+        // JsonNode#get returns null for an absent field; a JSON null must not
+        // surface as the string "null".
+        if (fn == null || fn.isNull()) {
+            return null;
+        }
+        if (fn.isContainerNode()) {
+            AtomicInteger idx = new AtomicInteger(0);
+            List<Field> nestedFields = Lists.newArrayList(fn.fieldNames())
+                .stream()
+                .map(f -> new Field(f, idx.getAndIncrement()))
+                .collect(Collectors.toList());
+            return new GenericJsonRecord(nestedFields, fn);
+        } else if (fn.isBoolean()) {
+            return fn.asBoolean();
+        } else if (fn.isInt()) {
+            return fn.asInt();
+        } else if (fn.isLong()) {
+            return fn.asLong();
+        } else if (fn.isFloatingPointNumber()) {
+            // covers float, double and big-decimal nodes
+            return fn.asDouble();
+        } else {
+            return fn.asText();
+        }
+    }
+}
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
new file mode 100644
index 0000000..d0f2b54
--- /dev/null
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A generic json schema.
+ *
+ * <p>Encodes and decodes {@link GenericJsonRecord}s as JSON documents using a
+ * Jackson {@link ObjectMapper}.
+ */
+class GenericJsonSchema extends GenericSchema {
+
+    private final ObjectMapper objectMapper;
+
+    public GenericJsonSchema(SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        this.objectMapper = new ObjectMapper();
+    }
+
+    /**
+     * Serialize a generic json record to its JSON bytes.
+     *
+     * @param message the record to encode; must be a {@link GenericJsonRecord}
+     * @return the serialized JSON document
+     * @throws IllegalArgumentException if <tt>message</tt> is not a {@link GenericJsonRecord}
+     * @throws SchemaSerializationException if the record cannot be serialized
+     */
+    @Override
+    public byte[] encode(GenericRecord message) {
+        // Must match the cast below; checking for the Avro record type would
+        // reject every valid json record.
+        checkArgument(message instanceof GenericJsonRecord);
+        GenericJsonRecord gjr = (GenericJsonRecord) message;
+        try {
+            // Write the node itself. Writing node.toString() would emit a quoted
+            // JSON string that decode() cannot read back as an object.
+            return objectMapper.writeValueAsBytes(gjr.getJsonNode());
+        } catch (IOException ioe) {
+            throw new SchemaSerializationException(ioe);
+        }
+    }
+
+    /**
+     * Parse UTF-8 JSON bytes into a generic record.
+     *
+     * @param bytes the serialized JSON document
+     * @return a {@link GenericJsonRecord} backed by the parsed tree
+     * @throws SchemaSerializationException if the bytes cannot be parsed
+     */
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        try {
+            JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
+            return new GenericJsonRecord(fields, jn);
+        } catch (IOException ioe) {
+            throw new SchemaSerializationException(ioe);
+        }
+    }
+}
diff --git
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
new file mode 100644
index 0000000..76ff3fe
--- /dev/null
+++
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * A generic schema representation.
+ *
+ * <p>Holds the {@link SchemaInfo}, the Avro schema parsed from its definition
+ * and the list of top-level fields derived from that Avro schema.
+ */
+public abstract class GenericSchema implements Schema<GenericRecord> {
+
+    protected final org.apache.avro.Schema schema;
+    protected final List<Field> fields;
+    protected final SchemaInfo schemaInfo;
+
+    /**
+     * Parse the schema definition carried by <tt>schemaInfo</tt> and collect its fields.
+     *
+     * @param schemaInfo schema info whose definition bytes are read as UTF-8
+     */
+    protected GenericSchema(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        String definition = new String(schemaInfo.getSchema(), UTF_8);
+        this.schema = new org.apache.avro.Schema.Parser().parse(definition);
+        this.fields = this.schema.getFields()
+            .stream()
+            .map(avroField -> new Field(avroField.name(), avroField.pos()))
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @return the Avro schema parsed from the schema info
+     */
+    public org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+
+    /**
+     * Create a generic schema out of a <tt>SchemaInfo</tt>.
+     *
+     * @param schemaInfo schema info
+     * @return a generic schema instance
+     * @throws UnsupportedOperationException if the schema type is neither AVRO nor JSON
+     */
+    public static GenericSchema of(SchemaInfo schemaInfo) {
+        switch (schemaInfo.getType()) {
+            case AVRO:
+                return new GenericAvroSchema(schemaInfo);
+            case JSON:
+                return new GenericJsonSchema(schemaInfo);
+            default:
+                break;
+        }
+        throw new UnsupportedOperationException(
+            "Generic schema is not supported on schema type '" + schemaInfo.getType() + "'");
+    }
+
+}
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
new file mode 100644
index 0000000..f07c928
--- /dev/null
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit testing generic schemas.
+ *
+ * <p>Each test encodes {@link Foo} objects with a typed schema and decodes them
+ * with a generic (or auto) schema built from the same schema info.
+ */
+@Slf4j
+public class GenericSchemaTest {
+
+    @Test
+    public void testGenericAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        GenericSchema decodeSchema = GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testGenericJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        GenericSchema decodeSchema = GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    /**
+     * Encode ten {@link Foo} instances with <tt>encodeSchema</tt>, decode each with
+     * <tt>decodeSchema</tt> and verify every field of the resulting generic record.
+     *
+     * @param encodeSchema the typed schema used to serialize
+     * @param decodeSchema the generic schema used to deserialize
+     */
+    public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+                                                 Schema<GenericRecord> decodeSchema) {
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+
+            byte[] data = encodeSchema.encode(foo);
+
+            log.info("Decoding : {}", new String(data, UTF_8));
+
+            // TestNG's assertEquals takes (actual, expected[, message]).
+            GenericRecord record = decodeSchema.decode(data);
+            Object field1 = record.getField("field1");
+            assertEquals(field1, "field-1-" + i, "Field 1 is " + field1.getClass());
+            Object field2 = record.getField("field2");
+            assertEquals(field2, "field-2-" + i, "Field 2 is " + field2.getClass());
+            Object field3 = record.getField("field3");
+            assertEquals(field3, i, "Field 3 is " + field3.getClass());
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(field4Record.getField("field1"), i % 2 == 0);
+        }
+    }
+
+}
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index e5fcc3c..58d1593 100644
---
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -18,18 +18,15 @@
*/
package org.apache.pulsar.client.schema;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -37,37 +34,6 @@ import org.testng.annotations.Test;
@Slf4j
public class AvroSchemaTest {
- @Data
- @ToString
- @EqualsAndHashCode
- private static class Foo {
- private String field1;
- private String field2;
- private int field3;
- private Bar field4;
- }
-
- @Data
- @ToString
- @EqualsAndHashCode
- private static class Bar {
- private boolean field1;
- }
-
- private static final String SCHEMA_JSON =
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
- ".pulsar.client" +
-
".schema.AvroSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
+
-
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
+
-
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
+
-
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
- private static String[] FOO_FIELDS = {
- "field1",
- "field2",
- "field3",
- "field4"
- };
-
@Test
public void testSchema() {
AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
@@ -113,55 +79,4 @@ public class AvroSchemaTest {
assertEquals(object2, foo2);
}
- @Test
- public void testEncodeAndDecodeGenericRecord() {
- AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
- GenericAvroSchema genericAvroSchema = new
GenericAvroSchema(avroSchema.getSchemaInfo());
-
- log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
- testGenericSchema(avroSchema, genericAvroSchema);
- }
-
- @Test
- public void testAutoSchema() {
- AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
-
- GenericAvroSchema genericAvroSchema = new
GenericAvroSchema(avroSchema.getSchemaInfo());
-
- log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
- AutoSchema schema = new AutoSchema();
- schema.setSchema(genericAvroSchema);
-
- testGenericSchema(avroSchema, schema);
- }
-
- private void testGenericSchema(AvroSchema<Foo> avroSchema,
-
org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) {
- int numRecords = 10;
- for (int i = 0; i < numRecords; i++) {
- Foo foo = new Foo();
- foo.setField1("field-1-" + i);
- foo.setField2("field-2-" + i);
- foo.setField3(i);
- Bar bar = new Bar();
- bar.setField1(i % 2 == 0);
- foo.setField4(bar);
-
- byte[] data = avroSchema.encode(foo);
-
- GenericRecord record = genericRecordSchema.decode(data);
- Object field1 = record.getField("field1");
- assertEquals("field-1-" + i, field1, "Field 1 is " +
field1.getClass());
- Object field2 = record.getField("field2");
- assertEquals("field-2-" + i, field2, "Field 2 is " +
field2.getClass());
- Object field3 = record.getField("field3");
- assertEquals(i, field3, "Field 3 is " + field3.getClass());
- Object field4 = record.getField("field4");
- assertTrue(field4 instanceof GenericRecord);
- GenericRecord field4Record = (GenericRecord) field4;
- assertEquals(i % 2 == 0, field4Record.getField("field1"));
- }
- }
}
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index 2add60f..7a677f4 100644
---
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,46 +18,18 @@
*/
package org.apache.pulsar.client.schema;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
+
import org.apache.avro.Schema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;
public class JSONSchemaTest {
- @Data
- @ToString
- @EqualsAndHashCode
- private static class Foo {
- private String field1;
- private String field2;
- private int field3;
- private Bar field4;
- }
-
- @Data
- @ToString
- @EqualsAndHashCode
- private static class Bar {
- private boolean field1;
- }
-
- private static final String SCHEMA_JSON =
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
- ".pulsar.client" +
-
".schema.JSONSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
+
-
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
+
-
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
+
-
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
- private static String[] FOO_FIELDS = {
- "field1",
- "field2",
- "field3",
- "field4"
- };
@Test
public void testSchema() {
diff --git
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
new file mode 100644
index 0000000..52d80eb
--- /dev/null
+++
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Shared fixtures for the schema tests (used by both the Avro and the JSON tests).
+ */
+public class SchemaTestUtils {
+
+    // Holder of static fixtures only; not meant to be instantiated.
+    private SchemaTestUtils() {
+    }
+
+    /**
+     * Sample record with string, int and nested-record fields.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+        private Bar field4;
+    }
+
+    /**
+     * Sample nested record with a single boolean field.
+     */
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
+    /** Avro schema definition for {@link Foo}, as a JSON string. */
+    public static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
+        ".pulsar.client" +
+        ".schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," +
+        "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," +
+        "{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," +
+        "\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
+
+    /** Field names of {@link Foo}, in declaration order. */
+    public static final String[] FOO_FIELDS = {
+        "field1",
+        "field2",
+        "field3",
+        "field4"
+    };
+
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e82cf6c..4013068 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -63,7 +63,7 @@ import
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
@@ -391,10 +391,10 @@ public class PulsarClientImpl implements PulsarClient {
return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() &&
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
- GenericAvroSchema genericAvroSchema = new
GenericAvroSchema(schemaInfoOptional.get());
+ GenericSchema genericSchema =
GenericSchema.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getSingleTopic(), new
String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoSchema.setSchema(genericAvroSchema);
+ autoSchema.setSchema(genericSchema);
return doSingleTopicSubscribeAsync(conf, schema);
} else {
return FutureUtil.failedFuture(
@@ -542,10 +542,10 @@ public class PulsarClientImpl implements PulsarClient {
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() &&
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
- GenericAvroSchema genericAvroSchema = new
GenericAvroSchema(schemaInfoOptional.get());
+ GenericSchema genericSchema =
GenericSchema.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getTopicName(), new
String(schemaInfoOptional.get().getSchema(), UTF_8));
- autoSchema.setSchema(genericAvroSchema);
+ autoSchema.setSchema(genericSchema);
return doCreateReaderAsync(conf, schema);
} else {
return FutureUtil.failedFuture(