sijie closed pull request #1361: Add compactedOut flag for batched messages URL: https://github.com/apache/incubator-pulsar/pull/1361
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java new file mode 100644 index 000000000..af5e80401 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +import java.io.IOException; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; + +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class CompactedOutBatchMessageTest extends ProducerConsumerBase { + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testCompactedOutMessages() throws Exception { + final String ns1 = "my-property/use/con-ns1"; + admin.namespaces().createNamespace(ns1); + final String topic1 = "persistent://" + ns1 + "/my-topic"; + + MessageMetadata metadata = MessageMetadata.newBuilder().setProducerName("foobar") + .setSequenceId(1).setPublishTime(1).setNumMessagesInBatch(3).build(); + + // build a buffer with 4 messages, first and last compacted out + ByteBuf batchBuffer = Unpooled.buffer(1000); + Commands.serializeSingleMessageInBatchWithPayload( + SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key1"), + Unpooled.EMPTY_BUFFER, batchBuffer); + Commands.serializeSingleMessageInBatchWithPayload( + SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key2"), + Unpooled.EMPTY_BUFFER, batchBuffer); + Commands.serializeSingleMessageInBatchWithPayload( + SingleMessageMetadata.newBuilder().setCompactedOut(false).setPartitionKey("key3"), + Unpooled.EMPTY_BUFFER, batchBuffer); + Commands.serializeSingleMessageInBatchWithPayload( + SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key4"), + Unpooled.EMPTY_BUFFER, batchBuffer); + + try (ConsumerImpl<byte[]> consumer + = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1) + .subscriptionName("my-subscriber-name").subscribe()) { + // shove it in the sideways + consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer, + MessageIdData.newBuilder().setLedgerId(1234) + .setEntryId(567).build(), consumer.cnx()); + Message m = consumer.receive(); + assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 1234); + assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 567); + assertEquals(((BatchMessageIdImpl)m.getMessageId()).getBatchIndex(), 2); + assertEquals(m.getKey(), "key3"); + + assertEquals(consumer.numMessagesInQueue(), 0); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index ab812a0b3..68091ee73 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -947,6 +947,14 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc ++skippedMessages; continue; } + if (singleMessageMetadataBuilder.getCompactedOut()) { + // message has been compacted out, so don't send to the user + singleMessagePayload.release(); + singleMessageMetadataBuilder.recycle(); + + ++skippedMessages; + continue; + } BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex(), i); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index b51d29ea0..f3b1f25a1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -830,24 +830,12 @@ public static long initBatchMessageMetadata(PulsarApi.MessageMetadata.Builder me return builder.getSequenceId(); } - public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder msgBuilder, + public static ByteBuf serializeSingleMessageInBatchWithPayload( + PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder, ByteBuf payload, ByteBuf batchBuffer) { - - // build single message meta-data - PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata - .newBuilder(); - if (msgBuilder.hasPartitionKey()) { - singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey()); - } - if (!msgBuilder.getPropertiesList().isEmpty()) { - singleMessageMetadataBuilder = singleMessageMetadataBuilder - .addAllProperties(msgBuilder.getPropertiesList()); - } int payLoadSize = payload.readableBytes(); PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.setPayloadSize(payLoadSize) .build(); - singleMessageMetadataBuilder.recycle(); - // serialize meta-data size, meta-data and payload for single message in batch int singleMsgMetadataSize = singleMessageMetadata.getSerializedSize(); try { @@ -862,6 +850,26 @@ public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.Message return batchBuffer.writeBytes(payload); } + public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder msgBuilder, + ByteBuf payload, ByteBuf batchBuffer) { + + // build single message meta-data + PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata + .newBuilder(); + if (msgBuilder.hasPartitionKey()) { + singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey()); + } + if (!msgBuilder.getPropertiesList().isEmpty()) { + singleMessageMetadataBuilder = singleMessageMetadataBuilder + .addAllProperties(msgBuilder.getPropertiesList()); + } + try { + return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer); + } finally { + singleMessageMetadataBuilder.recycle(); + } + } + public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf uncompressedPayload, PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder, int index, int batchSize) throws IOException { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 0a362b307..4776ee1f8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -4623,6 +4623,10 @@ public Builder clearSchemaVersion() { // required int32 payload_size = 3; boolean hasPayloadSize(); int getPayloadSize(); + + // optional bool compacted_out = 4 [default = false]; + boolean hasCompactedOut(); + boolean getCompactedOut(); } public static final class SingleMessageMetadata extends com.google.protobuf.GeneratedMessageLite @@ -4724,10 +4728,21 @@ public int getPayloadSize() { return payloadSize_; } + // optional bool compacted_out = 4 [default = false]; + public static final int COMPACTED_OUT_FIELD_NUMBER = 4; + private boolean compactedOut_; + public boolean hasCompactedOut() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public boolean getCompactedOut() { + return compactedOut_; + } + private void initFields() { properties_ = java.util.Collections.emptyList(); partitionKey_ = ""; payloadSize_ = 0; + compactedOut_ = false; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4765,6 +4780,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeInt32(3, payloadSize_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBool(4, compactedOut_); + } } private int memoizedSerializedSize = -1; @@ -4785,6 +4803,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt32Size(3, payloadSize_); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(4, compactedOut_); + } memoizedSerializedSize = size; return size; } @@ -4904,6 +4926,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000002); payloadSize_ = 0; bitField0_ = (bitField0_ & ~0x00000004); + compactedOut_ = false; + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -4950,6 +4974,10 @@ public Builder clone() { to_bitField0_ |= 0x00000002; } result.payloadSize_ = payloadSize_; + if (((from_bitField0_ & 0x00000008) == 0x00000008)) { + to_bitField0_ |= 0x00000004; + } + result.compactedOut_ = compactedOut_; result.bitField0_ = to_bitField0_; return result; } @@ -4972,6 +5000,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.SingleMess if (other.hasPayloadSize()) { setPayloadSize(other.getPayloadSize()); } + if (other.hasCompactedOut()) { + setCompactedOut(other.getCompactedOut()); + } return this; } @@ -5027,6 +5058,11 @@ public Builder mergeFrom( payloadSize_ = input.readInt32(); break; } + case 32: { + bitField0_ |= 0x00000008; + compactedOut_ = input.readBool(); + break; + } } } } @@ -5179,6 +5215,27 @@ public Builder clearPayloadSize() { return this; } + // optional bool compacted_out = 4 [default = false]; + private boolean compactedOut_ ; + public boolean hasCompactedOut() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + public boolean getCompactedOut() { + return compactedOut_; + } + public Builder setCompactedOut(boolean value) { + bitField0_ |= 0x00000008; + compactedOut_ = value; + + return this; + } + public Builder clearCompactedOut() { + bitField0_ = (bitField0_ & ~0x00000008); + compactedOut_ = false; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 00f840be2..0c0a50135 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -104,6 +104,7 @@ message SingleMessageMetadata { repeated KeyValue properties = 1; optional string partition_key = 2; required int32 payload_size = 3; + optional bool compacted_out = 4 [default = false]; } enum ServerError { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
