jerrypeng closed pull request #2969: JSONSchema fails to serialise fields on 
objects that are nested in a collection
URL: https://github.com/apache/pulsar/pull/2969
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index d46c84b570..02185ec555 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -50,25 +50,7 @@
     private JSONSchema(Class<T> pojo, Map<String, String> properties) {
         this.pojo = pojo;
         this.properties = properties;
-        this.gson = new GsonBuilder().addSerializationExclusionStrategy(new ExclusionStrategy() {
-            Set<String> classes = new HashSet<>();
-
-            @Override
-            public boolean shouldSkipField(FieldAttributes f) {
-                boolean skip = !(f.getDeclaringClass().equals(pojo)
-                        || classes.contains(f.getDeclaringClass().getName())
-                        || f.getDeclaringClass().isAssignableFrom(pojo));
-                if (!skip) {
-                    classes.add(f.getDeclaredClass().getName());
-                }
-                return skip;
-            }
-
-            @Override
-            public boolean shouldSkipClass(Class<?> clazz) {
-                return false;
-            }
-        }).create();
+        this.gson = new Gson();
 
         this.schema = ReflectData.AllowNull.get().getSchema(pojo);
         this.schemaInfo = new SchemaInfo();
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
index c7b16bf0fa..93f34785f2 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/JSONSchemaTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.client.schema;
 
+import java.util.Collections;
+import java.util.List;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.schema.SchemaTestUtils.DerivedFoo;
 import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBar;
+import org.apache.pulsar.client.schema.SchemaTestUtils.NestedBarList;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -40,7 +45,6 @@ public void testSchema() {
         Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
         Schema.Parser parser = new Schema.Parser();
         String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
-        log.info("schemaJson: {}", schemaJson);
         Assert.assertEquals(schemaJson, SCHEMA_JSON);
         Schema schema = parser.parse(schemaJson);
 
@@ -84,6 +88,33 @@ public void testEncodeAndDecode() {
         Assert.assertEquals(object2, foo2);
     }
 
+    @Test
+    public void testNestedClasses() {
+        JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
+        JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        NestedBar nested = new NestedBar();
+        nested.setField1(true);
+        nested.setNested(bar);
+
+        byte[] bytes = jsonSchema.encode(nested);
+        Assert.assertTrue(bytes.length > 0);
+        Assert.assertEquals(jsonSchema.decode(bytes), nested);
+
+        List<Bar> list = Collections.singletonList(bar);
+        NestedBarList nestedList = new NestedBarList();
+        nestedList.setField1(true);
+        nestedList.setList(list);
+
+        bytes = listJsonSchema.encode(nestedList);
+        Assert.assertTrue(bytes.length > 0);
+
+        Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
+    }
+
     @Test
     public void testCorrectPolymorphism() {
 
@@ -123,7 +154,6 @@ public void testCorrectPolymorphism() {
         // schema for derived class
         JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
-        log.info("derivedJsonSchema.encode(derivedDerivedFoo)): {}", derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)));
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
 
         //schema for derived derived class
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
index 123b81c82c..b31b007e29 100644
--- a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/SchemaTestUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.client.schema;
 
+import java.util.List;
+
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -45,6 +47,22 @@
         private boolean field1;
     }
 
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class NestedBar {
+        private boolean field1;
+        private Bar nested;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class NestedBarList {
+        private boolean field1;
+        private List<Bar> list;
+    }
+
     @Data
     @ToString
     @EqualsAndHashCode
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 48bc0f1b6b..c2fbbf8ede 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -80,7 +80,7 @@
     /**
      * Produce Mode.
      */
-    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONE;
+    protected PulsarProduceMode produceMode = PulsarProduceMode.AT_LEAST_ONCE;
 
     /**
      * If true, the producer will wait until all outstanding records have been send to the broker.
@@ -212,7 +212,7 @@ public void open(Configuration parameters) throws Exception {
                 LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
                 return null;
             };
-        } else if (PulsarProduceMode.AT_LEAST_ONE == produceMode) {
+        } else if (PulsarProduceMode.AT_LEAST_ONCE == produceMode) {
             this.failureCallback = cause -> {
                 if (null == asyncException) {
                     if (cause instanceof Exception) {
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
index d1b9fd88d2..d42f5c341c 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarProduceMode.java
@@ -32,6 +32,6 @@
      * The producer will ensure that all the events are persisted in pulsar.
      * There could be duplicate events written though.
      */
-    AT_LEAST_ONE,
+    AT_LEAST_ONCE,
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to