syhily commented on code in PR #20698: URL: https://github.com/apache/flink/pull/20698#discussion_r957218599
########## flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java: ########## @@ -118,67 +117,37 @@ public <K, V> PulsarSchema( SchemaInfo encodedInfo = encodeKeyValueSchemaInfo(info.getName(), infoKey, infoValue, encodingType); - this.schemaInfo = encodeClassInfo(encodedInfo, KeyValue.class); - this.schema = createSchema(this.schemaInfo); + setSchemaInfo(encodeClassInfo(encodedInfo, KeyValue.class)); + } + + /** Validate the schema for having the required class info. */ + private void setSchemaInfo(SchemaInfo schemaInfo) { + this.schema = createSchema(schemaInfo); + this.schemaInfo = schemaInfo; + + this.schemaName = schemaInfo.getName(); + this.schemaBytes = schemaInfo.getSchema(); + this.schemaType = schemaInfo.getType(); + this.schemaProperties = schemaInfo.getProperties(); } public Schema<T> getPulsarSchema() { + if (schema == null) { + this.schema = createSchema(getSchemaInfo()); + } return schema; } public SchemaInfo getSchemaInfo() { + if (schemaInfo == null) { + this.schemaInfo = + new SchemaInfoImpl(schemaName, schemaBytes, schemaType, schemaProperties); + } return schemaInfo; } public Class<T> getRecordClass() { - return decodeClassInfo(schemaInfo); - } - - private void writeObject(ObjectOutputStream oos) throws IOException { - // Name - oos.writeUTF(schemaInfo.getName()); - - // Schema - byte[] schemaBytes = schemaInfo.getSchema(); - oos.writeInt(schemaBytes.length); - oos.write(schemaBytes); - - // Type - SchemaType type = schemaInfo.getType(); - oos.writeInt(type.getValue()); - - // Properties - Map<String, String> properties = schemaInfo.getProperties(); - oos.writeInt(properties.size()); - for (Map.Entry<String, String> entry : properties.entrySet()) { - oos.writeUTF(entry.getKey()); - oos.writeUTF(entry.getValue()); - } - } - - private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException { - // Name - String name = ois.readUTF(); - - // Schema - int 
byteLen = ois.readInt(); - byte[] schemaBytes = new byte[byteLen]; - int read = ois.read(schemaBytes); Review Comment: `ois.readFully(schemaBytes)` would also work here: unlike `ois.read(schemaBytes)`, which may return after filling only part of the array, it blocks until the whole array is filled or throws `EOFException`, so a short read cannot go unnoticed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org