eolivelli commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611002212
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
return decode(bytes);
}
+ /**
+ * Decode a ByteBuf into an object using a given version. <br/>
+ * <b>NOTE</b>: This method should not modify reader/writer index of ByteBuf else it can cause corruption while
+ * accessing same ByteBuf for decoding and deserialization.
+ *
+ * @param byteBuf
+ * the byte array to decode
+ * @param schemaVersion
+ * the schema version to decode the object. null indicates using latest version.
+ * @return the deserialized object
+ */
+ default T decode(ByteBuf bytes, byte[] schemaVersion) {
+ return null;
Review comment:
Can you please write a default implementation that calls the other
`decode` method?
Otherwise custom schemas may break if we come to rely more on this method
in the future.
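
A minimal sketch of what such a default could look like. To keep it runnable without Netty on the classpath, it uses `java.nio.ByteBuffer` as a stand-in for `ByteBuf`; `SketchSchema` and `Utf8StringSchema` are hypothetical names, not part of Pulsar. With a real Netty `ByteBuf`, the absolute-index `getBytes(int, byte[])` should serve the same purpose, since it leaves the reader and writer indexes alone.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified stand-in for the Schema interface (assumption).
interface SketchSchema<T> {
    T decode(byte[] bytes);

    default T decode(byte[] bytes, byte[] schemaVersion) {
        // Same as the existing default: ignore the version.
        return decode(bytes);
    }

    // Default for the buffer overload: copy the readable bytes and delegate,
    // so existing custom schemas keep working without overriding this method.
    default T decode(ByteBuffer buffer, byte[] schemaVersion) {
        if (buffer == null) {
            return null;
        }
        // duplicate() shares the content but has its own position and limit,
        // so the caller's buffer indexes are left untouched.
        ByteBuffer view = buffer.duplicate();
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        return decode(copy, schemaVersion);
    }
}

// Hypothetical custom schema that only implements decode(byte[]).
final class Utf8StringSchema implements SketchSchema<String> {
    @Override
    public String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

The delegating default costs one copy, but a schema that knows how to read the buffer directly can still override it.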
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
##########
@@ -171,6 +174,12 @@ private String interpretMessage(Message<?> message, boolean displayHex) throws I
} else if (value instanceof GenericRecord) {
Map<String, Object> asMap = genericRecordToMap((GenericRecord) value);
data = asMap.toString();
+ } else if (value instanceof ByteBuffer) {
+ ByteBuffer payload = ((ByteBuffer)value);
Review comment:
What about using the ByteBuffer's internal array when `hasArray()` is true,
the offset is 0, and the length equals `remaining()`? I did the same in
BytesKafkaSource. This way we can save a copy.
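
A rough sketch of that check, assuming a small helper is acceptable here. `ByteBufferBytes.toByteArray` is a hypothetical name, and this is not the code from BytesKafkaSource.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (assumption), not an existing Pulsar class.
final class ByteBufferBytes {
    private ByteBufferBytes() {
    }

    static byte[] toByteArray(ByteBuffer payload) {
        // Fast path: the buffer is heap-backed and its readable bytes span the
        // whole backing array, so the array can be returned without copying.
        // hasArray() is false for read-only and direct buffers.
        if (payload.hasArray()
                && payload.arrayOffset() == 0
                && payload.remaining() == payload.array().length) {
            return payload.array();
        }
        // Slow path: copy the readable bytes through a duplicate so the
        // caller's position and limit stay unchanged.
        byte[] copy = new byte[payload.remaining()];
        payload.duplicate().get(copy);
        return copy;
    }
}
```

The fast path returns the backing array itself, so it is only safe when the caller treats the result as read-only.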