This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d9bcbffc006 [FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly serialized. (#20698)
d9bcbffc006 is described below
commit d9bcbffc006481c09a8d2e04aa05cc92cd5c80d2
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Aug 30 10:31:36 2022 +0800
[FLINK-28609][Connector/Pulsar] PulsarSchema didn't get properly serialized. (#20698)
---
.../pulsar/common/schema/PulsarSchema.java | 4 ++--
.../pulsar/common/schema/PulsarSchemaTest.java | 28 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
index 6ce91cdc674..4c33d79d205 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java
@@ -19,6 +19,7 @@
package org.apache.flink.connector.pulsar.common.schema;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.IOUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -163,8 +164,7 @@ public final class PulsarSchema<T> implements Serializable {
// Schema
int byteLen = ois.readInt();
byte[] schemaBytes = new byte[byteLen];
- int read = ois.read(schemaBytes);
- checkState(read == byteLen);
+ IOUtils.readFully(ois, schemaBytes, 0, byteLen);
// Type
int typeIdx = ois.readInt();
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
index 7011e169656..81074c99370 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchemaTest.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.junit.jupiter.api.Test;
+import java.io.Serializable;
+
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -117,9 +119,35 @@ class PulsarSchemaTest {
assertPulsarSchemaIsSerializable(new PulsarSchema<>(KV, Foo.class, FA.class));
}
+ @Test
+ void largeAvroSchemaSerialization() throws Exception {
+ Schema<LargeMessage> largeMessageSchema = Schema.AVRO(LargeMessage.class);
+ assertPulsarSchemaIsSerializable(
+ new PulsarSchema<>(largeMessageSchema, LargeMessage.class));
+ }
+
private <T> void assertPulsarSchemaIsSerializable(PulsarSchema<T> schema) throws Exception {
PulsarSchema<T> clonedSchema = InstantiationUtil.clone(schema);
assertEquals(clonedSchema.getSchemaInfo(), schema.getSchemaInfo());
assertEquals(clonedSchema.getRecordClass(), schema.getRecordClass());
}
+
+ /** A POJO Class which would generate a large schema by Avro. */
+ public static class LargeMessage implements Serializable {
+ private static final long serialVersionUID = 5364494369740402518L;
+
+ public String
+
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa;
+ public String
+
bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb;
+ public String
+
cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc;
+ public String
+
dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd;
+ public String
+
eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee;
+ // the problem begins
+ public String
+
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff;
+ }
}