syhily commented on code in PR #20698:
URL: https://github.com/apache/flink/pull/20698#discussion_r957218599


##########
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java:
##########
@@ -118,67 +117,37 @@ public <K, V> PulsarSchema(
         SchemaInfo encodedInfo =
                 encodeKeyValueSchemaInfo(info.getName(), infoKey, infoValue, encodingType);
 
-        this.schemaInfo = encodeClassInfo(encodedInfo, KeyValue.class);
-        this.schema = createSchema(this.schemaInfo);
+        setSchemaInfo(encodeClassInfo(encodedInfo, KeyValue.class));
+    }
+
+    /** Validate the schema for having the required class info. */
+    private void setSchemaInfo(SchemaInfo schemaInfo) {
+        this.schema = createSchema(schemaInfo);
+        this.schemaInfo = schemaInfo;
+
+        this.schemaName = schemaInfo.getName();
+        this.schemaBytes = schemaInfo.getSchema();
+        this.schemaType = schemaInfo.getType();
+        this.schemaProperties = schemaInfo.getProperties();
     }
 
     public Schema<T> getPulsarSchema() {
+        if (schema == null) {
+            this.schema = createSchema(getSchemaInfo());
+        }
         return schema;
     }
 
     public SchemaInfo getSchemaInfo() {
+        if (schemaInfo == null) {
+            this.schemaInfo =
+                    new SchemaInfoImpl(schemaName, schemaBytes, schemaType, schemaProperties);
+        }
         return schemaInfo;
     }
 
     public Class<T> getRecordClass() {
-        return decodeClassInfo(schemaInfo);
-    }
-
-    private void writeObject(ObjectOutputStream oos) throws IOException {
-        // Name
-        oos.writeUTF(schemaInfo.getName());
-
-        // Schema
-        byte[] schemaBytes = schemaInfo.getSchema();
-        oos.writeInt(schemaBytes.length);
-        oos.write(schemaBytes);
-
-        // Type
-        SchemaType type = schemaInfo.getType();
-        oos.writeInt(type.getValue());
-
-        // Properties
-        Map<String, String> properties = schemaInfo.getProperties();
-        oos.writeInt(properties.size());
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            oos.writeUTF(entry.getKey());
-            oos.writeUTF(entry.getValue());
-        }
-    }
-
-    private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
-        // Name
-        String name = ois.readUTF();
-
-        // Schema
-        int byteLen = ois.readInt();
-        byte[] schemaBytes = new byte[byteLen];
-        int read = ois.read(schemaBytes);

Review Comment:
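   Using `ois.readFully(schemaBytes)` instead of `ois.read(schemaBytes)` would also work here: `read` may return before the whole array is filled, whereas `readFully` keeps reading until the buffer is full or throws `EOFException`.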



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to