This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 058236c  Upgrade to Apache Avro 1.10.2 (#9898)
058236c is described below

commit 058236c8f0e1a7548b190601939a7e0c805133a2
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Mar 19 11:08:29 2021 +0100

    Upgrade to Apache Avro 1.10.2 (#9898)
---
 distribution/server/src/assemble/LICENSE.bin.txt   |  4 +-
 pom.xml                                            |  2 +-
 .../pulsar/client/impl/schema/AvroSchema.java      | 35 ++++++++-
 .../impl/schema/generic/GenericAvroReader.java     |  4 +-
 .../client/impl/schema/ProtobufSchemaTest.java     | 27 ++++---
 .../client/impl/schema/SchemaBuilderTest.java      | 90 ++++++++++++++--------
 .../pulsar/client/impl/schema/SchemaInfoTest.java  | 56 +++++++-------
 pulsar-sql/presto-distribution/LICENSE             |  4 +-
 .../sql/presto/decoder/avro/TestAvroDecoder.java   | 12 +--
 .../sql/presto/decoder/json/TestJsonDecoder.java   | 12 +--
 10 files changed, 156 insertions(+), 90 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 544f33a..8d86a1f 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -481,8 +481,8 @@ The Apache Software License, Version 2.0
   * Jodah
     - net.jodah-typetools-0.5.0.jar
   * Apache Avro
-    - org.apache.avro-avro-1.9.1.jar
-    - org.apache.avro-avro-protobuf-1.9.1.jar
+    - org.apache.avro-avro-1.10.2.jar
+    - org.apache.avro-avro-protobuf-1.10.2.jar
   * Apache Curator
     - org.apache.curator-curator-client-5.1.0.jar
     - org.apache.curator-curator-framework-5.1.0.jar
diff --git a/pom.xml b/pom.xml
index 2ad4e74..f1a35c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@ flexible messaging model and an intuitive client API.</description>
     <kafka-client.version>2.3.0</kafka-client.version>
     <rabbitmq-client.version>5.1.1</rabbitmq-client.version>
     <aws-sdk.version>1.11.774</aws-sdk.version>
-    <avro.version>1.9.1</avro.version>
+    <avro.version>1.10.2</avro.version>
     <joda.version>2.10.1</joda.version>
     <jclouds.version>2.2.1</jclouds.version>
     <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index d5a99f8..f4bc833 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl.schema;
 
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
-import org.apache.avro.data.JodaTimeConversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
@@ -32,6 +34,8 @@ import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
 import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,11 +121,38 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> {
         } else {
             try {
                 Class.forName("org.joda.time.DateTime");
-                reflectData.addLogicalTypeConversion(new 
JodaTimeConversions.TimestampConversion());
+                reflectData.addLogicalTypeConversion(new 
TimestampConversion());
             } catch (ClassNotFoundException e) {
                 // Skip if have not provide joda-time dependency.
             }
         }
     }
 
+    public static class TimestampConversion extends Conversion<DateTime> {
+        @Override
+        public Class<DateTime> getConvertedType() {
+            return DateTime.class;
+        }
+
+        @Override
+        public String getLogicalTypeName() {
+            return "timestamp-millis";
+        }
+
+        @Override
+        public DateTime fromLong(Long millisFromEpoch, org.apache.avro.Schema 
schema, LogicalType type) {
+            return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+        }
+
+        @Override
+        public Long toLong(DateTime timestamp, org.apache.avro.Schema schema, 
LogicalType type) {
+            return timestamp.getMillis();
+        }
+
+        @Override
+        public org.apache.avro.Schema getRecommendedSchema() {
+            return 
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));
+        }
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index b78b31e..036a6fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -88,7 +88,7 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
                     null,
                     decoder);
             return new GenericAvroRecord(schemaVersion, schema, fields, 
avroRecord);
-        } catch (IOException e) {
+        } catch (IOException | IndexOutOfBoundsException e) {
             throw new SchemaSerializationException(e);
         }
     }
