gaoran10 commented on code in PR #24488: URL: https://github.com/apache/pulsar/pull/24488#discussion_r2306780392
########## pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java: ########## @@ -100,28 +101,82 @@ public interface KeyValueDecoder<K, V> { */ public static <K, V> byte[] encode(K key, Schema<K> keyWriter, V value, Schema<V> valueWriter) { - byte [] keyBytes; + return encode(null, key, keyWriter, value, valueWriter).data(); + } + + public static <K, V> EncodeData encode(String topic, K key, Schema<K> keyWriter, + V value, Schema<V> valueWriter) { + EncodeData keyEncodeData; if (key == null) { - keyBytes = new byte[0]; + keyEncodeData = new EncodeData(new byte[0]); } else { - keyBytes = keyWriter.encode(key); + keyEncodeData = keyWriter.encode(topic, key); } - byte [] valueBytes; + EncodeData valueEncodeData; if (value == null) { - valueBytes = new byte[0]; + valueEncodeData = new EncodeData(new byte[0]); } else { - valueBytes = valueWriter.encode(value); + valueEncodeData = valueWriter.encode(topic, value); } - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length); + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keyEncodeData.data().length + 4 + valueEncodeData.data().length); byteBuffer - .putInt(key == null ? -1 : keyBytes.length) - .put(keyBytes) - .putInt(value == null ? -1 : valueBytes.length) - .put(valueBytes); + .putInt(key == null ? -1 : keyEncodeData.data().length) + .put(keyEncodeData.data()) + .putInt(value == null ? -1 : valueEncodeData.data().length) + .put(valueEncodeData.data()); + return new EncodeData(byteBuffer.array(), + generateKVSchemaId(keyEncodeData.schemaId(), valueEncodeData.schemaId())); + } + + /** + * Generate a combined schema id for key/value schema. + * The format is: + * schemaId = schemaKeyLength + keySchemaIdBytes + schemaValueLength + valueSchemaIdBytes + * where schemaKeyLength and schemaValueLength are 4 bytes integer. + * If keySchemaIdBytes or valueSchemaIdBytes is null, the length will be 0. + * So the total length of schemaId is: + * 4 + keySchemaIdBytes.length + 4 + valueSchemaIdBytes.length + * + * @param keySchemaId the schema id of key schema + * @param valueSchemaId the schema id of value schema + */ + public static byte[] generateKVSchemaId(byte[] keySchemaId, byte[] valueSchemaId) { + if (keySchemaId == null && valueSchemaId == null) { + return null; + } + keySchemaId = keySchemaId == null ? new byte[0] : keySchemaId; + valueSchemaId = valueSchemaId == null ? new byte[0] : valueSchemaId; + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keySchemaId.length + 4 + valueSchemaId.length); + byteBuffer + .putInt(keySchemaId.length) + .put(keySchemaId) + .putInt(valueSchemaId.length) + .put(valueSchemaId); return byteBuffer.array(); } + public static byte[] getSchemaId(byte[] schemaId, boolean isKey) { + ByteBuffer buffer = ByteBuffer.wrap(schemaId); + int keySchemaLength = buffer.getInt(); + if (keySchemaLength > 0) { + byte[] keySchemaId = new byte[keySchemaLength]; + buffer.get(keySchemaId); + if (isKey) { + return keySchemaId; + } + } + int valueSchemaLength = buffer.getInt(); + if (valueSchemaLength > 0) { + byte[] valueSchemaId = new byte[valueSchemaLength]; + buffer.get(valueSchemaId); + return valueSchemaId; + } + throw new IllegalArgumentException("Invalid schemaId length: " + schemaId.length); Review Comment: Improved, and I added a test to cover different cases. ########## pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java: ########## @@ -100,28 +101,82 @@ public interface KeyValueDecoder<K, V> { */ public static <K, V> byte[] encode(K key, Schema<K> keyWriter, V value, Schema<V> valueWriter) { - byte [] keyBytes; + return encode(null, key, keyWriter, value, valueWriter).data(); + } + + public static <K, V> EncodeData encode(String topic, K key, Schema<K> keyWriter, + V value, Schema<V> valueWriter) { + EncodeData keyEncodeData; if (key == null) { - keyBytes = new byte[0]; + keyEncodeData = new EncodeData(new byte[0]); } else { - keyBytes = keyWriter.encode(key); + keyEncodeData = keyWriter.encode(topic, key); } - byte [] valueBytes; + EncodeData valueEncodeData; if (value == null) { - valueBytes = new byte[0]; + valueEncodeData = new EncodeData(new byte[0]); } else { - valueBytes = valueWriter.encode(value); + valueEncodeData = valueWriter.encode(topic, value); } - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length); + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keyEncodeData.data().length + 4 + valueEncodeData.data().length); byteBuffer - .putInt(key == null ? -1 : keyBytes.length) - .put(keyBytes) - .putInt(value == null ? -1 : valueBytes.length) - .put(valueBytes); + .putInt(key == null ? -1 : keyEncodeData.data().length) + .put(keyEncodeData.data()) + .putInt(value == null ? -1 : valueEncodeData.data().length) + .put(valueEncodeData.data()); + return new EncodeData(byteBuffer.array(), + generateKVSchemaId(keyEncodeData.schemaId(), valueEncodeData.schemaId())); + } + + /** + * Generate a combined schema id for key/value schema. + * The format is: + * schemaId = schemaKeyLength + keySchemaIdBytes + schemaValueLength + valueSchemaIdBytes + * where schemaKeyLength and schemaValueLength are 4 bytes integer. + * If keySchemaIdBytes or valueSchemaIdBytes is null, the length will be 0. + * So the total length of schemaId is: + * 4 + keySchemaIdBytes.length + 4 + valueSchemaIdBytes.length + * + * @param keySchemaId the schema id of key schema + * @param valueSchemaId the schema id of value schema + */ + public static byte[] generateKVSchemaId(byte[] keySchemaId, byte[] valueSchemaId) { + if (keySchemaId == null && valueSchemaId == null) { + return null; + } + keySchemaId = keySchemaId == null ? new byte[0] : keySchemaId; + valueSchemaId = valueSchemaId == null ? new byte[0] : valueSchemaId; + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keySchemaId.length + 4 + valueSchemaId.length); + byteBuffer + .putInt(keySchemaId.length) + .put(keySchemaId) + .putInt(valueSchemaId.length) + .put(valueSchemaId); return byteBuffer.array(); } + public static byte[] getSchemaId(byte[] schemaId, boolean isKey) { Review Comment: Improved, and I added a test to cover different cases. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org