This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ea20a896b408d052fe0ef366c4a052b6ccfb6c72 Author: Cong Zhao <[email protected]> AuthorDate: Fri Jun 24 00:33:59 2022 +0800 [fix][client] Add classLoader field for `SchemaDefinition` (#15915) Fixes #15899 ### Motivation Currently, logical type conversions are not registered when using `SchemaDefinition.<T>builder().withJsonDef()` because the definition is built without a classLoader param. See: https://github.com/apache/pulsar/blob/04aa9e8e51869d1621a7e25402a656084eebfc09/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/reader/AvroReader.java#L58-L68 We can add a classLoader field to `SchemaDefinition` so that users can manually pass a classLoader to register logical type conversions. ### Modifications Add classLoader field for `SchemaDefinition` (cherry picked from commit 8434500b6879abc9ab74de6e5b75883e8053fd9c) --- .../pulsar/client/api/schema/SchemaDefinition.java | 7 ++ .../client/api/schema/SchemaDefinitionBuilder.java | 9 +++ .../pulsar/client/impl/schema/AvroSchema.java | 5 +- .../impl/schema/SchemaDefinitionBuilderImpl.java | 15 ++++- .../client/impl/schema/SchemaDefinitionImpl.java | 11 +++- .../pulsar/client/impl/schema/AvroSchemaTest.java | 77 ++++++++++++++++++++-- .../client/impl/schema/SchemaBuilderTest.java | 8 ++- 7 files changed, 122 insertions(+), 10 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java index a6777c5c2fa..88dd3670608 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java @@ -75,6 +75,13 @@ public interface SchemaDefinition<T> { */ Class<T> getPojo(); + /** + * Get pojo classLoader. + * + * @return pojo schema + */ + ClassLoader getClassLoader(); + /** * Get supportSchemaVersioning schema definition. 
* diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java index 61d246674a8..97d822b927d 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java @@ -80,6 +80,15 @@ public interface SchemaDefinitionBuilder<T> { */ SchemaDefinitionBuilder<T> withPojo(Class pojo); + /** + * Set schema of pojo classLoader. + * + * @param classLoader pojo classLoader + * + * @return schema definition builder + */ + SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader); + /** * Set schema of json definition. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index 3d0bf157cb3..d2ea9cd4a9f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -89,9 +89,12 @@ public class AvroSchema<T> extends AvroBaseStructSchema<T> { schemaDefinition.getSchemaWriterOpt().get(), parseSchemaInfo(schemaDefinition, SchemaType.AVRO)); } ClassLoader pojoClassLoader = null; - if (schemaDefinition.getPojo() != null) { + if (schemaDefinition.getClassLoader() != null) { + pojoClassLoader = schemaDefinition.getClassLoader(); + } else if (schemaDefinition.getPojo() != null) { pojoClassLoader = schemaDefinition.getPojo().getClassLoader(); } + return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO), pojoClassLoader); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java 
index fe85a55e117..06a2f50abd6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java @@ -40,6 +40,11 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T */ private Class<T> clazz; + /** + * the classLoader definition class. + */ + private ClassLoader classLoader; + /** * The flag of schema type always allow null. * @@ -100,6 +105,12 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T return this; } + @Override + public SchemaDefinitionBuilder<T> withClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + @Override public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) { this.jsonDef = jsonDef; @@ -149,8 +160,8 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull)); properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled)); - return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning, - jsr310ConversionEnabled, reader, writer); + return new SchemaDefinitionImpl(clazz, jsonDef, classLoader, + alwaysAllowNull, properties, supportSchemaVersioning, jsr310ConversionEnabled, reader, writer); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java index d0db78963db..090211a63fe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java @@ -50,6 +50,8 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> { private final String jsonDef; + 
private final ClassLoader classLoader; + private final boolean supportSchemaVersioning; private final boolean jsr310ConversionEnabled; @@ -58,13 +60,15 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> { private final SchemaWriter<T> writer; - public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String, String> properties, + public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, ClassLoader classLoader, + boolean alwaysAllowNull, Map<String, String> properties, boolean supportSchemaVersioning, boolean jsr310ConversionEnabled, SchemaReader<T> reader, SchemaWriter<T> writer) { this.alwaysAllowNull = alwaysAllowNull; this.properties = properties; this.jsonDef = jsonDef; this.pojo = pojo; + this.classLoader = classLoader; this.supportSchemaVersioning = supportSchemaVersioning; this.jsr310ConversionEnabled = jsr310ConversionEnabled; this.reader = reader; @@ -104,6 +108,11 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T> { return pojo; } + @Override + public ClassLoader getClassLoader() { + return this.classLoader; + } + @Override public boolean getSupportSchemaVersioning() { return supportSchemaVersioning; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java index d69f8bf66ba..2a5040d7815 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java @@ -36,7 +36,9 @@ import java.time.LocalTime; import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.UUID; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.avro.SchemaValidationException; @@ -463,21 +465,88 @@ public class AvroSchemaTest { @Data 
- private static class TimestampStruct { + @AllArgsConstructor + @NoArgsConstructor + private static class TimestampPojo { Instant value; } @Test public void testTimestampWithJsr310Conversion() { - AvroSchema<TimestampStruct> schema = AvroSchema.of(TimestampStruct.class); + AvroSchema<TimestampPojo> schema = AvroSchema.of(TimestampPojo.class); Assert.assertEquals( schema.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(), new TimeConversions.TimestampMicrosConversion().getLogicalTypeName()); - AvroSchema<TimestampStruct> schema2 = AvroSchema.of(SchemaDefinition.<TimestampStruct>builder() - .withPojo(TimestampStruct.class).withJSR310ConversionEnabled(true).build()); + AvroSchema<TimestampPojo> schema2 = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withPojo(TimestampPojo.class).withJSR310ConversionEnabled(true).build()); Assert.assertEquals( schema2.getAvroSchema().getFields().get(0).schema().getTypes().get(1).getLogicalType().getName(), new TimeConversions.TimestampMillisConversion().getLogicalTypeName()); } + + @Test + public void testTimestampWithJsonDef(){ + AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withPojo(TimestampPojo.class) + .withJSR310ConversionEnabled(false).build()); + + TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z")); + byte[] encode = schemaWithPojo.encode(timestampPojo); + TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode); + + Assert.assertEquals(decodeWithPojo, timestampPojo); + + String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema()); + AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withJsonDef(schemaDefinition) + .withClassLoader(TimestampPojo.class.getClassLoader()) + .withJSR310ConversionEnabled(false).build()); + + TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode); + + 
Assert.assertEquals(decodeWithJson, decodeWithPojo); + Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass()); + + AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withJsonDef(schemaDefinition) + .withJSR310ConversionEnabled(false).build()); + + TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode); + Assert.assertNotEquals(decodeWithJsonNoClassLoader, decodeWithPojo); + Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass()); + } + + @Test + public void testTimestampWithJsonDefAndJSR310ConversionEnabled(){ + AvroSchema<TimestampPojo> schemaWithPojo = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withPojo(TimestampPojo.class) + .withJSR310ConversionEnabled(true).build()); + + TimestampPojo timestampPojo = new TimestampPojo(Instant.parse("2022-06-10T12:38:59.039084Z")); + byte[] encode = schemaWithPojo.encode(timestampPojo); + TimestampPojo decodeWithPojo = schemaWithPojo.decode(encode); + + Assert.assertNotEquals(decodeWithPojo, timestampPojo); + + String schemaDefinition = new String(schemaWithPojo.schemaInfo.getSchema()); + AvroSchema<TimestampPojo> schemaWithJsonDef = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withJsonDef(schemaDefinition) + .withClassLoader(TimestampPojo.class.getClassLoader()) + .withJSR310ConversionEnabled(true).build()); + + TimestampPojo decodeWithJson = schemaWithJsonDef.decode(encode); + + Assert.assertEquals(decodeWithJson, decodeWithPojo); + Assert.assertEquals(Instant.class, decodeWithJson.getValue().getClass()); + + AvroSchema<TimestampPojo> schemaWithJsonDefNoClassLoader = AvroSchema.of(SchemaDefinition.<TimestampPojo>builder() + .withJsonDef(schemaDefinition) + .withJSR310ConversionEnabled(true).build()); + + TimestampPojo decodeWithJsonNoClassLoader = schemaWithJsonDefNoClassLoader.decode(encode); + Assert.assertNotEquals(decodeWithJsonNoClassLoader, 
decodeWithPojo); + Assert.assertNotEquals(Instant.class, decodeWithJsonNoClassLoader.getValue().getClass()); + } + } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java index fa88e144a31..a1530864c92 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java @@ -20,11 +20,15 @@ package org.apache.pulsar.client.impl.schema; import static java.nio.charset.StandardCharsets.UTF_8; import static org.testng.Assert.assertEquals; - import lombok.Data; import org.apache.avro.reflect.Nullable; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.schema.*; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericRecordBuilder; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaBuilder; +import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.impl.schema.reader.MultiVersionAvroReader; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType;
