sijie closed pull request #1361: Add compactedOut flag for batched messages
URL: https://github.com/apache/incubator-pulsar/pull/1361
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
new file mode 100644
index 000000000..af5e80401
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testCompactedOutMessages() throws Exception {
+        final String ns1 = "my-property/use/con-ns1";
+        admin.namespaces().createNamespace(ns1);
+        final String topic1 = "persistent://" + ns1 + "/my-topic";
+
+        MessageMetadata metadata = 
MessageMetadata.newBuilder().setProducerName("foobar")
+            
.setSequenceId(1).setPublishTime(1).setNumMessagesInBatch(3).build();
+
+        // build a buffer with 4 messages, first and last compacted out
+        ByteBuf batchBuffer = Unpooled.buffer(1000);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                
SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key1"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                
SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key2"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                
SingleMessageMetadata.newBuilder().setCompactedOut(false).setPartitionKey("key3"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                
SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key4"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+
+        try (ConsumerImpl<byte[]> consumer
+             = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name").subscribe()) {
+            // shove it in the sideways
+            consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer,
+                                                        
MessageIdData.newBuilder().setLedgerId(1234)
+                                                        
.setEntryId(567).build(), consumer.cnx());
+            Message m = consumer.receive();
+            assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 
1234);
+            assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 
567);
+            
assertEquals(((BatchMessageIdImpl)m.getMessageId()).getBatchIndex(), 2);
+            assertEquals(m.getKey(), "key3");
+
+            assertEquals(consumer.numMessagesInQueue(), 0);
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index ab812a0b3..68091ee73 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -947,6 +947,14 @@ void receiveIndividualMessagesFromBatch(MessageMetadata 
msgMetadata, ByteBuf unc
                     ++skippedMessages;
                     continue;
                 }
+                if (singleMessageMetadataBuilder.getCompactedOut()) {
+                    // message has been compacted out, so don't send to the 
user
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
+
+                    ++skippedMessages;
+                    continue;
+                }
 
                 BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), getPartitionIndex(), i);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index b51d29ea0..f3b1f25a1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -830,24 +830,12 @@ public static long 
initBatchMessageMetadata(PulsarApi.MessageMetadata.Builder me
         return builder.getSequenceId();
     }
 
-    public static ByteBuf 
serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder 
msgBuilder,
+    public static ByteBuf serializeSingleMessageInBatchWithPayload(
+            PulsarApi.SingleMessageMetadata.Builder 
singleMessageMetadataBuilder,
             ByteBuf payload, ByteBuf batchBuffer) {
-
-        // build single message meta-data
-        PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
PulsarApi.SingleMessageMetadata
-                .newBuilder();
-        if (msgBuilder.hasPartitionKey()) {
-            singleMessageMetadataBuilder = 
singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey());
-        }
-        if (!msgBuilder.getPropertiesList().isEmpty()) {
-            singleMessageMetadataBuilder = singleMessageMetadataBuilder
-                    .addAllProperties(msgBuilder.getPropertiesList());
-        }
         int payLoadSize = payload.readableBytes();
         PulsarApi.SingleMessageMetadata singleMessageMetadata = 
singleMessageMetadataBuilder.setPayloadSize(payLoadSize)
                 .build();
-        singleMessageMetadataBuilder.recycle();
-
         // serialize meta-data size, meta-data and payload for single message 
in batch
         int singleMsgMetadataSize = singleMessageMetadata.getSerializedSize();
         try {
@@ -862,6 +850,26 @@ public static ByteBuf 
serializeSingleMessageInBatchWithPayload(PulsarApi.Message
         return batchBuffer.writeBytes(payload);
     }
 
+    public static ByteBuf 
serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder 
msgBuilder,
+            ByteBuf payload, ByteBuf batchBuffer) {
+
+        // build single message meta-data
+        PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = 
PulsarApi.SingleMessageMetadata
+                .newBuilder();
+        if (msgBuilder.hasPartitionKey()) {
+            singleMessageMetadataBuilder = 
singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey());
+        }
+        if (!msgBuilder.getPropertiesList().isEmpty()) {
+            singleMessageMetadataBuilder = singleMessageMetadataBuilder
+                    .addAllProperties(msgBuilder.getPropertiesList());
+        }
+        try {
+            return 
serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, 
batchBuffer);
+        } finally {
+            singleMessageMetadataBuilder.recycle();
+        }
+    }
+
     public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf 
uncompressedPayload,
             PulsarApi.SingleMessageMetadata.Builder 
singleMessageMetadataBuilder, int index, int batchSize)
             throws IOException {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 0a362b307..4776ee1f8 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4623,6 +4623,10 @@ public Builder clearSchemaVersion() {
     // required int32 payload_size = 3;
     boolean hasPayloadSize();
     int getPayloadSize();
+    
+    // optional bool compacted_out = 4 [default = false];
+    boolean hasCompactedOut();
+    boolean getCompactedOut();
   }
   public static final class SingleMessageMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -4724,10 +4728,21 @@ public int getPayloadSize() {
       return payloadSize_;
     }
     
+    // optional bool compacted_out = 4 [default = false];
+    public static final int COMPACTED_OUT_FIELD_NUMBER = 4;
+    private boolean compactedOut_;
+    public boolean hasCompactedOut() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public boolean getCompactedOut() {
+      return compactedOut_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
       payloadSize_ = 0;
+      compactedOut_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4765,6 +4780,9 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeInt32(3, payloadSize_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(4, compactedOut_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4785,6 +4803,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(3, payloadSize_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, compactedOut_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4904,6 +4926,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000002);
         payloadSize_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        compactedOut_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -4950,6 +4974,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000002;
         }
         result.payloadSize_ = payloadSize_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.compactedOut_ = compactedOut_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4972,6 +5000,9 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.SingleMess
         if (other.hasPayloadSize()) {
           setPayloadSize(other.getPayloadSize());
         }
+        if (other.hasCompactedOut()) {
+          setCompactedOut(other.getCompactedOut());
+        }
         return this;
       }
       
@@ -5027,6 +5058,11 @@ public Builder mergeFrom(
               payloadSize_ = input.readInt32();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              compactedOut_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -5179,6 +5215,27 @@ public Builder clearPayloadSize() {
         return this;
       }
       
+      // optional bool compacted_out = 4 [default = false];
+      private boolean compactedOut_ ;
+      public boolean hasCompactedOut() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public boolean getCompactedOut() {
+        return compactedOut_;
+      }
+      public Builder setCompactedOut(boolean value) {
+        bitField0_ |= 0x00000008;
+        compactedOut_ = value;
+        
+        return this;
+      }
+      public Builder clearCompactedOut() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        compactedOut_ = false;
+        
+        return this;
+      }
+      
       // 
@@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 00f840be2..0c0a50135 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -104,6 +104,7 @@ message SingleMessageMetadata {
        repeated KeyValue properties    = 1;
        optional string partition_key   = 2;
        required int32 payload_size     = 3;
+        optional bool compacted_out     = 4 [default = false];
 }
 
 enum ServerError {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to