This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8453d73 [schema] implement generic schema/record for Schema.JSON (#2497) 8453d73 is described below commit 8453d73e94f0a22de53a4a003c2084e1573ac855 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Tue Sep 4 15:56:07 2018 -0700 [schema] implement generic schema/record for Schema.JSON (#2497) ### Motivation The `GenericSchema` and `GenericRecord` are used for schema auto detection. Currently it only supports Avro. This PR is to support json. ### Changes Introduce generic schema and generic record for json schema --- .../pulsar/client/api/schema/GenericRecord.java | 4 +- .../schema/{ => generic}/GenericAvroRecord.java | 7 +- .../schema/{ => generic}/GenericAvroSchema.java | 33 ++------ .../impl/schema/generic/GenericJsonRecord.java | 74 ++++++++++++++++ .../impl/schema/generic/GenericJsonSchema.java | 63 ++++++++++++++ .../client/impl/schema/generic/GenericSchema.java | 77 +++++++++++++++++ .../impl/schema/generic/GenericSchemaTest.java | 99 ++++++++++++++++++++++ .../pulsar/client/schema/AvroSchemaTest.java | 93 +------------------- .../pulsar/client/schema/JSONSchemaTest.java | 38 ++------- .../pulsar/client/schema/SchemaTestUtils.java | 61 +++++++++++++ .../pulsar/client/impl/PulsarClientImpl.java | 10 +-- 11 files changed, 398 insertions(+), 161 deletions(-) diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java index 0a4fce4..46a49a1 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java @@ -38,7 +38,9 @@ public interface GenericRecord { * @param field the field to retrieve the value * @return the value object */ - Object getField(Field field); + default Object getField(Field field) { + return getField(field.getName()); + } /** * Retrieve the value of the provided <tt>fieldName</tt>. diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java similarity index 94% rename from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java rename to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java index fb65c7a..c9dbeb7 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl.schema; +package org.apache.pulsar.client.impl.schema.generic; import java.util.List; import java.util.stream.Collectors; @@ -49,11 +49,6 @@ class GenericAvroRecord implements GenericRecord { } @Override - public Object getField(Field field) { - return getField(field.getName()); - } - - @Override public Object getField(String fieldName) { Object value = record.get(fieldName); if (value instanceof Utf8) { diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java similarity index 74% rename from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java rename to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java index 4ccfe55..5fe4459 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java @@ -16,58 +16,40 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.client.impl.schema; +package org.apache.pulsar.client.impl.schema.generic; import static com.google.common.base.Preconditions.checkArgument; -import static java.nio.charset.StandardCharsets.UTF_8; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.List; -import java.util.stream.Collectors; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; -import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.schema.SchemaInfo; /** * A generic avro schema. */ -public class GenericAvroSchema implements Schema<GenericRecord> { +class GenericAvroSchema extends GenericSchema { - private final org.apache.avro.Schema schema; - private final List<Field> fields; - private final SchemaInfo schemaInfo; private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter; private BinaryEncoder encoder; private final ByteArrayOutputStream byteArrayOutputStream; private final GenericDatumReader<org.apache.avro.generic.GenericRecord> datumReader; public GenericAvroSchema(SchemaInfo schemaInfo) { - this.schemaInfo = schemaInfo; - this.schema = new org.apache.avro.Schema.Parser().parse( - new String(schemaInfo.getSchema(), UTF_8) - ); - this.fields = schema.getFields() - .stream() - .map(f -> new Field(f.name(), f.pos())) - .collect(Collectors.toList()); + super(schemaInfo); this.byteArrayOutputStream = new ByteArrayOutputStream(); this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder); this.datumWriter = new GenericDatumWriter(schema); this.datumReader = new GenericDatumReader(schema); } - public org.apache.avro.Schema getAvroSchema() { - return schema; - } - @Override public synchronized byte[] encode(GenericRecord message) { checkArgument(message instanceof GenericAvroRecord); @@ -86,17 +68,14 @@ public class GenericAvroSchema implements Schema<GenericRecord> { @Override public GenericRecord decode(byte[] bytes) { try { + Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); org.apache.avro.generic.GenericRecord avroRecord = datumReader.read( null, - DecoderFactory.get().binaryDecoder(bytes, null)); + decoder); return new GenericAvroRecord(schema, fields, avroRecord); } catch (IOException e) { throw new SchemaSerializationException(e); } } - @Override - public SchemaInfo getSchemaInfo() { - return schemaInfo; - } } diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java new file mode 100644 index 0000000..7d13ebc --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; + +/** + * Generic json record. + */ +class GenericJsonRecord implements GenericRecord { + + private final List<Field> fields; + private final JsonNode jn; + + GenericJsonRecord(List<Field> fields, + JsonNode jn) { + this.fields = fields; + this.jn = jn; + } + + JsonNode getJsonNode() { + return jn; + } + + @Override + public List<Field> getFields() { + return fields; + } + + @Override + public Object getField(String fieldName) { + JsonNode fn = jn.get(fieldName); + if (fn.isContainerNode()) { + AtomicInteger idx = new AtomicInteger(0); + List<Field> fields = Lists.newArrayList(fn.fieldNames()) + .stream() + .map(f -> new Field(f, idx.getAndIncrement())) + .collect(Collectors.toList()); + return new GenericJsonRecord(fields, fn); + } else if (fn.isBoolean()) { + return fn.asBoolean(); + } else if (fn.isInt()) { + return fn.asInt(); + } else if (fn.isFloatingPointNumber()) { + return fn.asDouble(); + } else if (fn.isDouble()) { + return fn.asDouble(); + } else { + return fn.asText(); + } + } +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java new file mode 100644 index 0000000..d0f2b54 --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import org.apache.pulsar.client.api.SchemaSerializationException; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; + +/** + * A generic json schema. + */ +class GenericJsonSchema extends GenericSchema { + + private final ObjectMapper objectMapper; + + public GenericJsonSchema(SchemaInfo schemaInfo) { + super(schemaInfo); + this.objectMapper = new ObjectMapper(); + } + + @Override + public byte[] encode(GenericRecord message) { + checkArgument(message instanceof GenericAvroRecord); + GenericJsonRecord gjr = (GenericJsonRecord) message; + try { + return objectMapper.writeValueAsBytes(gjr.getJsonNode().toString()); + } catch (IOException ioe) { + throw new RuntimeException(new SchemaSerializationException(ioe)); + } + } + + @Override + public GenericRecord decode(byte[] bytes) { + try { + JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8)); + return new GenericJsonRecord(fields, jn); + } catch (IOException ioe) { + throw new RuntimeException(new SchemaSerializationException(ioe)); + } + } +} diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java new file mode 100644 index 0000000..76ff3fe --- /dev/null +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import java.util.List; +import java.util.stream.Collectors; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.common.schema.SchemaInfo; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * A generic schema representation. + */ +public abstract class GenericSchema implements Schema<GenericRecord> { + + protected final org.apache.avro.Schema schema; + protected final List<Field> fields; + protected final SchemaInfo schemaInfo; + + protected GenericSchema(SchemaInfo schemaInfo) { + this.schemaInfo = schemaInfo; + this.schema = new org.apache.avro.Schema.Parser().parse( + new String(schemaInfo.getSchema(), UTF_8) + ); + this.fields = schema.getFields() + .stream() + .map(f -> new Field(f.name(), f.pos())) + .collect(Collectors.toList()); + } + + public org.apache.avro.Schema getAvroSchema() { + return schema; + } + + @Override + public SchemaInfo getSchemaInfo() { + return schemaInfo; + } + + /** + * Create a generic schema out of a <tt>SchemaInfo</tt>. + * + * @param schemaInfo schema info + * @return a generic schema instance + */ + public static GenericSchema of(SchemaInfo schemaInfo) { + switch (schemaInfo.getType()) { + case AVRO: + return new GenericAvroSchema(schemaInfo); + case JSON: + return new GenericJsonSchema(schemaInfo); + default: + throw new UnsupportedOperationException("Generic schema is not supported on schema type '" + + schemaInfo.getType() + "'"); + } + } + +} diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java new file mode 100644 index 0000000..f07c928 --- /dev/null +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.AutoSchema; +import org.apache.pulsar.client.schema.SchemaTestUtils.Bar; +import org.apache.pulsar.client.schema.SchemaTestUtils.Foo; +import org.testng.annotations.Test; + +/** + * Unit testing generic schemas. + */ +@Slf4j +public class GenericSchemaTest { + + @Test + public void testGenericAvroSchema() { + Schema<Foo> encodeSchema = Schema.AVRO(Foo.class); + GenericSchema decodeSchema = GenericSchema.of(encodeSchema.getSchemaInfo()); + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + @Test + public void testGenericJsonSchema() { + Schema<Foo> encodeSchema = Schema.JSON(Foo.class); + GenericSchema decodeSchema = GenericSchema.of(encodeSchema.getSchemaInfo()); + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + @Test + public void testAutoAvroSchema() { + Schema<Foo> encodeSchema = Schema.AVRO(Foo.class); + AutoSchema decodeSchema = new AutoSchema(); + decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo())); + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + @Test + public void testAutoJsonSchema() { + Schema<Foo> encodeSchema = Schema.JSON(Foo.class); + AutoSchema decodeSchema = new AutoSchema(); + decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo())); + testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema); + } + + public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema, + Schema<GenericRecord> decodeSchema) { + int numRecords = 10; + for (int i = 0; i < numRecords; i++) { + Foo foo = new Foo(); + foo.setField1("field-1-" + i); + foo.setField2("field-2-" + i); + foo.setField3(i); + Bar bar = new Bar(); + bar.setField1(i % 2 == 0); + foo.setField4(bar); + + byte[] data = encodeSchema.encode(foo); + + log.info("Decoding : {}", new String(data, UTF_8)); + + GenericRecord record = decodeSchema.decode(data); + Object field1 = record.getField("field1"); + assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); + Object field2 = record.getField("field2"); + assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass()); + Object field3 = record.getField("field3"); + assertEquals(i, field3, "Field 3 is " + field3.getClass()); + Object field4 = record.getField("field4"); + assertTrue(field4 instanceof GenericRecord); + GenericRecord field4Record = (GenericRecord) field4; + assertEquals(i % 2 == 0, field4Record.getField("field1")); + } + } + +} diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java index e5fcc3c..58d1593 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java @@ -18,18 +18,15 @@ */ package org.apache.pulsar.client.schema; +import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS; +import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.GenericAvroSchema; +import org.apache.pulsar.client.schema.SchemaTestUtils.Bar; +import org.apache.pulsar.client.schema.SchemaTestUtils.Foo; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; @@ -37,37 +34,6 @@ import org.testng.annotations.Test; @Slf4j public class AvroSchemaTest { - @Data - @ToString - @EqualsAndHashCode - private static class Foo { - private String field1; - private String field2; - private int field3; - private Bar field4; - } - - @Data - @ToString - @EqualsAndHashCode - private static class Bar { - private boolean field1; - } - - private static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" + - ".pulsar.client" + - ".schema.AvroSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," + - "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," + - "{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," + - "\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}"; - - private static String[] FOO_FIELDS = { - "field1", - "field2", - "field3", - "field4" - }; - @Test public void testSchema() { AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class); @@ -113,55 +79,4 @@ public class AvroSchemaTest { assertEquals(object2, foo2); } - @Test - public void testEncodeAndDecodeGenericRecord() { - AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); - GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); - - log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); - - testGenericSchema(avroSchema, genericAvroSchema); - } - - @Test - public void testAutoSchema() { - AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null); - - GenericAvroSchema genericAvroSchema = new GenericAvroSchema(avroSchema.getSchemaInfo()); - - log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema()); - - AutoSchema schema = new AutoSchema(); - schema.setSchema(genericAvroSchema); - - testGenericSchema(avroSchema, schema); - } - - private void testGenericSchema(AvroSchema<Foo> avroSchema, - org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) { - int numRecords = 10; - for (int i = 0; i < numRecords; i++) { - Foo foo = new Foo(); - foo.setField1("field-1-" + i); - foo.setField2("field-2-" + i); - foo.setField3(i); - Bar bar = new Bar(); - bar.setField1(i % 2 == 0); - foo.setField4(bar); - - byte[] data = avroSchema.encode(foo); - - GenericRecord record = genericRecordSchema.decode(data); - Object field1 = record.getField("field1"); - assertEquals("field-1-" + i, field1, "Field 1 is " + field1.getClass()); - Object field2 = record.getField("field2"); - assertEquals("field-2-" + i, field2, "Field 2 is " + field2.getClass()); - Object field3 = record.getField("field3"); - assertEquals(i, field3, "Field 3 is " + field3.getClass()); - Object field4 = record.getField("field4"); - assertTrue(field4 instanceof GenericRecord); - GenericRecord field4Record = (GenericRecord) field4; - assertEquals(i % 2 == 0, field4Record.getField("field1")); - } - } } diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java index 2add60f..7a677f4 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java @@ -18,46 +18,18 @@ */ package org.apache.pulsar.client.schema; -import lombok.Data; -import lombok.EqualsAndHashCode; -import lombok.ToString; +import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS; +import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON; + import org.apache.avro.Schema; import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.schema.SchemaTestUtils.Bar; +import org.apache.pulsar.client.schema.SchemaTestUtils.Foo; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; public class JSONSchemaTest { - @Data - @ToString - @EqualsAndHashCode - private static class Foo { - private String field1; - private String field2; - private int field3; - private Bar field4; - } - - @Data - @ToString - @EqualsAndHashCode - private static class Bar { - private boolean field1; - } - - private static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" + - ".pulsar.client" + - ".schema.JSONSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," + - "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," + - "{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," + - "\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}"; - - private static String[] FOO_FIELDS = { - "field1", - "field2", - "field3", - "field4" - }; @Test public void testSchema() { diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java new file mode 100644 index 0000000..52d80eb --- /dev/null +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.schema; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * Utils for testing avro. + */ +public class SchemaTestUtils { + + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + private String field1; + private String field2; + private int field3; + private Bar field4; + } + + @Data + @ToString + @EqualsAndHashCode + public static class Bar { + private boolean field1; + } + + public static final String SCHEMA_JSON = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" + + ".pulsar.client" + + ".schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," + + "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\"," + + "\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}"; + + public static String[] FOO_FIELDS = { + "field1", + "field2", + "field3", + "field4" + }; + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e82cf6c..4013068 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -63,7 +63,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.client.impl.schema.AutoSchema; -import org.apache.pulsar.client.impl.schema.GenericAvroSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericSchema; import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; @@ -391,10 +391,10 @@ public class PulsarClientImpl implements PulsarClient { return lookup.getSchema(TopicName.get(conf.getSingleTopic())) .thenCompose(schemaInfoOptional -> { if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { - GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get()); log.info("Auto detected schema for topic {} : {}", conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); - autoSchema.setSchema(genericAvroSchema); + autoSchema.setSchema(genericSchema); return doSingleTopicSubscribeAsync(conf, schema); } else { return FutureUtil.failedFuture( @@ -542,10 +542,10 @@ public class PulsarClientImpl implements PulsarClient { return lookup.getSchema(TopicName.get(conf.getTopicName())) .thenCompose(schemaInfoOptional -> { if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) { - GenericAvroSchema genericAvroSchema = new GenericAvroSchema(schemaInfoOptional.get()); + GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get()); log.info("Auto detected schema for topic {} : {}", conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8)); - autoSchema.setSchema(genericAvroSchema); + autoSchema.setSchema(genericSchema); return doCreateReaderAsync(conf, schema); } else { return FutureUtil.failedFuture(