@@ -102,7 +102,7 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
                             null,
                             decoder);
             return new GenericAvroRecord(schemaVersion, schema, fields, 
avroRecord);
-        } catch (IOException e) {
+        } catch (IOException | IndexOutOfBoundsException e) {
             throw new SchemaSerializationException(e);
         } finally {
             try {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index cb8138d..7f3eed1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -38,17 +38,22 @@ public class ProtobufSchemaTest {
     private static final String NAME = "foo";
 
     private static final String EXPECTED_SCHEMA_JSON = 
"{\"type\":\"record\",\"name\":\"TestMessage\"," +
-            
"\"namespace\":\"org.apache.pulsar.client.schema.proto.Test\",\"fields\":[{\"name\":\"stringField\","
 +
-            
"\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},"
 +
-            
"{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\","
 +
-            
"\"type\":\"int\",\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\",\"name\":\"TestEnum\","
 +
-            
"\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\",\"type\":[\"null\","
 +
-            
"{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\",\"type\":{\"type\":\"string\","
 +
-            
"\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\",\"type\":\"double\",\"default\":0}]}],"
 +
-            
"\"default\":null},{\"name\":\"repeatedField\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\","
 +
-            
"\"avro.java.string\":\"String\"}}},{\"name\":\"externalMessage\",\"type\":[\"null\",{\"type\":\"record\","
 +
-            
"\"name\":\"ExternalMessage\",\"namespace\":\"org.apache.pulsar.client.schema.proto.ExternalTest\","
 +
-            
"\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},"
 +
+            "\"namespace\":\"org.apache.pulsar.client.schema.proto.Test\"," +
+            
"\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\"," +
+            
"\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"doubleField\"," 
+
+            
"\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\"," +
+            
"\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\"," +
+            "\"name\":\"TestEnum\",\"symbols\":[\"SHARED\",\"FAILOVER\"]}," +
+            "\"default\":\"SHARED\"},{\"name\":\"nestedField\"," +
+            "\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\"," 
+
+            "\"fields\":[{\"name\":\"foo\",\"type\":{\"type\":\"string\"," +
+            "\"avro.java.string\":\"String\"},\"default\":\"\"}" +
+            ",{\"name\":\"bar\",\"type\":\"double\",\"default\":0}]}]" +
+            
",\"default\":null},{\"name\":\"repeatedField\",\"type\":{\"type\":\"array\"" +
+            
",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},\"default\":[]}"
 +
+            
",{\"name\":\"externalMessage\",\"type\":[\"null\",{\"type\":\"record\"" +
+            
",\"name\":\"ExternalMessage\",\"namespace\":\"org.apache.pulsar.client.schema.proto.ExternalTest\""
 +
+            
",\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},"
 +
             
"\"default\":\"\"},{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
 
     private static final String EXPECTED_PARSING_INFO = 
"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"," +
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index bd88762..fa88e14 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -25,6 +25,7 @@ import lombok.Data;
 import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.Test;
@@ -82,18 +83,18 @@ public class SchemaBuilderTest {
     public void testAllOptionalFieldsSchema() {
         RecordSchemaBuilder recordSchemaBuilder =
             
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest.AllOptionalFields");
+        recordSchemaBuilder.field("boolField")
+                .type(SchemaType.BOOLEAN).optional();
+        recordSchemaBuilder.field("doubleField")
+                .type(SchemaType.DOUBLE).optional();
+        recordSchemaBuilder.field("floatField")
+                .type(SchemaType.FLOAT).optional();
         recordSchemaBuilder.field("intField")
             .type(SchemaType.INT32).optional();
         recordSchemaBuilder.field("longField")
             .type(SchemaType.INT64).optional();
         recordSchemaBuilder.field("stringField")
             .type(SchemaType.STRING).optional();
-        recordSchemaBuilder.field("boolField")
-            .type(SchemaType.BOOLEAN).optional();
-        recordSchemaBuilder.field("floatField")
-            .type(SchemaType.FLOAT).optional();
-        recordSchemaBuilder.field("doubleField")
-            .type(SchemaType.DOUBLE).optional();
         SchemaInfo schemaInfo = recordSchemaBuilder.build(
             SchemaType.AVRO
         );
@@ -115,16 +116,16 @@ public class SchemaBuilderTest {
     public void testAllPrimitiveFieldsSchema() {
         RecordSchemaBuilder recordSchemaBuilder =
             
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest.AllPrimitiveFields");
+        recordSchemaBuilder.field("boolField")
+                .type(SchemaType.BOOLEAN);
+        recordSchemaBuilder.field("doubleField")
+                .type(SchemaType.DOUBLE);
+        recordSchemaBuilder.field("floatField")
+                .type(SchemaType.FLOAT);
         recordSchemaBuilder.field("intField")
             .type(SchemaType.INT32);
         recordSchemaBuilder.field("longField")
             .type(SchemaType.INT64);
-        recordSchemaBuilder.field("boolField")
-            .type(SchemaType.BOOLEAN);
-        recordSchemaBuilder.field("floatField")
-            .type(SchemaType.FLOAT);
-        recordSchemaBuilder.field("doubleField")
-            .type(SchemaType.DOUBLE);
         SchemaInfo schemaInfo = recordSchemaBuilder.build(
             SchemaType.AVRO
         );
@@ -172,8 +173,10 @@ public class SchemaBuilderTest {
 
         // create a POJO schema to deserialize the serialized data
         Schema<AllPrimitiveFields> pojoSchema = 
Schema.AVRO(AllPrimitiveFields.class);
-        AllPrimitiveFields fields = pojoSchema.decode(serializedData);
 
+        injectWriterSchema(pojoSchema, schema);
+
+        AllPrimitiveFields fields = pojoSchema.decode(serializedData);
         assertEquals(32, fields.intField);
         assertEquals(1234L, fields.longField);
         assertEquals(true, fields.boolField);
@@ -206,11 +209,13 @@ public class SchemaBuilderTest {
             .set(schema.getFields().get(3), 0.7f)
             .set(schema.getFields().get(4), 1.34d)
             .build();
-
         byte[] serializedData = schema.encode(record);
 
         // create a POJO schema to deserialize the serialized data
         Schema<AllPrimitiveFields> pojoSchema = 
Schema.AVRO(AllPrimitiveFields.class);
+
+        injectWriterSchema(pojoSchema, schema);
+
         AllPrimitiveFields fields = pojoSchema.decode(serializedData);
 
         assertEquals(32, fields.intField);
@@ -222,7 +227,7 @@ public class SchemaBuilderTest {
 
     @Test
     public void testGenericRecordBuilderAvroByFieldname() {
-        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record("People1");
+        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record(People1.class.getCanonicalName());
         people1SchemaBuilder.field("age").type(SchemaType.INT32);
         people1SchemaBuilder.field("height").type(SchemaType.INT32);
         people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -238,7 +243,7 @@ public class SchemaBuilderTest {
         people1RecordBuilder.set("name", "people1");
         GenericRecord people1GenericRecord = people1RecordBuilder.build();
 
-        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record("People2");
+        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record(People2.class.getCanonicalName());
         people2SchemaBuilder.field("age").type(SchemaType.INT32);
         people2SchemaBuilder.field("height").type(SchemaType.INT32);
         people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -252,7 +257,7 @@ public class SchemaBuilderTest {
         people2RecordBuilder.set("name", "people2");
         GenericRecord people2GenericRecord = people2RecordBuilder.build();
 
-        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record("People");
+        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record(People.class.getCanonicalName());
         peopleSchemaBuilder.field("people1", 
people1Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("people2", 
people2Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -275,8 +280,8 @@ public class SchemaBuilderTest {
         assertEquals((people.getField("name")), peopleRecord.getField("name"));
         
assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
                 people1GenericRecord.getField("age"));
-        
assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
-                people1GenericRecord.getField("heigth"));
+        
assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+                people1GenericRecord.getField("height"));
         
assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
                 people1GenericRecord.getField("name"));
         
assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
@@ -290,7 +295,7 @@ public class SchemaBuilderTest {
 
     @Test
     public void testGenericRecordBuilderAvroByFieldnamePojo() {
-        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record("People1");
+        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record(People1.class.getCanonicalName());
         people1SchemaBuilder.field("age").type(SchemaType.INT32);
         people1SchemaBuilder.field("height").type(SchemaType.INT32);
         people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -306,7 +311,7 @@ public class SchemaBuilderTest {
         people1RecordBuilder.set("name", "people1");
         GenericRecord people1GenericRecord = people1RecordBuilder.build();
 
-        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record("People2");
+        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record(People2.class.getCanonicalName());
         people2SchemaBuilder.field("age").type(SchemaType.INT32);
         people2SchemaBuilder.field("height").type(SchemaType.INT32);
         people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -320,7 +325,7 @@ public class SchemaBuilderTest {
         people2RecordBuilder.set("name", "people2");
         GenericRecord people2GenericRecord = people2RecordBuilder.build();
 
-        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record("People");
+        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record(People.class.getCanonicalName());
         peopleSchemaBuilder.field("people1", 
people1Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("people2", 
people2Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -339,6 +344,9 @@ public class SchemaBuilderTest {
 
         Schema<People> peopleDecodeSchema = Schema.AVRO(
                 
SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+
+        injectWriterSchema(peopleDecodeSchema, peopleSchema);
+
         People people = peopleDecodeSchema.decode(peopleEncode);
 
         assertEquals(people.name, peopleRecord.getField("name"));
@@ -353,7 +361,7 @@ public class SchemaBuilderTest {
 
     @Test
     public void testGenericRecordBuilderAvroByFieldIndex() {
-        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record("People1");
+        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record(People1.class.getCanonicalName());
         people1SchemaBuilder.field("age").type(SchemaType.INT32);
         people1SchemaBuilder.field("height").type(SchemaType.INT32);
         people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -369,7 +377,7 @@ public class SchemaBuilderTest {
         people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
         GenericRecord people1GenericRecord = people1RecordBuilder.build();
 
-        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record("People2");
+        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record(People2.class.getCanonicalName());
         people2SchemaBuilder.field("age").type(SchemaType.INT32);
         people2SchemaBuilder.field("height").type(SchemaType.INT32);
         people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -383,7 +391,7 @@ public class SchemaBuilderTest {
         people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
         GenericRecord people2GenericRecord = people2RecordBuilder.build();
 
-        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record("People");
+        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record(People.class.getCanonicalName());
         peopleSchemaBuilder.field("people1", 
people1Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("people2", 
people2Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -406,8 +414,8 @@ public class SchemaBuilderTest {
         assertEquals((people.getField("name")), peopleRecord.getField("name"));
         
assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
                 people1GenericRecord.getField("age"));
-        
assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
-                people1GenericRecord.getField("heigth"));
+        
assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+                people1GenericRecord.getField("height"));
         
assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
                 people1GenericRecord.getField("name"));
         
assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
@@ -421,7 +429,7 @@ public class SchemaBuilderTest {
 
     @Test
     public void testGenericRecordBuilderAvroByFieldIndexPojo() {
-        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record("People1");
+        RecordSchemaBuilder people1SchemaBuilder = 
SchemaBuilder.record(People1.class.getCanonicalName());
         people1SchemaBuilder.field("age").type(SchemaType.INT32);
         people1SchemaBuilder.field("height").type(SchemaType.INT32);
         people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -437,7 +445,7 @@ public class SchemaBuilderTest {
         people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
         GenericRecord people1GenericRecord = people1RecordBuilder.build();
 
-        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record("People2");
+        RecordSchemaBuilder people2SchemaBuilder = 
SchemaBuilder.record(People2.class.getCanonicalName());
         people2SchemaBuilder.field("age").type(SchemaType.INT32);
         people2SchemaBuilder.field("height").type(SchemaType.INT32);
         people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -451,7 +459,7 @@ public class SchemaBuilderTest {
         people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
         GenericRecord people2GenericRecord = people2RecordBuilder.build();
 
-        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record("People");
+        RecordSchemaBuilder peopleSchemaBuilder = 
SchemaBuilder.record(People.class.getCanonicalName());
         peopleSchemaBuilder.field("people1", 
people1Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("people2", 
people2Schema).type(SchemaType.AVRO);
         peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -470,6 +478,9 @@ public class SchemaBuilderTest {
 
         Schema<People> peopleDecodeSchema = Schema.AVRO(
                 
SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+
+        injectWriterSchema(peopleDecodeSchema, peopleSchema);
+
         People people = peopleDecodeSchema.decode(peopleEncode);
 
         assertEquals(people.name, peopleRecord.getField("name"));
@@ -480,4 +491,23 @@ public class SchemaBuilderTest {
         assertEquals(people.getPeople2().height, 
people2GenericRecord.getField("height"));
         assertEquals(people.getPeople2().name, 
people2GenericRecord.getField("name"));
     }
+
+    /**
+     * Set the schema used to encode the data into the schema that we are using to read.
+     * Avro is able to decode successfully the record and apply the contents to the requested
+     * schema
+     * @param decoder the schema used for reading
+     * @param writer the schema used for writing
+     */
+    private static final void injectWriterSchema(Schema decoder, Schema 
writer) {
+        AvroSchema<?> avroSchema = (AvroSchema<?>) decoder;
+        avroSchema.setReader(new MultiVersionAvroReader<>(
+                AvroSchema.of(SchemaDefinition.
+                        builder()
+                        
.withJsonDef(writer.getSchemaInfo().getSchemaDefinition())
+                        .build())
+                        .getAvroSchema(),
+                Thread.currentThread().getContextClassLoader(),
+                false));
+    }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index 5361060..d187446 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -80,6 +80,20 @@ public class SchemaInfoTest {
         + "    \"namespace\": 
\"org.apache.pulsar.client.impl.schema.SchemaTestUtils\",\n"
         + "    \"fields\": [\n"
         + "      {\n"
+        + "        \"name\": \"color\",\n"
+        + "        \"type\": [\n"
+        + "          \"null\",\n"
+        + "          {\n"
+        + "            \"type\": \"enum\",\n"
+        + "            \"name\": \"Color\",\n"
+        + "            \"symbols\": [\n"
+        + "              \"RED\",\n"
+        + "              \"BLUE\"\n"
+        + "            ]\n"
+        + "          }\n"
+        + "        ]\n"
+        + "      },\n"
+        + "      {\n"
         + "        \"name\": \"field1\",\n"
         + "        \"type\": [\n"
         + "          \"null\",\n"
@@ -114,20 +128,6 @@ public class SchemaInfoTest {
         + "        ]\n"
         + "      },\n"
         + "      {\n"
-        + "        \"name\": \"color\",\n"
-        + "        \"type\": [\n"
-        + "          \"null\",\n"
-        + "          {\n"
-        + "            \"type\": \"enum\",\n"
-        + "            \"name\": \"Color\",\n"
-        + "            \"symbols\": [\n"
-        + "              \"RED\",\n"
-        + "              \"BLUE\"\n"
-        + "            ]\n"
-        + "          }\n"
-        + "        ]\n"
-        + "      },\n"
-        + "      {\n"
         + "        \"name\": \"fieldUnableNull\",\n"
         + "        \"type\": \"string\",\n"
         + "        \"default\": \"defaultValue\"\n"
@@ -155,6 +155,20 @@ public class SchemaInfoTest {
         + "        \"namespace\": 
\"org.apache.pulsar.client.impl.schema.SchemaTestUtils\",\n"
         + "        \"fields\": [\n"
         + "          {\n"
+        + "            \"name\": \"color\",\n"
+        + "            \"type\": [\n"
+        + "              \"null\",\n"
+        + "              {\n"
+        + "                \"type\": \"enum\",\n"
+        + "                \"name\": \"Color\",\n"
+        + "                \"symbols\": [\n"
+        + "                  \"RED\",\n"
+        + "                  \"BLUE\"\n"
+        + "                ]\n"
+        + "              }\n"
+        + "            ]\n"
+        + "          },\n"
+        + "          {\n"
         + "            \"name\": \"field1\",\n"
         + "            \"type\": [\n"
         + "              \"null\",\n"
@@ -189,20 +203,6 @@ public class SchemaInfoTest {
         + "            ]\n"
         + "          },\n"
         + "          {\n"
-        + "            \"name\": \"color\",\n"
-        + "            \"type\": [\n"
-        + "              \"null\",\n"
-        + "              {\n"
-        + "                \"type\": \"enum\",\n"
-        + "                \"name\": \"Color\",\n"
-        + "                \"symbols\": [\n"
-        + "                  \"RED\",\n"
-        + "                  \"BLUE\"\n"
-        + "                ]\n"
-        + "              }\n"
-        + "            ]\n"
-        + "          },\n"
-        + "          {\n"
         + "            \"name\": \"fieldUnableNull\",\n"
         + "            \"type\": \"string\",\n"
         + "            \"default\": \"defaultValue\"\n"
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 29dc534..2dd4213 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -358,8 +358,8 @@ The Apache Software License, Version 2.0
   * Apache XBean :: Reflect
     - xbean-reflect-3.4.jar
   * Avro
-    - avro-1.9.1.jar
-    - avro-protobuf-1.9.1.jar
+    - avro-1.10.2.jar
+    - avro-protobuf-1.10.2.jar
   * Caffeine
     - caffeine-2.6.2.jar
   * Javax
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 39295a9..a03a017 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -140,12 +140,12 @@ public class TestAvroDecoder extends 
AbstractDecoderTester {
 
         Map<DecoderColumnHandle, FieldValueProvider> decodedRow = 
pulsarRowDecoder.decodeRow(payload).get();
         RowType columnType = 
RowType.from(ImmutableList.<RowType.Field>builder()
-                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("intField", INTEGER))
                 .add(RowType.field("nestedRow", 
RowType.from(ImmutableList.<RowType.Field>builder()
-                        .add(RowType.field("stringField", VARCHAR))
                         .add(RowType.field("longField", BIGINT))
+                        .add(RowType.field("stringField", VARCHAR))
                         .build())))
+                .add(RowType.field("stringField", VARCHAR))
                 .build());
 
         PulsarColumnHandle columnHandle = new 
PulsarColumnHandle(getPulsarConnectorId().toString(),
@@ -244,23 +244,23 @@ public class TestAvroDecoder extends 
AbstractDecoderTester {
         Map<DecoderColumnHandle, FieldValueProvider> decodedRow = 
pulsarRowDecoder.decodeRow(payload).get();
 
         RowType columnType = 
RowType.from(ImmutableList.<RowType.Field>builder()
-                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("arrayField", new ArrayType(
                         RowType.from(ImmutableList.<RowType.Field>builder()
-                                .add(RowType.field("stringField", VARCHAR))
                                 .add(RowType.field("longField", BIGINT))
+                                .add(RowType.field("stringField", VARCHAR))
                                 .build()))))
                 .add(RowType.field("mapField", 
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
                         
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
                                 
TypeSignatureParameter.typeParameter(RowType.from(ImmutableList.<RowType.Field>builder()
-                                        .add(RowType.field("stringField", 
VARCHAR))
                                         .add(RowType.field("longField", 
BIGINT))
+                                        .add(RowType.field("stringField", 
VARCHAR))
                                         .build()).getTypeSignature())
                         ))))
                 .add(RowType.field("nestedRow", 
RowType.from(ImmutableList.<RowType.Field>builder()
-                        .add(RowType.field("stringField", VARCHAR))
                         .add(RowType.field("longField", BIGINT))
+                        .add(RowType.field("stringField", VARCHAR))
                         .build())))
+                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("structedField",
                         
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
                                 
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 9f56e30..2a22b58 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -160,12 +160,12 @@ public class TestJsonDecoder extends AbstractDecoderTester {
         Map<DecoderColumnHandle, FieldValueProvider> decodedRow = 
pulsarRowDecoder.decodeRow(payload).get();
 
         RowType columnType = 
RowType.from(ImmutableList.<RowType.Field>builder()
-                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("intField", INTEGER))
                 .add(RowType.field("nestedRow", 
RowType.from(ImmutableList.<RowType.Field>builder()
-                        .add(RowType.field("stringField", VARCHAR))
                         .add(RowType.field("longField", BIGINT))
+                        .add(RowType.field("stringField", VARCHAR))
                         .build())))
+                .add(RowType.field("stringField", VARCHAR))
                 .build());
 
         PulsarColumnHandle columnHandle = new 
PulsarColumnHandle(getPulsarConnectorId().toString(), "rowField", columnType, 
false, false, "rowField", null, null, 
PulsarColumnHandle.HandleKeyValueType.NONE);
@@ -239,23 +239,23 @@ public class TestJsonDecoder extends AbstractDecoderTester {
         Map<DecoderColumnHandle, FieldValueProvider> decodedRow = 
pulsarRowDecoder.decodeRow(payload).get();
 
         RowType columnType = 
RowType.from(ImmutableList.<RowType.Field>builder()
-                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("arrayField", new ArrayType(
                         RowType.from(ImmutableList.<RowType.Field>builder()
-                                .add(RowType.field("stringField", VARCHAR))
                                 .add(RowType.field("longField", BIGINT))
+                                .add(RowType.field("stringField", VARCHAR))
                                 .build()))))
                 .add(RowType.field("mapField", 
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
                         
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
                                 
TypeSignatureParameter.typeParameter(RowType.from(ImmutableList.<RowType.Field>builder()
-                                        .add(RowType.field("stringField", 
VARCHAR))
                                         .add(RowType.field("longField", 
BIGINT))
+                                        .add(RowType.field("stringField", 
VARCHAR))
                                         .build()).getTypeSignature())
                         ))))
                 .add(RowType.field("nestedRow", 
RowType.from(ImmutableList.<RowType.Field>builder()
-                        .add(RowType.field("stringField", VARCHAR))
                         .add(RowType.field("longField", BIGINT))
+                        .add(RowType.field("stringField", VARCHAR))
                         .build())))
+                .add(RowType.field("stringField", VARCHAR))
                 .add(RowType.field("structedField",
                         
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
                                 
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),

Reply via email to