This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bd6ef46a0714ee558bf815c98aef577a29d26d44 Author: Zixuan Liu <[email protected]> AuthorDate: Sat Jan 29 15:34:57 2022 +0800 [Schema] Fix parse BigDecimal (#14019) ### Motivation I can use an Avro schema with BigDecimal in Pulsar 2.8.0, but this doesn't work in Pulsar 2.8.1. After checking the codebase and related PRs, I found that https://github.com/apache/pulsar/pull/10428 broke this. The following is the error log: ``` org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.3.4.jar:5.3.4] ... 34 common frames omitted Caused by: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required) at org.apache.pulsar.shade.org.apache.avro.Conversions$DecimalConversion.getRecommendedSchema(Conversions.java:73) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:696) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.reflect.ReflectData.createFieldSchema(ReflectData.java:873) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.reflect.ReflectData$AllowNull.createFieldSchema(ReflectData.java:92) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.reflect.ReflectData.createSchema(ReflectData.java:736) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:328) ~[pulsar-client-2.8.1.jar:2.8.1] at org.apache.pulsar.shade.org.apache.avro.specific.SpecificData$3.computeValue(SpecificData.java:325) ~[pulsar-client-2.8.1.jar:2.8.1] at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228) ~[na:na] at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210) ~[na:na] at java.base/java.lang.ClassValue.get(ClassValue.java:116) ~[na:na] at org.apache.pulsar.shade.org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:339) 
~[pulsar-client-2.8.1.jar:2.8.1] ... 52 common frames omitted ``` I think that, when getting the schema, Avro's ReflectData with Conversions.DecimalConversion registered cannot parse a BigDecimal field that has no AvroSchema annotation, whereas the Avro ReflectDatumWriter and ReflectDatumReader do work with it; it seems that Avro's support for BigDecimal is incomplete. Affected versions: 2.8.1...2.8.x, 2.9.x ### Modifications - Skip adding the DecimalConversion in `extractAvroSchema()` (cherry picked from commit 2ca8e8a7a94c7d2cbba771fe8bb1ce5b1f4445bd) --- .../pulsar/client/impl/schema/AvroSchema.java | 9 ++++++++- .../pulsar/client/impl/schema/util/SchemaUtil.java | 2 +- .../pulsar/client/impl/schema/AvroSchemaTest.java | 23 +++++++++++++++++++++- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index cc31b19..b34017e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -106,7 +106,14 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> { } public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) { - reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + addLogicalTypeConversions(reflectData, jsr310ConversionEnabled, true); + } + + public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled, + boolean decimalConversionEnabled) { + if (decimalConversionEnabled) { + reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + } reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion()); reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion()); reflectData.addLogicalTypeConversion(new 
TimeConversions.TimeMicrosConversion()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java index 2d0c810..abf5208 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java @@ -94,7 +94,7 @@ public class SchemaUtil { ReflectData reflectData = schemaDefinition.getAlwaysAllowNull() ? new ReflectData.AllowNull() : new ReflectData(); - AvroSchema.addLogicalTypeConversions(reflectData, schemaDefinition.isJsr310ConversionEnabled()); + AvroSchema.addLogicalTypeConversions(reflectData, schemaDefinition.isJsr310ConversionEnabled(), false); return reflectData.getSchema(pojo); } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index 00cbbdd..9e707af 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -36,7 +36,6 @@ import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.UUID; - import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; @@ -438,4 +437,26 @@ public class AvroSchemaTest { assertEquals(pojo1.uid, pojo2.uid); } + static class MyBigDecimalPojo { + public BigDecimal value1; + @org.apache.avro.reflect.AvroSchema("{\n" + + " \"type\": \"bytes\",\n" + + " \"logicalType\": \"decimal\",\n" + + " \"precision\": 4,\n" + + " \"scale\": 2\n" + + "}") + public BigDecimal value2; + } + + @Test + public void testAvroBigDecimal() { + org.apache.pulsar.client.api.Schema<MyBigDecimalPojo> schema = + org.apache.pulsar.client.api.Schema.AVRO(MyBigDecimalPojo.class); + 
MyBigDecimalPojo myBigDecimalPojo = new MyBigDecimalPojo(); + myBigDecimalPojo.value1 = new BigDecimal("10.21"); + myBigDecimalPojo.value2 = new BigDecimal("10.22"); + MyBigDecimalPojo pojo2 = schema.decode(schema.encode(myBigDecimalPojo)); + assertEquals(pojo2.value1, myBigDecimalPojo.value1); + assertEquals(pojo2.value2, myBigDecimalPojo.value2); + } }
