merlimat commented on a change in pull request #10184:
URL: https://github.com/apache/pulsar/pull/10184#discussion_r611078422
##########
File path:
pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
##########
@@ -122,7 +123,9 @@
@Parameter(names = { "-st", "--schema-type"}, description = "Set a schema
type on the consumer, it can be 'bytes' or 'auto_consume'")
private String schematype = "bytes";
-
+ @Parameter(names = { "-pm", "--pool-messages" }, description = "Use the
pooled message")
+ private boolean poolMessages = false;
Review comment:
I'd say to not have it configurable for the tool. Just enabled it always.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -320,6 +393,14 @@ public boolean publishedEarlierThan(long timestamp) {
}
}
+ @Override
+ public int size() {
+ if (msgMetadata.isNullValue()) {
+ return 0;
+ }
+ return poolMessage ? payload.readableBytes() : getData().length;
Review comment:
Shouldn't `payload.readableBytes()` work in both cases?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -93,93 +98,156 @@
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
- this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0);
+ this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0, false);
}
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount) {
- this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
- this.messageId = messageId;
- this.topic = topic;
- this.cnx = cnx;
- this.redeliveryCount = redeliveryCount;
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
+ boolean pooledMessage) {
+ this.msgMetadata = new MessageMetadata();
+ init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, redeliveryCount, pooledMessage);
+ }
- // Need to make a copy since the passed payload is using a ref-count
buffer that we don't know when could
- // release, since the Message is passed to the user. Also, the passed
ByteBuf is coming from network and is
- // backed by a direct buffer which we could not expose as a byte[]
- this.payload = Unpooled.copiedBuffer(payload);
- this.encryptionCtx = encryptionCtx;
+ public static <T> MessageImpl<T> create(String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean pooledMessage) {
+ if (pooledMessage) {
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+ init(msg, topic, messageId, msgMetadata, payload, encryptionCtx,
cnx, schema, redeliveryCount,
+ pooledMessage);
+ return msg;
+ } else {
+ return new MessageImpl<>(topic, messageId, msgMetadata, payload,
encryptionCtx, cnx, schema,
+ redeliveryCount, pooledMessage);
+ }
+ }
+
+ static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean pooledMessage) {
+ msg.msgMetadata.clear();
+ msg.msgMetadata.copyFrom(msgMetadata);
+ msg.messageId = messageId;
+ msg.topic = topic;
+ msg.cnx = cnx;
+ msg.redeliveryCount = redeliveryCount;
+
+ msg.poolMessage = pooledMessage;
+ if (pooledMessage) {
+ msg.payload = payload;
+ payload.retain();
Review comment:
```suggestion
msg.payload = payload.retain();
```
##########
File path:
pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
##########
@@ -167,6 +172,9 @@
@Parameter(names = {"--batch-index-ack" }, description = "Enable or
disable the batch index acknowledgment")
public boolean batchIndexAck = false;
+
+ @Parameter(names = { "-pm", "--pool-messages" }, description = "Use
the pooled message")
+ private boolean poolMessages = false;
Review comment:
Again, for tools, I'd keep it always enabled.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
##########
@@ -77,8 +77,15 @@ public ByteBuffer decode(byte[] data) {
@Override
public ByteBuffer decode(ByteBuf byteBuf) {
+ return decode(byteBuf, null);
+ }
+
+ @Override
+ public ByteBuffer decode(ByteBuf byteBuf, byte[] schemaVersion) {
if (null == byteBuf) {
return null;
+ } else if(byteBuf.isDirect()){
+ return byteBuf.nioBuffer();
Review comment:
We can return the NIO buffer even if it's on the heap
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Message.java
##########
@@ -66,6 +66,13 @@
*/
byte[] getData();
+ /**
+ * Get the message payload size in bytes.
+ *
Review comment:
We should specify wether it's compressed or uncompressed size.
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -310,6 +378,11 @@ public boolean publishedEarlierThan(long timestamp) {
if (msgMetadata.isNullValue()) {
return null;
}
+ if (poolMessage) {
Review comment:
Why do we need the special case here?
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
##########
@@ -93,93 +98,156 @@
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
- this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0);
+ this(topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, 0, false);
}
MessageImpl(String topic, MessageIdImpl messageId, MessageMetadata
msgMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount) {
- this.msgMetadata = new MessageMetadata().copyFrom(msgMetadata);
- this.messageId = messageId;
- this.topic = topic;
- this.cnx = cnx;
- this.redeliveryCount = redeliveryCount;
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
+ boolean pooledMessage) {
+ this.msgMetadata = new MessageMetadata();
+ init(this, topic, messageId, msgMetadata, payload, encryptionCtx, cnx,
schema, redeliveryCount, pooledMessage);
+ }
- // Need to make a copy since the passed payload is using a ref-count
buffer that we don't know when could
- // release, since the Message is passed to the user. Also, the passed
ByteBuf is coming from network and is
- // backed by a direct buffer which we could not expose as a byte[]
- this.payload = Unpooled.copiedBuffer(payload);
- this.encryptionCtx = encryptionCtx;
+ public static <T> MessageImpl<T> create(String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean pooledMessage) {
+ if (pooledMessage) {
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+ init(msg, topic, messageId, msgMetadata, payload, encryptionCtx,
cnx, schema, redeliveryCount,
+ pooledMessage);
+ return msg;
+ } else {
+ return new MessageImpl<>(topic, messageId, msgMetadata, payload,
encryptionCtx, cnx, schema,
+ redeliveryCount, pooledMessage);
+ }
+ }
+
+ static <T> void init(MessageImpl<T> msg, String topic, MessageIdImpl
messageId, MessageMetadata msgMetadata,
+ ByteBuf payload, Optional<EncryptionContext> encryptionCtx,
ClientCnx cnx, Schema<T> schema,
+ int redeliveryCount, boolean pooledMessage) {
+ msg.msgMetadata.clear();
+ msg.msgMetadata.copyFrom(msgMetadata);
+ msg.messageId = messageId;
+ msg.topic = topic;
+ msg.cnx = cnx;
+ msg.redeliveryCount = redeliveryCount;
+
+ msg.poolMessage = pooledMessage;
+ if (pooledMessage) {
+ msg.payload = payload;
+ payload.retain();
+ } else {
+ // Need to make a copy since the passed payload is using a
ref-count buffer that we don't know when could
+ // release, since the Message is passed to the user. Also, the
passed ByteBuf is coming from network and is
+ // backed by a direct buffer which we could not expose as a byte[]
+ msg.payload = Unpooled.copiedBuffer(payload);
+ }
+ msg.encryptionCtx = encryptionCtx;
if (msgMetadata.getPropertiesCount() > 0) {
- this.properties =
Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
- .collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue,
- (oldValue,newValue) -> newValue)));
+ msg.properties =
Collections.unmodifiableMap(msgMetadata.getPropertiesList().stream()
+ .collect(Collectors.toMap(KeyValue::getKey,
KeyValue::getValue, (oldValue, newValue) -> newValue)));
} else {
- properties = Collections.emptyMap();
+ msg.properties = Collections.emptyMap();
}
- this.schema = schema;
+ msg.schema = schema;
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl,
MessageMetadata msgMetadata,
- SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
- this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata,
payload, encryptionCtx, cnx, schema, 0);
+ SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx,
+ ClientCnx cnx, Schema<T> schema) {
+ this(topic, batchMessageIdImpl, msgMetadata, singleMessageMetadata,
payload, encryptionCtx, cnx, schema, 0,
+ false);
}
MessageImpl(String topic, BatchMessageIdImpl batchMessageIdImpl,
MessageMetadata batchMetadata,
- SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
- Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount) {
- this.msgMetadata = new MessageMetadata().copyFrom(batchMetadata);
- this.messageId = batchMessageIdImpl;
- this.topic = topic;
- this.cnx = cnx;
- this.redeliveryCount = redeliveryCount;
+ SingleMessageMetadata singleMessageMetadata, ByteBuf payload,
Optional<EncryptionContext> encryptionCtx,
+ ClientCnx cnx, Schema<T> schema, int redeliveryCount, boolean
keepMessageInDirectMemory) {
+ this.msgMetadata = new MessageMetadata();
+ init(this, topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload, encryptionCtx, cnx, schema,
+ redeliveryCount, keepMessageInDirectMemory);
+
+ }
+
+ public static <T> MessageImpl<T> create(String topic, BatchMessageIdImpl
batchMessageIdImpl,
+ MessageMetadata batchMetadata, SingleMessageMetadata
singleMessageMetadata, ByteBuf payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
+ boolean pooledMessage) {
+ if (pooledMessage) {
+ @SuppressWarnings("unchecked")
+ MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
+ init(msg, topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload, encryptionCtx, cnx,
+ schema, redeliveryCount, pooledMessage);
+ return msg;
+ } else {
+ return new MessageImpl<>(topic, batchMessageIdImpl, batchMetadata,
singleMessageMetadata, payload,
+ encryptionCtx, cnx, schema, redeliveryCount,
pooledMessage);
+ }
+ }
- this.payload = Unpooled.copiedBuffer(payload);
- this.encryptionCtx = encryptionCtx;
+ private static <T> void init(MessageImpl<T> msg, String topic,
BatchMessageIdImpl batchMessageIdImpl,
+ MessageMetadata batchMetadata, SingleMessageMetadata
singleMessageMetadata, ByteBuf payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema, int redeliveryCount,
+ boolean poolMessage) {
+ msg.msgMetadata.clear();
+ msg.msgMetadata.copyFrom(batchMetadata);
+ msg.messageId = batchMessageIdImpl;
+ msg.topic = topic;
+ msg.cnx = cnx;
+ msg.redeliveryCount = redeliveryCount;
+
+ msg.poolMessage = poolMessage;
Review comment:
This portion is being repeated from the other `init()` method. Is there
a way to reuse that?
##########
File path:
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
##########
@@ -120,6 +122,21 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
return decode(bytes);
}
+ /**
+ * Decode a ByteBuf into an object using a given version. <br/>
+ * <b>NOTE</b>: This method should not modify reader/writer index of
ByteBuf else it can cause corruption while
+ * accessing same ByteBuf for decoding and deserialization.
+ *
+ * @param byteBuf
+ * the byte array to decode
+ * @param schemaVersion
+ * the schema version to decode the object. null indicates
using latest version.
+ * @return the deserialized object
+ */
+ default T decode(ByteBuf bytes, byte[] schemaVersion) {
Review comment:
Since `Schema` is also part of public API, we can't use Netty `ByteBuf`.
Instead we can use `java.nio.ByteBuffer`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]