BewareMyPower commented on code in PR #24488: URL: https://github.com/apache/pulsar/pull/24488#discussion_r2306309651
########## pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java: ########## @@ -100,28 +101,82 @@ public interface KeyValueDecoder<K, V> { */ public static <K, V> byte[] encode(K key, Schema<K> keyWriter, V value, Schema<V> valueWriter) { - byte [] keyBytes; + return encode(null, key, keyWriter, value, valueWriter).data(); + } + + public static <K, V> EncodeData encode(String topic, K key, Schema<K> keyWriter, + V value, Schema<V> valueWriter) { + EncodeData keyEncodeData; if (key == null) { - keyBytes = new byte[0]; + keyEncodeData = new EncodeData(new byte[0]); } else { - keyBytes = keyWriter.encode(key); + keyEncodeData = keyWriter.encode(topic, key); } - byte [] valueBytes; + EncodeData valueEncodeData; if (value == null) { - valueBytes = new byte[0]; + valueEncodeData = new EncodeData(new byte[0]); } else { - valueBytes = valueWriter.encode(value); + valueEncodeData = valueWriter.encode(topic, value); } - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length); + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keyEncodeData.data().length + 4 + valueEncodeData.data().length); byteBuffer - .putInt(key == null ? -1 : keyBytes.length) - .put(keyBytes) - .putInt(value == null ? -1 : valueBytes.length) - .put(valueBytes); + .putInt(key == null ? -1 : keyEncodeData.data().length) + .put(keyEncodeData.data()) + .putInt(value == null ? -1 : valueEncodeData.data().length) + .put(valueEncodeData.data()); + return new EncodeData(byteBuffer.array(), + generateKVSchemaId(keyEncodeData.schemaId(), valueEncodeData.schemaId())); + } + + /** + * Generate a combined schema id for key/value schema. + * The format is: + * schemaId = schemaKeyLength + keySchemaIdBytes + schemaValueLength + valueSchemaIdBytes + * where schemaKeyLength and schemaValueLength are 4 bytes integer. + * If keySchemaIdBytes or valueSchemaIdBytes is null, the length will be 0. 
+ * So the total length of schemaId is: + * 4 + keySchemaIdBytes.length + 4 + valueSchemaIdBytes.length + * + * @param keySchemaId the schema id of key schema + * @param valueSchemaId the schema id of value schema + */ + public static byte[] generateKVSchemaId(byte[] keySchemaId, byte[] valueSchemaId) { + if (keySchemaId == null && valueSchemaId == null) { + return null; + } + keySchemaId = keySchemaId == null ? new byte[0] : keySchemaId; + valueSchemaId = valueSchemaId == null ? new byte[0] : valueSchemaId; + ByteBuffer byteBuffer = ByteBuffer.allocate( + 4 + keySchemaId.length + 4 + valueSchemaId.length); + byteBuffer + .putInt(keySchemaId.length) + .put(keySchemaId) + .putInt(valueSchemaId.length) + .put(valueSchemaId); return byteBuffer.array(); } + public static byte[] getSchemaId(byte[] schemaId, boolean isKey) { Review Comment: Please add unit tests for this function. The current implementation is wrong. For example: ```java final var schemaId = generateKVSchemaId(null, new byte[]{0x11, 0x22}); final var keySchemaId = getSchemaId(schemaId, true); ``` In this case, `keySchemaId` should be an empty schema id array, but the function returns the value schema id instead. By the way, I found that allowing a null schema id makes the key/value schema very confusing. With the same code example, it's impossible to distinguish whether the key schema id is null or an empty array, because both are encoded with a length of 0. One solution is to never allow a null schema id. Then all the APIs become much simpler, e.g. `getSchemaId` won't need to return an `Optional`. The other solution is to use a special encoding for a null schema id, e.g. a negative length to represent null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org