This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8453d73  [schema] implement generic schema/record for Schema.JSON 
(#2497)
8453d73 is described below

commit 8453d73e94f0a22de53a4a003c2084e1573ac855
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Tue Sep 4 15:56:07 2018 -0700

    [schema] implement generic schema/record for Schema.JSON (#2497)
    
     ### Motivation
    
    The `GenericSchema` and `GenericRecord` are used for schema auto detection.
    Currently they only support Avro.
    This PR adds support for JSON.
    
     ### Changes
    
    Introduce generic schema and generic record for json schema
---
 .../pulsar/client/api/schema/GenericRecord.java    |  4 +-
 .../schema/{ => generic}/GenericAvroRecord.java    |  7 +-
 .../schema/{ => generic}/GenericAvroSchema.java    | 33 ++------
 .../impl/schema/generic/GenericJsonRecord.java     | 74 ++++++++++++++++
 .../impl/schema/generic/GenericJsonSchema.java     | 63 ++++++++++++++
 .../client/impl/schema/generic/GenericSchema.java  | 77 +++++++++++++++++
 .../impl/schema/generic/GenericSchemaTest.java     | 99 ++++++++++++++++++++++
 .../pulsar/client/schema/AvroSchemaTest.java       | 93 +-------------------
 .../pulsar/client/schema/JSONSchemaTest.java       | 38 ++-------
 .../pulsar/client/schema/SchemaTestUtils.java      | 61 +++++++++++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 10 +--
 11 files changed, 398 insertions(+), 161 deletions(-)

diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
index 0a4fce4..46a49a1 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/schema/GenericRecord.java
@@ -38,7 +38,9 @@ public interface GenericRecord {
      * @param field the field to retrieve the value
      * @return the value object
      */
-    Object getField(Field field);
+    default Object getField(Field field) {
+        return getField(field.getName());
+    }
 
     /**
      * Retrieve the value of the provided <tt>fieldName</tt>.
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
similarity index 94%
rename from 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
rename to 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
index fb65c7a..c9dbeb7 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroRecord.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroRecord.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -49,11 +49,6 @@ class GenericAvroRecord implements GenericRecord {
     }
 
     @Override
-    public Object getField(Field field) {
-        return getField(field.getName());
-    }
-
-    @Override
     public Object getField(String fieldName) {
         Object value = record.get(fieldName);
         if (value instanceof Utf8) {
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
similarity index 74%
rename from 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
rename to 
pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
index 4ccfe55..5fe4459 100644
--- 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/GenericAvroSchema.java
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java
@@ -16,58 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl.schema;
+package org.apache.pulsar.client.impl.schema.generic;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.List;
-import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
  * A generic avro schema.
  */
-public class GenericAvroSchema implements Schema<GenericRecord> {
+class GenericAvroSchema extends GenericSchema {
 
-    private final org.apache.avro.Schema schema;
-    private final List<Field> fields;
-    private final SchemaInfo schemaInfo;
     private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> 
datumWriter;
     private BinaryEncoder encoder;
     private final ByteArrayOutputStream byteArrayOutputStream;
     private final GenericDatumReader<org.apache.avro.generic.GenericRecord> 
datumReader;
 
     public GenericAvroSchema(SchemaInfo schemaInfo) {
-        this.schemaInfo = schemaInfo;
-        this.schema = new org.apache.avro.Schema.Parser().parse(
-            new String(schemaInfo.getSchema(), UTF_8)
-        );
-        this.fields = schema.getFields()
-            .stream()
-            .map(f -> new Field(f.name(), f.pos()))
-            .collect(Collectors.toList());
+        super(schemaInfo);
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = 
EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);
         this.datumWriter = new GenericDatumWriter(schema);
         this.datumReader = new GenericDatumReader(schema);
     }
 
-    public org.apache.avro.Schema getAvroSchema() {
-        return schema;
-    }
-
     @Override
     public synchronized byte[] encode(GenericRecord message) {
         checkArgument(message instanceof GenericAvroRecord);
@@ -86,17 +68,14 @@ public class GenericAvroSchema implements 
Schema<GenericRecord> {
     @Override
     public GenericRecord decode(byte[] bytes) {
         try {
+            Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
             org.apache.avro.generic.GenericRecord avroRecord = 
datumReader.read(
                 null,
-                DecoderFactory.get().binaryDecoder(bytes, null));
+                decoder);
             return new GenericAvroRecord(schema, fields, avroRecord);
         } catch (IOException e) {
             throw new SchemaSerializationException(e);
         }
     }
 
-    @Override
-    public SchemaInfo getSchemaInfo() {
-        return schemaInfo;
-    }
 }
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
new file mode 100644
index 0000000..7d13ebc
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+/**
+ * Generic json record.
+ */
+class GenericJsonRecord implements GenericRecord {
+
+    private final List<Field> fields;
+    private final JsonNode jn;
+
+    GenericJsonRecord(List<Field> fields,
+                      JsonNode jn) {
+        this.fields = fields;
+        this.jn = jn;
+    }
+
+    JsonNode getJsonNode() {
+        return jn;
+    }
+
+    @Override
+    public List<Field> getFields() {
+        return fields;
+    }
+
+    @Override
+    public Object getField(String fieldName) {
+        JsonNode fn = jn.get(fieldName);
+        if (fn == null || fn.isNull()) { // absent field or JSON null: nothing to convert
+            return null;
+        } else if (fn.isContainerNode()) {
+            AtomicInteger idx = new AtomicInteger(0);
+            List<Field> fields = Lists.newArrayList(fn.fieldNames())
+                .stream()
+                .map(f -> new Field(f, idx.getAndIncrement()))
+                .collect(Collectors.toList());
+            return new GenericJsonRecord(fields, fn);
+        } else if (fn.isBoolean()) {
+            return fn.asBoolean();
+        } else if (fn.isInt()) {
+            return fn.asInt();
+        } else if (fn.isFloatingPointNumber()) { // already true for double nodes; no isDouble() branch needed
+            return fn.asDouble();
+        } else {
+            return fn.asText();
+        }
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
new file mode 100644
index 0000000..d0f2b54
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A generic json schema.
+ */
+class GenericJsonSchema extends GenericSchema {
+
+    private final ObjectMapper objectMapper;
+
+    public GenericJsonSchema(SchemaInfo schemaInfo) {
+        super(schemaInfo);
+        this.objectMapper = new ObjectMapper();
+    }
+
+    @Override
+    public byte[] encode(GenericRecord message) {
+        checkArgument(message instanceof GenericJsonRecord); // must agree with the cast below
+        GenericJsonRecord gjr = (GenericJsonRecord) message;
+        try {
+            return objectMapper.writeValueAsBytes(gjr.getJsonNode()); // write the tree itself, not a quoted string
+        } catch (IOException ioe) {
+            throw new RuntimeException(new SchemaSerializationException(ioe));
+        }
+    }
+
+    @Override
+    public GenericRecord decode(byte[] bytes) {
+        try {
+            JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
+            return new GenericJsonRecord(fields, jn);
+        } catch (IOException ioe) {
+            throw new RuntimeException(new SchemaSerializationException(ioe));
+        }
+    }
+}
diff --git 
a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
new file mode 100644
index 0000000..76ff3fe
--- /dev/null
+++ 
b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.Field;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * A generic schema representation.
+ */
+public abstract class GenericSchema implements Schema<GenericRecord> {
+
+    protected final org.apache.avro.Schema schema;
+    protected final List<Field> fields;
+    protected final SchemaInfo schemaInfo;
+
+    protected GenericSchema(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        this.schema = new org.apache.avro.Schema.Parser().parse(
+            new String(schemaInfo.getSchema(), UTF_8)
+        );
+        this.fields = schema.getFields()
+            .stream()
+            .map(f -> new Field(f.name(), f.pos()))
+            .collect(Collectors.toList());
+    }
+
+    public org.apache.avro.Schema getAvroSchema() {
+        return schema;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return schemaInfo;
+    }
+
+    /**
+     * Create a generic schema out of a <tt>SchemaInfo</tt>.
+     *
+     * @param schemaInfo schema info
+     * @return a generic schema instance
+     */
+    public static GenericSchema of(SchemaInfo schemaInfo) {
+        switch (schemaInfo.getType()) {
+            case AVRO:
+                return new GenericAvroSchema(schemaInfo);
+            case JSON:
+                return new GenericJsonSchema(schemaInfo);
+            default:
+                throw new UnsupportedOperationException("Generic schema is not 
supported on schema type '"
+                    + schemaInfo.getType() + "'");
+        }
+    }
+
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
new file mode 100644
index 0000000..f07c928
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema.generic;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.testng.annotations.Test;
+
+/**
+ * Unit testing generic schemas.
+ */
+@Slf4j
+public class GenericSchemaTest {
+
+    @Test
+    public void testGenericAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        GenericSchema decodeSchema = 
GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testGenericJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        GenericSchema decodeSchema = 
GenericSchema.of(encodeSchema.getSchemaInfo());
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoAvroSchema() {
+        Schema<Foo> encodeSchema = Schema.AVRO(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    @Test
+    public void testAutoJsonSchema() {
+        Schema<Foo> encodeSchema = Schema.JSON(Foo.class);
+        AutoSchema decodeSchema = new AutoSchema();
+        decodeSchema.setSchema(GenericSchema.of(encodeSchema.getSchemaInfo()));
+        testEncodeAndDecodeGenericRecord(encodeSchema, decodeSchema);
+    }
+
+    public void testEncodeAndDecodeGenericRecord(Schema<Foo> encodeSchema,
+                                                 Schema<GenericRecord> 
decodeSchema) {
+        int numRecords = 10;
+        for (int i = 0; i < numRecords; i++) {
+            Foo foo = new Foo();
+            foo.setField1("field-1-" + i);
+            foo.setField2("field-2-" + i);
+            foo.setField3(i);
+            Bar bar = new Bar();
+            bar.setField1(i % 2 == 0);
+            foo.setField4(bar);
+
+            byte[] data = encodeSchema.encode(foo);
+
+            log.info("Decoding : {}", new String(data, UTF_8));
+
+            GenericRecord record = decodeSchema.decode(data);
+            Object field1 = record.getField("field1");
+            assertEquals("field-1-" + i, field1, "Field 1 is " + 
field1.getClass());
+            Object field2 = record.getField("field2");
+            assertEquals("field-2-" + i, field2, "Field 2 is " + 
field2.getClass());
+            Object field3 = record.getField("field3");
+            assertEquals(i, field3, "Field 3 is " + field3.getClass());
+            Object field4 = record.getField("field4");
+            assertTrue(field4 instanceof GenericRecord);
+            GenericRecord field4Record = (GenericRecord) field4;
+            assertEquals(i % 2 == 0, field4Record.getField("field1"));
+        }
+    }
+
+}
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
index e5fcc3c..58d1593 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/AvroSchemaTest.java
@@ -18,18 +18,15 @@
  */
 package org.apache.pulsar.client.schema;
 
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -37,37 +34,6 @@ import org.testng.annotations.Test;
 @Slf4j
 public class AvroSchemaTest {
 
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Foo {
-        private String field1;
-        private String field2;
-        private int field3;
-        private Bar field4;
-    }
-
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Bar {
-        private boolean field1;
-    }
-
-    private static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
-            ".pulsar.client" +
-            
".schema.AvroSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
-            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
-            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
-            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
-    private static String[] FOO_FIELDS = {
-            "field1",
-            "field2",
-            "field3",
-            "field4"
-    };
-
     @Test
     public void testSchema() {
         AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
@@ -113,55 +79,4 @@ public class AvroSchemaTest {
         assertEquals(object2, foo2);
     }
 
-    @Test
-    public void testEncodeAndDecodeGenericRecord() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
-        GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
-
-        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
-        testGenericSchema(avroSchema, genericAvroSchema);
-    }
-
-    @Test
-    public void testAutoSchema() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
-
-        GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(avroSchema.getSchemaInfo());
-
-        log.info("Avro Schema : {}", genericAvroSchema.getAvroSchema());
-
-        AutoSchema schema = new AutoSchema();
-        schema.setSchema(genericAvroSchema);
-
-        testGenericSchema(avroSchema, schema);
-    }
-
-    private void testGenericSchema(AvroSchema<Foo> avroSchema,
-                                   
org.apache.pulsar.client.api.Schema<GenericRecord> genericRecordSchema) {
-        int numRecords = 10;
-        for (int i = 0; i < numRecords; i++) {
-            Foo foo = new Foo();
-            foo.setField1("field-1-" + i);
-            foo.setField2("field-2-" + i);
-            foo.setField3(i);
-            Bar bar = new Bar();
-            bar.setField1(i % 2 == 0);
-            foo.setField4(bar);
-
-            byte[] data = avroSchema.encode(foo);
-
-            GenericRecord record = genericRecordSchema.decode(data);
-            Object field1 = record.getField("field1");
-            assertEquals("field-1-" + i, field1, "Field 1 is " + 
field1.getClass());
-            Object field2 = record.getField("field2");
-            assertEquals("field-2-" + i, field2, "Field 2 is " + 
field2.getClass());
-            Object field3 = record.getField("field3");
-            assertEquals(i, field3, "Field 3 is " + field3.getClass());
-            Object field4 = record.getField("field4");
-            assertTrue(field4 instanceof GenericRecord);
-            GenericRecord field4Record = (GenericRecord) field4;
-            assertEquals(i % 2 == 0, field4Record.getField("field1"));
-        }
-    }
 }
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index 2add60f..7a677f4 100644
--- 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,46 +18,18 @@
  */
 package org.apache.pulsar.client.schema;
 
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.FOO_FIELDS;
+import static org.apache.pulsar.client.schema.SchemaTestUtils.SCHEMA_JSON;
+
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class JSONSchemaTest {
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Foo {
-        private String field1;
-        private String field2;
-        private int field3;
-        private Bar field4;
-    }
-
-    @Data
-    @ToString
-    @EqualsAndHashCode
-    private static class Bar {
-        private boolean field1;
-    }
-
-    private static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
-            ".pulsar.client" +
-            
".schema.JSONSchemaTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
-            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
-            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
-            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
-
-    private static String[] FOO_FIELDS = {
-            "field1",
-            "field2",
-            "field3",
-            "field4"
-    };
 
     @Test
     public void testSchema() {
diff --git 
a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
new file mode 100644
index 0000000..52d80eb
--- /dev/null
+++ 
b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * Shared test fixtures (POJOs and the expected schema) for the Avro and JSON schema tests.
+ */
+public class SchemaTestUtils {
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        private String field1;
+        private String field2;
+        private int field3;
+        private Bar field4;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
+    public static final String SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache" +
+            ".pulsar.client" +
+            
".schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],"
 +
+            
"\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},"
 +
+            
"{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\","
 +
+            
"\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null}]}";
+
+    public static String[] FOO_FIELDS = {
+            "field1",
+            "field2",
+            "field3",
+            "field4"
+    };
+
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index e82cf6c..4013068 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -63,7 +63,7 @@ import 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.impl.schema.AutoSchema;
-import org.apache.pulsar.client.impl.schema.GenericAvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -391,10 +391,10 @@ public class PulsarClientImpl implements PulsarClient {
             return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            GenericSchema genericSchema = 
GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getSingleTopic(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericAvroSchema);
+                            autoSchema.setSchema(genericSchema);
                             return doSingleTopicSubscribeAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(
@@ -542,10 +542,10 @@ public class PulsarClientImpl implements PulsarClient {
             return lookup.getSchema(TopicName.get(conf.getTopicName()))
                     .thenCompose(schemaInfoOptional -> {
                         if (schemaInfoOptional.isPresent() && 
schemaInfoOptional.get().getType() == SchemaType.AVRO) {
-                            GenericAvroSchema genericAvroSchema = new 
GenericAvroSchema(schemaInfoOptional.get());
+                            GenericSchema genericSchema = 
GenericSchema.of(schemaInfoOptional.get());
                             log.info("Auto detected schema for topic {} : {}",
                                 conf.getTopicName(), new 
String(schemaInfoOptional.get().getSchema(), UTF_8));
-                            autoSchema.setSchema(genericAvroSchema);
+                            autoSchema.setSchema(genericSchema);
                             return doCreateReaderAsync(conf, schema);
                         } else {
                             return FutureUtil.failedFuture(

Reply via email to