eolivelli commented on code in PR #16605:
URL: https://github.com/apache/pulsar/pull/16605#discussion_r930974415
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java:
##########
@@ -1096,7 +1096,6 @@ public void testHasMessageAvailableWithBatch() throws
Exception {
ReaderImpl<byte[]> reader =
(ReaderImpl<byte[]>)pulsarClient.newReader().topic(topicName)
.startMessageId(messageId).startMessageIdInclusive().create();
MessageIdImpl lastMsgId = (MessageIdImpl)
reader.getConsumer().getLastMessageId();
- assertTrue(messageId instanceof BatchMessageIdImpl);
Review Comment:
this test becomes wrong, the name is `testHasMessageAvailableWithBatch`
we must rework the test in a way that we are producing batch messages
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -197,6 +197,26 @@ public boolean isMultiBatches() {
@Override
public OpSendMsg createOpSendMsg() throws IOException {
+ if (messages.size() == 1) {
+ MessageImpl<?> msg = messages.get(0);
+ messageMetadata = msg.getMessageBuilder();
+ ByteBuf encryptedPayload = producer.encryptMessage(
+ messageMetadata,
+ producer.applyCompression(msg.getDataBuffer()));
+ if (encryptedPayload.readableBytes() >
ClientCnx.getMaxMessageSize()) {
+ discard(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ return null;
+ }
+ ByteBufPair cmd = producer.sendMessage(producer.producerId,
messageMetadata.getSequenceId(),
+ 1, messageMetadata, encryptedPayload);
+ final OpSendMsg op;
+ op = OpSendMsg.create(msg, cmd, messageMetadata.getSequenceId(),
firstCallback);
+ op.setNumMessagesInBatch(1);
Review Comment:
I don't think we have to set this header
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java:
##########
@@ -197,6 +197,26 @@ public boolean isMultiBatches() {
@Override
public OpSendMsg createOpSendMsg() throws IOException {
+ if (messages.size() == 1) {
+ MessageImpl<?> msg = messages.get(0);
+ messageMetadata = msg.getMessageBuilder();
+ ByteBuf encryptedPayload = producer.encryptMessage(
+ messageMetadata,
+ producer.applyCompression(msg.getDataBuffer()));
+ if (encryptedPayload.readableBytes() >
ClientCnx.getMaxMessageSize()) {
+ discard(new PulsarClientException.InvalidMessageException(
+ "Message size is bigger than " +
ClientCnx.getMaxMessageSize() + " bytes"));
+ return null;
+ }
+ ByteBufPair cmd = producer.sendMessage(producer.producerId,
messageMetadata.getSequenceId(),
+ 1, messageMetadata, encryptedPayload);
+ final OpSendMsg op;
+ op = OpSendMsg.create(msg, cmd, messageMetadata.getSequenceId(),
firstCallback);
+ op.setNumMessagesInBatch(1);
+ op.setBatchSizeByte(encryptedPayload.readableBytes());
Review Comment:
I don't think we have to set this header
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java:
##########
@@ -237,9 +237,8 @@ public void testBatchMessage() throws Exception {
Assert.assertTrue(entryMetadata.getBrokerTimestamp() >= sendTime);
Assert.assertEquals(entryMetadata.getIndex(), 0);
System.out.println(message.getProperties());
-
Assert.assertEquals(Integer.parseInt(message.getProperty(BATCH_HEADER)), 1);
Review Comment:
it looks like we are changing the behaviour of the test,
maybe in the test we have to send 2 messages in order to trigger the
creation of a batch message
##########
pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java:
##########
@@ -261,7 +261,6 @@ public void testDisableBatching() throws Exception {
Assert.assertNotNull(msg);
if (i < numberOfMessages) {
Assert.assertEquals(new String(msg.getData()), "batched");
- Assert.assertTrue(msg.getMessageId() instanceof
BatchMessageIdImpl);
Review Comment:
this test becomes wrong, the name is `testDisableBatching `
we must rework the test in a way that we are producing batch messages
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -425,7 +425,7 @@ CompletableFuture<MessageId>
internalSendWithTxnAsync(Message<?> message, Transa
* @param payload
* @return a new payload
*/
- private ByteBuf applyCompression(ByteBuf payload) {
+ protected ByteBuf applyCompression(ByteBuf payload) {
Review Comment:
instead of opening the visibility of this method, maybe it is better to put
here in ProducerImpl a method that creates the OpSendMsg.
I am not sure it is the better way, but maybe you take try to see if the
code will look better
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]