jerrypeng closed pull request #2969: JSONSchema fails to serialise fields on objects that are nested in a collection URL: https://github.com/apache/pulsar/pull/2969
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java index d46c84b570..02185ec555 100644 --- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java +++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java @@ -50,25 +50,7 @@ private JSONSchema(Class<T> pojo, Map<String, String> properties) { this.pojo = pojo; this.properties = properties; - this.gson = new GsonBuilder().addSerializationExclusionStrategy(new ExclusionStrategy() { - Set<String> classes = new HashSet<>(); - - @Override - public boolean shouldSkipField(FieldAttributes f) { - boolean skip = !(f.getDeclaringClass().equals(pojo) - || classes.contains(f.getDeclaringClass().getName()) - || f.getDeclaringClass().isAssignableFrom(pojo)); - if (!skip) { - classes.add(f.getDeclaredClass().getName()); - } - return skip; - } - - @Override - public boolean shouldSkipClass(Class<?> clazz) { - return false; - } - }).create(); + this.gson = new Gson(); this.schema = ReflectData.AllowNull.get().getSchema(pojo); this.schemaInfo = new SchemaInfo(); diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java index c7b16bf0fa..93f34785f2 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java @@ -18,12 +18,17 @@ */ package org.apache.pulsar.client.schema; +import java.util.Collections; +import java.util.List; + import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.client.schema.SchemaTestUtils.Bar; import org.apache.pulsar.client.schema.SchemaTestUtils.DerivedFoo; import org.apache.pulsar.client.schema.SchemaTestUtils.Foo; +import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBar; +import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBarList; import org.apache.pulsar.common.schema.SchemaType; import org.testng.Assert; import org.testng.annotations.Test; @@ -40,7 +45,6 @@ public void testSchema() { Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON); Schema.Parser parser = new Schema.Parser(); String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema()); - log.info("schemaJson: {}", schemaJson); Assert.assertEquals(schemaJson, SCHEMA_JSON); Schema schema = parser.parse(schemaJson); @@ -84,6 +88,33 @@ public void testEncodeAndDecode() { Assert.assertEquals(object2, foo2); } + @Test + public void testNestedClasses() { + JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null); + JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null); + + Bar bar = new Bar(); + bar.setField1(true); + + NestedBar nested = new NestedBar(); + nested.setField1(true); + nested.setNested(bar); + + byte[] bytes = jsonSchema.encode(nested); + Assert.assertTrue(bytes.length > 0); + Assert.assertEquals(jsonSchema.decode(bytes), nested); + + List<Bar> list = Collections.singletonList(bar); + NestedBarList nestedList = new NestedBarList(); + nestedList.setField1(true); + nestedList.setList(list); + + bytes = listJsonSchema.encode(nestedList); + Assert.assertTrue(bytes.length > 0); + + Assert.assertEquals(listJsonSchema.decode(bytes), nestedList); + } + @Test public void testCorrectPolymorphism() { @@ -123,7 +154,6 @@ public void testCorrectPolymorphism() { // schema for derived class JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class); Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo); - log.info("derivedJsonSchema.encode(derivedDerivedFoo)): {}", derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo))); Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo); //schema for derived derived class diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java index 123b81c82c..b31b007e29 100644 --- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java +++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.client.schema; +import java.util.List; + import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; @@ -45,6 +47,22 @@ private boolean field1; } + @Data + @ToString + @EqualsAndHashCode + public static class NestedBar { + private boolean field1; + private Bar nested; + } + + @Data + @ToString + @EqualsAndHashCode + public static class NestedBarList { + private boolean field1; + private List<Bar> list; + } + @Data @ToString @EqualsAndHashCode diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java index 48bc0f1b6b..c2fbbf8ede 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java @@ -80,7 +80,7 @@ /** * Produce Mode. */ - protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE; + protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE; /** * If true, the producer will wait until all outstanding records have been send to the broker. @@ -212,7 +212,7 @@ public void open(Configuration parameters) throws Exception { LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause); return null; }; - } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) { + } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) { this.failureCallback = cause -> { if (null == asyncException) { if (cause instanceof Exception) { diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java index d1b9fd88d2..d42f5c341c 100644 --- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java +++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java @@ -32,6 +32,6 @@ * The producer will ensure that all the events are persisted in pulsar. * There could be duplicate events written though. */ - AT_LEAST_ONE, + AT_LEAST_ONCE, } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services