This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d9bcbffc006 [FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly serialized. (#20698)
d9bcbffc006 is described below

commit d9bcbffc006481c09a8d2e04aa05cc92cd5c80d2
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Aug 30 10:31:36 2022 +0800

    [FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly serialized. (#20698)
---
 .../pulsar/common/schema/PulsarSchema.java         |  4 ++--
 .../pulsar/common/schema/PulsarSchemaTest.java     | 28 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index 6ce91cdc674..4c33d79d205 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -19,6 +19,7 @@
 package org.apache.flink.connector.pulsar.common.schema;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -163,8 +164,7 @@ public final class PulsarSchema<T> implements Serializable {
         // Schema
         int byteLen = ois.readInt();
         byte[] schemaBytes = new byte[byteLen];
-        int read = ois.read(schemaBytes);
-        checkState(read == byteLen);
+        IOUtils.readFully(ois, schemaBytes, 0, byteLen);
 
         // Type
         int typeIdx = ois.readInt();
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
index 7011e169656..81074c99370 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.junit.jupiter.api.Test;
 
+import java.io.Serializable;
+
 import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -117,9 +119,35 @@ class PulsarSchemaTest {
         assertPulsarSchemaIsSerializable(new PulsarSchema<>(KV, Foo.class, FA.class));
     }
 
+    @Test
+    void largeAvroSchemaSerialization() throws Exception {
+        Schema<LargeMessage> largeMessageSchema = Schema.AVRO(LargeMessage.class);
+        assertPulsarSchemaIsSerializable(
+                new PulsarSchema<>(largeMessageSchema, LargeMessage.class));
+    }
+
     private <T> void assertPulsarSchemaIsSerializable(PulsarSchema<T> schema) throws Exception {
         PulsarSchema<T> clonedSchema = InstantiationUtil.clone(schema);
         assertEquals(clonedSchema.getSchemaInfo(), schema.getSchemaInfo());
         assertEquals(clonedSchema.getRecordClass(), schema.getRecordClass());
     }
+
+    /** A POJO Class which would generate a large schema by Avro. */
+    public static class LargeMessage implements Serializable {
+        private static final long serialVersionUID = 5364494369740402518L;
+
+        public String
+                
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa;
+        public String
+                
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb;
+        public String
+                
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc;
+        public String
+                
dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd;
+        public String
+                
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee;
+        // the problem begins
+        public String
+                
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff;
+    }
 }

Reply via email to