aloyszhang commented on a change in pull request #8618:
URL: https://github.com/apache/pulsar/pull/8618#discussion_r541416121
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1920,6 +1929,72 @@ private static ByteBufPair
serializeCommandSendWithSize(BaseCommand.Builder cmdB
return command;
}
+ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload,
+
Set<BrokerEntryMetadataInterceptor> interceptors) {
+ // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE
| BROKER_ENTRY_METADATA |
+ // | 2 bytes | 4 bytes
| BROKER_ENTRY_METADATA_SIZE bytes |
+
+ PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ for (BrokerEntryMetadataInterceptor interceptor : interceptors) {
+ interceptor.intercept(brokerMetadataBuilder);
+ }
+ PulsarApi.BrokerEntryMetadata brokerEntryMetadata =
brokerMetadataBuilder.build();
+ int brokerMetaSize = brokerEntryMetadata.getSerializedSize();
+ ByteBuf brokerMeta =
+ PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6,
brokerMetaSize + 6);
+ brokerMeta.writeShort(Commands.magicBrokerEntryMetadata);
+ brokerMeta.writeInt(brokerMetaSize);
+ ByteBufCodedOutputStream outStream =
ByteBufCodedOutputStream.get(brokerMeta);
+ try {
+ brokerEntryMetadata.writeTo(outStream);
+ } catch (IOException e) {
+ // This is in-memory serialization, should not fail
+ throw new RuntimeException(e);
+ }
+ outStream.recycle();
+
+ CompositeByteBuf compositeByteBuf =
PulsarByteBufAllocator.DEFAULT.compositeBuffer();
+ compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload);
+ return compositeByteBuf;
+ }
+
+ public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf
headerAndPayloadWithBrokerEntryMetadata) {
+ int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+ if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
+ int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
+
headerAndPayloadWithBrokerEntryMetadata.readerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
+ + brokerEntryMetadataSize);
+ } else {
+ headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
+ }
+ return headerAndPayloadWithBrokerEntryMetadata;
+ }
+
+ public static PulsarApi.BrokerEntryMetadata
parseBrokerEntryMetadataIfExist(
+ ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
+ int readerIndex =
headerAndPayloadWithBrokerEntryMetadata.readerIndex();
+ if (headerAndPayloadWithBrokerEntryMetadata.readShort() ==
magicBrokerEntryMetadata) {
+ int brokerEntryMetadataSize =
headerAndPayloadWithBrokerEntryMetadata.readInt();
+ int writerIndex =
headerAndPayloadWithBrokerEntryMetadata.writerIndex();
+
headerAndPayloadWithBrokerEntryMetadata.writerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
+ + brokerEntryMetadataSize);
+ ByteBufCodedInputStream brokerEntryMetadataInputStream =
+
ByteBufCodedInputStream.get(headerAndPayloadWithBrokerEntryMetadata);
+ PulsarApi.BrokerEntryMetadata.Builder builder =
PulsarApi.BrokerEntryMetadata.newBuilder();
+ try {
+ builder.mergeFrom(brokerEntryMetadataInputStream,
null).build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
Review comment:
Caller of this method is
`MessageImpl.deserializeBrokerEntryMetaDataFirst` and all callers of
`MessageImpl.deserializeBrokerEntryMetaDataFirst` has a `try-catch` block
already.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]