This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 058236c Upgrade to Apache Avro 1.10.2 (#9898)
058236c is described below
commit 058236c8f0e1a7548b190601939a7e0c805133a2
Author: Enrico Olivelli <[email protected]>
AuthorDate: Fri Mar 19 11:08:29 2021 +0100
Upgrade to Apache Avro 1.10.2 (#9898)
---
distribution/server/src/assemble/LICENSE.bin.txt | 4 +-
pom.xml | 2 +-
.../pulsar/client/impl/schema/AvroSchema.java | 35 ++++++++-
.../impl/schema/generic/GenericAvroReader.java | 4 +-
.../client/impl/schema/ProtobufSchemaTest.java | 27 ++++---
.../client/impl/schema/SchemaBuilderTest.java | 90 ++++++++++++++--------
.../pulsar/client/impl/schema/SchemaInfoTest.java | 56 +++++++-------
pulsar-sql/presto-distribution/LICENSE | 4 +-
.../sql/presto/decoder/avro/TestAvroDecoder.java | 12 +--
.../sql/presto/decoder/json/TestJsonDecoder.java | 12 +--
10 files changed, 156 insertions(+), 90 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 544f33a..8d86a1f 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -481,8 +481,8 @@ The Apache Software License, Version 2.0
* Jodah
- net.jodah-typetools-0.5.0.jar
* Apache Avro
- - org.apache.avro-avro-1.9.1.jar
- - org.apache.avro-avro-protobuf-1.9.1.jar
+ - org.apache.avro-avro-1.10.2.jar
+ - org.apache.avro-avro-protobuf-1.10.2.jar
* Apache Curator
- org.apache.curator-curator-client-5.1.0.jar
- org.apache.curator-curator-framework-5.1.0.jar
diff --git a/pom.xml b/pom.xml
index 2ad4e74..f1a35c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,7 +136,7 @@ flexible messaging model and an intuitive client
API.</description>
<kafka-client.version>2.3.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.774</aws-sdk.version>
- <avro.version>1.9.1</avro.version>
+ <avro.version>1.10.2</avro.version>
<joda.version>2.10.1</joda.version>
<jclouds.version>2.2.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index d5a99f8..f4bc833 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -20,8 +20,10 @@ package org.apache.pulsar.client.impl.schema;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
-import org.apache.avro.data.JodaTimeConversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
@@ -32,6 +34,8 @@ import
org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -117,11 +121,38 @@ public class AvroSchema<T> extends
AvroBaseStructSchema<T> {
} else {
try {
Class.forName("org.joda.time.DateTime");
- reflectData.addLogicalTypeConversion(new
JodaTimeConversions.TimestampConversion());
+ reflectData.addLogicalTypeConversion(new
TimestampConversion());
} catch (ClassNotFoundException e) {
// Skip if have not provide joda-time dependency.
}
}
}
+ public static class TimestampConversion extends Conversion<DateTime> {
+ @Override
+ public Class<DateTime> getConvertedType() {
+ return DateTime.class;
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "timestamp-millis";
+ }
+
+ @Override
+ public DateTime fromLong(Long millisFromEpoch, org.apache.avro.Schema
schema, LogicalType type) {
+ return new DateTime(millisFromEpoch, DateTimeZone.UTC);
+ }
+
+ @Override
+ public Long toLong(DateTime timestamp, org.apache.avro.Schema schema,
LogicalType type) {
+ return timestamp.getMillis();
+ }
+
+ @Override
+ public org.apache.avro.Schema getRecommendedSchema() {
+ return
LogicalTypes.timestampMillis().addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.LONG));
+ }
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
index b78b31e..036a6fd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroReader.java
@@ -88,7 +88,7 @@ public class GenericAvroReader implements
SchemaReader<GenericRecord> {
null,
decoder);
return new GenericAvroRecord(schemaVersion, schema, fields,
avroRecord);
- } catch (IOException e) {
+ } catch (IOException | IndexOutOfBoundsException e) {
throw new SchemaSerializationException(e);
}
}
@@ -102,7 +102,7 @@ public class GenericAvroReader implements
SchemaReader<GenericRecord> {
null,
decoder);
return new GenericAvroRecord(schemaVersion, schema, fields,
avroRecord);
- } catch (IOException e) {
+ } catch (IOException | IndexOutOfBoundsException e) {
throw new SchemaSerializationException(e);
} finally {
try {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
index cb8138d..7f3eed1 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufSchemaTest.java
@@ -38,17 +38,22 @@ public class ProtobufSchemaTest {
private static final String NAME = "foo";
private static final String EXPECTED_SCHEMA_JSON =
"{\"type\":\"record\",\"name\":\"TestMessage\"," +
-
"\"namespace\":\"org.apache.pulsar.client.schema.proto.Test\",\"fields\":[{\"name\":\"stringField\","
+
-
"\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},"
+
-
"{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\","
+
-
"\"type\":\"int\",\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\",\"name\":\"TestEnum\","
+
-
"\"symbols\":[\"SHARED\",\"FAILOVER\"]},\"default\":\"SHARED\"},{\"name\":\"nestedField\",\"type\":[\"null\","
+
-
"{\"type\":\"record\",\"name\":\"SubMessage\",\"fields\":[{\"name\":\"foo\",\"type\":{\"type\":\"string\","
+
-
"\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"bar\",\"type\":\"double\",\"default\":0}]}],"
+
-
"\"default\":null},{\"name\":\"repeatedField\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\","
+
-
"\"avro.java.string\":\"String\"}}},{\"name\":\"externalMessage\",\"type\":[\"null\",{\"type\":\"record\","
+
-
"\"name\":\"ExternalMessage\",\"namespace\":\"org.apache.pulsar.client.schema.proto.ExternalTest\","
+
-
"\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},"
+
+ "\"namespace\":\"org.apache.pulsar.client.schema.proto.Test\"," +
+
"\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\"," +
+
"\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"doubleField\","
+
+
"\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\"," +
+
"\"default\":0},{\"name\":\"testEnum\",\"type\":{\"type\":\"enum\"," +
+ "\"name\":\"TestEnum\",\"symbols\":[\"SHARED\",\"FAILOVER\"]}," +
+ "\"default\":\"SHARED\"},{\"name\":\"nestedField\"," +
+ "\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubMessage\","
+
+ "\"fields\":[{\"name\":\"foo\",\"type\":{\"type\":\"string\"," +
+ "\"avro.java.string\":\"String\"},\"default\":\"\"}" +
+ ",{\"name\":\"bar\",\"type\":\"double\",\"default\":0}]}]" +
+
",\"default\":null},{\"name\":\"repeatedField\",\"type\":{\"type\":\"array\"" +
+
",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},\"default\":[]}"
+
+
",{\"name\":\"externalMessage\",\"type\":[\"null\",{\"type\":\"record\"" +
+
",\"name\":\"ExternalMessage\",\"namespace\":\"org.apache.pulsar.client.schema.proto.ExternalTest\""
+
+
",\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},"
+
"\"default\":\"\"},{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0}]}],\"default\":null}]}";
private static final String EXPECTED_PARSING_INFO =
"{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\"," +
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index bd88762..fa88e14 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -25,6 +25,7 @@ import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;
@@ -82,18 +83,18 @@ public class SchemaBuilderTest {
public void testAllOptionalFieldsSchema() {
RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest.AllOptionalFields");
+ recordSchemaBuilder.field("boolField")
+ .type(SchemaType.BOOLEAN).optional();
+ recordSchemaBuilder.field("doubleField")
+ .type(SchemaType.DOUBLE).optional();
+ recordSchemaBuilder.field("floatField")
+ .type(SchemaType.FLOAT).optional();
recordSchemaBuilder.field("intField")
.type(SchemaType.INT32).optional();
recordSchemaBuilder.field("longField")
.type(SchemaType.INT64).optional();
recordSchemaBuilder.field("stringField")
.type(SchemaType.STRING).optional();
- recordSchemaBuilder.field("boolField")
- .type(SchemaType.BOOLEAN).optional();
- recordSchemaBuilder.field("floatField")
- .type(SchemaType.FLOAT).optional();
- recordSchemaBuilder.field("doubleField")
- .type(SchemaType.DOUBLE).optional();
SchemaInfo schemaInfo = recordSchemaBuilder.build(
SchemaType.AVRO
);
@@ -115,16 +116,16 @@ public class SchemaBuilderTest {
public void testAllPrimitiveFieldsSchema() {
RecordSchemaBuilder recordSchemaBuilder =
SchemaBuilder.record("org.apache.pulsar.client.impl.schema.SchemaBuilderTest.AllPrimitiveFields");
+ recordSchemaBuilder.field("boolField")
+ .type(SchemaType.BOOLEAN);
+ recordSchemaBuilder.field("doubleField")
+ .type(SchemaType.DOUBLE);
+ recordSchemaBuilder.field("floatField")
+ .type(SchemaType.FLOAT);
recordSchemaBuilder.field("intField")
.type(SchemaType.INT32);
recordSchemaBuilder.field("longField")
.type(SchemaType.INT64);
- recordSchemaBuilder.field("boolField")
- .type(SchemaType.BOOLEAN);
- recordSchemaBuilder.field("floatField")
- .type(SchemaType.FLOAT);
- recordSchemaBuilder.field("doubleField")
- .type(SchemaType.DOUBLE);
SchemaInfo schemaInfo = recordSchemaBuilder.build(
SchemaType.AVRO
);
@@ -172,8 +173,10 @@ public class SchemaBuilderTest {
// create a POJO schema to deserialize the serialized data
Schema<AllPrimitiveFields> pojoSchema =
Schema.AVRO(AllPrimitiveFields.class);
- AllPrimitiveFields fields = pojoSchema.decode(serializedData);
+ injectWriterSchema(pojoSchema, schema);
+
+ AllPrimitiveFields fields = pojoSchema.decode(serializedData);
assertEquals(32, fields.intField);
assertEquals(1234L, fields.longField);
assertEquals(true, fields.boolField);
@@ -206,11 +209,13 @@ public class SchemaBuilderTest {
.set(schema.getFields().get(3), 0.7f)
.set(schema.getFields().get(4), 1.34d)
.build();
-
byte[] serializedData = schema.encode(record);
// create a POJO schema to deserialize the serialized data
Schema<AllPrimitiveFields> pojoSchema =
Schema.AVRO(AllPrimitiveFields.class);
+
+ injectWriterSchema(pojoSchema, schema);
+
AllPrimitiveFields fields = pojoSchema.decode(serializedData);
assertEquals(32, fields.intField);
@@ -222,7 +227,7 @@ public class SchemaBuilderTest {
@Test
public void testGenericRecordBuilderAvroByFieldname() {
- RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record("People1");
+ RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record(People1.class.getCanonicalName());
people1SchemaBuilder.field("age").type(SchemaType.INT32);
people1SchemaBuilder.field("height").type(SchemaType.INT32);
people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -238,7 +243,7 @@ public class SchemaBuilderTest {
people1RecordBuilder.set("name", "people1");
GenericRecord people1GenericRecord = people1RecordBuilder.build();
- RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record("People2");
+ RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record(People2.class.getCanonicalName());
people2SchemaBuilder.field("age").type(SchemaType.INT32);
people2SchemaBuilder.field("height").type(SchemaType.INT32);
people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -252,7 +257,7 @@ public class SchemaBuilderTest {
people2RecordBuilder.set("name", "people2");
GenericRecord people2GenericRecord = people2RecordBuilder.build();
- RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record("People");
+ RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record(People.class.getCanonicalName());
peopleSchemaBuilder.field("people1",
people1Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("people2",
people2Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -275,8 +280,8 @@ public class SchemaBuilderTest {
assertEquals((people.getField("name")), peopleRecord.getField("name"));
assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
people1GenericRecord.getField("age"));
-
assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
- people1GenericRecord.getField("heigth"));
+
assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+ people1GenericRecord.getField("height"));
assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
people1GenericRecord.getField("name"));
assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
@@ -290,7 +295,7 @@ public class SchemaBuilderTest {
@Test
public void testGenericRecordBuilderAvroByFieldnamePojo() {
- RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record("People1");
+ RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record(People1.class.getCanonicalName());
people1SchemaBuilder.field("age").type(SchemaType.INT32);
people1SchemaBuilder.field("height").type(SchemaType.INT32);
people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -306,7 +311,7 @@ public class SchemaBuilderTest {
people1RecordBuilder.set("name", "people1");
GenericRecord people1GenericRecord = people1RecordBuilder.build();
- RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record("People2");
+ RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record(People2.class.getCanonicalName());
people2SchemaBuilder.field("age").type(SchemaType.INT32);
people2SchemaBuilder.field("height").type(SchemaType.INT32);
people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -320,7 +325,7 @@ public class SchemaBuilderTest {
people2RecordBuilder.set("name", "people2");
GenericRecord people2GenericRecord = people2RecordBuilder.build();
- RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record("People");
+ RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record(People.class.getCanonicalName());
peopleSchemaBuilder.field("people1",
people1Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("people2",
people2Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -339,6 +344,9 @@ public class SchemaBuilderTest {
Schema<People> peopleDecodeSchema = Schema.AVRO(
SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+
+ injectWriterSchema(peopleDecodeSchema, peopleSchema);
+
People people = peopleDecodeSchema.decode(peopleEncode);
assertEquals(people.name, peopleRecord.getField("name"));
@@ -353,7 +361,7 @@ public class SchemaBuilderTest {
@Test
public void testGenericRecordBuilderAvroByFieldIndex() {
- RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record("People1");
+ RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record(People1.class.getCanonicalName());
people1SchemaBuilder.field("age").type(SchemaType.INT32);
people1SchemaBuilder.field("height").type(SchemaType.INT32);
people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -369,7 +377,7 @@ public class SchemaBuilderTest {
people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
GenericRecord people1GenericRecord = people1RecordBuilder.build();
- RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record("People2");
+ RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record(People2.class.getCanonicalName());
people2SchemaBuilder.field("age").type(SchemaType.INT32);
people2SchemaBuilder.field("height").type(SchemaType.INT32);
people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -383,7 +391,7 @@ public class SchemaBuilderTest {
people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
GenericRecord people2GenericRecord = people2RecordBuilder.build();
- RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record("People");
+ RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record(People.class.getCanonicalName());
peopleSchemaBuilder.field("people1",
people1Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("people2",
people2Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -406,8 +414,8 @@ public class SchemaBuilderTest {
assertEquals((people.getField("name")), peopleRecord.getField("name"));
assertEquals(((GenericRecord)people.getField("people1")).getField("age"),
people1GenericRecord.getField("age"));
-
assertEquals(((GenericRecord)people.getField("people1")).getField("heigth"),
- people1GenericRecord.getField("heigth"));
+
assertEquals(((GenericRecord)people.getField("people1")).getField("height"),
+ people1GenericRecord.getField("height"));
assertEquals(((GenericRecord)people.getField("people1")).getField("name"),
people1GenericRecord.getField("name"));
assertEquals(((GenericRecord)people.getField("people2")).getField("age"),
@@ -421,7 +429,7 @@ public class SchemaBuilderTest {
@Test
public void testGenericRecordBuilderAvroByFieldIndexPojo() {
- RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record("People1");
+ RecordSchemaBuilder people1SchemaBuilder =
SchemaBuilder.record(People1.class.getCanonicalName());
people1SchemaBuilder.field("age").type(SchemaType.INT32);
people1SchemaBuilder.field("height").type(SchemaType.INT32);
people1SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -437,7 +445,7 @@ public class SchemaBuilderTest {
people1RecordBuilder.set(people1Schema.getFields().get(2), "people1");
GenericRecord people1GenericRecord = people1RecordBuilder.build();
- RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record("People2");
+ RecordSchemaBuilder people2SchemaBuilder =
SchemaBuilder.record(People2.class.getCanonicalName());
people2SchemaBuilder.field("age").type(SchemaType.INT32);
people2SchemaBuilder.field("height").type(SchemaType.INT32);
people2SchemaBuilder.field("name").type(SchemaType.STRING);
@@ -451,7 +459,7 @@ public class SchemaBuilderTest {
people2RecordBuilder.set(people2Schema.getFields().get(2), "people2");
GenericRecord people2GenericRecord = people2RecordBuilder.build();
- RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record("People");
+ RecordSchemaBuilder peopleSchemaBuilder =
SchemaBuilder.record(People.class.getCanonicalName());
peopleSchemaBuilder.field("people1",
people1Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("people2",
people2Schema).type(SchemaType.AVRO);
peopleSchemaBuilder.field("name").type(SchemaType.STRING);
@@ -470,6 +478,9 @@ public class SchemaBuilderTest {
Schema<People> peopleDecodeSchema = Schema.AVRO(
SchemaDefinition.<People>builder().withPojo(People.class).withAlwaysAllowNull(false).build());
+
+ injectWriterSchema(peopleDecodeSchema, peopleSchema);
+
People people = peopleDecodeSchema.decode(peopleEncode);
assertEquals(people.name, peopleRecord.getField("name"));
@@ -480,4 +491,23 @@ public class SchemaBuilderTest {
assertEquals(people.getPeople2().height,
people2GenericRecord.getField("height"));
assertEquals(people.getPeople2().name,
people2GenericRecord.getField("name"));
}
+
+ /**
+ * Set the schema used to encode the data into the schema that we are using to read.
+ * Avro is able to decode successfully the record and apply the contents to the requested
+ * schema
+ * @param decoder the schema used for reading
+ * @param writer the schema used for writing
+ */
+ private static final void injectWriterSchema(Schema decoder, Schema
writer) {
+ AvroSchema<?> avroSchema = (AvroSchema<?>) decoder;
+ avroSchema.setReader(new MultiVersionAvroReader<>(
+ AvroSchema.of(SchemaDefinition.
+ builder()
+
.withJsonDef(writer.getSchemaInfo().getSchemaDefinition())
+ .build())
+ .getAvroSchema(),
+ Thread.currentThread().getContextClassLoader(),
+ false));
+ }
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index 5361060..d187446 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -80,6 +80,20 @@ public class SchemaInfoTest {
+ " \"namespace\":
\"org.apache.pulsar.client.impl.schema.SchemaTestUtils\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ + " \"name\": \"color\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"enum\",\n"
+ + " \"name\": \"Color\",\n"
+ + " \"symbols\": [\n"
+ + " \"RED\",\n"
+ + " \"BLUE\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ " \"name\": \"field1\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
@@ -114,20 +128,6 @@ public class SchemaInfoTest {
+ " ]\n"
+ " },\n"
+ " {\n"
- + " \"name\": \"color\",\n"
- + " \"type\": [\n"
- + " \"null\",\n"
- + " {\n"
- + " \"type\": \"enum\",\n"
- + " \"name\": \"Color\",\n"
- + " \"symbols\": [\n"
- + " \"RED\",\n"
- + " \"BLUE\"\n"
- + " ]\n"
- + " }\n"
- + " ]\n"
- + " },\n"
- + " {\n"
+ " \"name\": \"fieldUnableNull\",\n"
+ " \"type\": \"string\",\n"
+ " \"default\": \"defaultValue\"\n"
@@ -155,6 +155,20 @@ public class SchemaInfoTest {
+ " \"namespace\":
\"org.apache.pulsar.client.impl.schema.SchemaTestUtils\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ + " \"name\": \"color\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"enum\",\n"
+ + " \"name\": \"Color\",\n"
+ + " \"symbols\": [\n"
+ + " \"RED\",\n"
+ + " \"BLUE\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ " \"name\": \"field1\",\n"
+ " \"type\": [\n"
+ " \"null\",\n"
@@ -189,20 +203,6 @@ public class SchemaInfoTest {
+ " ]\n"
+ " },\n"
+ " {\n"
- + " \"name\": \"color\",\n"
- + " \"type\": [\n"
- + " \"null\",\n"
- + " {\n"
- + " \"type\": \"enum\",\n"
- + " \"name\": \"Color\",\n"
- + " \"symbols\": [\n"
- + " \"RED\",\n"
- + " \"BLUE\"\n"
- + " ]\n"
- + " }\n"
- + " ]\n"
- + " },\n"
- + " {\n"
+ " \"name\": \"fieldUnableNull\",\n"
+ " \"type\": \"string\",\n"
+ " \"default\": \"defaultValue\"\n"
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 29dc534..2dd4213 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -358,8 +358,8 @@ The Apache Software License, Version 2.0
* Apache XBean :: Reflect
- xbean-reflect-3.4.jar
* Avro
- - avro-1.9.1.jar
- - avro-protobuf-1.9.1.jar
+ - avro-1.10.2.jar
+ - avro-protobuf-1.10.2.jar
* Caffeine
- caffeine-2.6.2.jar
* Javax
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 39295a9..a03a017 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -140,12 +140,12 @@ public class TestAvroDecoder extends
AbstractDecoderTester {
Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
RowType columnType =
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("intField", INTEGER))
.add(RowType.field("nestedRow",
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build())))
+ .add(RowType.field("stringField", VARCHAR))
.build());
PulsarColumnHandle columnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(),
@@ -244,23 +244,23 @@ public class TestAvroDecoder extends
AbstractDecoderTester {
Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
RowType columnType =
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("arrayField", new ArrayType(
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build()))))
.add(RowType.field("mapField",
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
TypeSignatureParameter.typeParameter(RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField",
VARCHAR))
.add(RowType.field("longField",
BIGINT))
+ .add(RowType.field("stringField",
VARCHAR))
.build()).getTypeSignature())
))))
.add(RowType.field("nestedRow",
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build())))
+ .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("structedField",
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 9f56e30..2a22b58 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -160,12 +160,12 @@ public class TestJsonDecoder extends
AbstractDecoderTester {
Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
RowType columnType =
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("intField", INTEGER))
.add(RowType.field("nestedRow",
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build())))
+ .add(RowType.field("stringField", VARCHAR))
.build());
PulsarColumnHandle columnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(), "rowField", columnType,
false, false, "rowField", null, null,
PulsarColumnHandle.HandleKeyValueType.NONE);
@@ -239,23 +239,23 @@ public class TestJsonDecoder extends
AbstractDecoderTester {
Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
RowType columnType =
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("arrayField", new ArrayType(
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build()))))
.add(RowType.field("mapField",
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),
TypeSignatureParameter.typeParameter(RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField",
VARCHAR))
.add(RowType.field("longField",
BIGINT))
+ .add(RowType.field("stringField",
VARCHAR))
.build()).getTypeSignature())
))))
.add(RowType.field("nestedRow",
RowType.from(ImmutableList.<RowType.Field>builder()
- .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("longField", BIGINT))
+ .add(RowType.field("stringField", VARCHAR))
.build())))
+ .add(RowType.field("stringField", VARCHAR))
.add(RowType.field("structedField",
decoderFactory.getTypeManager().getParameterizedType(StandardTypes.MAP,
ImmutableList.of(TypeSignatureParameter.typeParameter(VarcharType.VARCHAR.getTypeSignature()),