This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bcc032d0b Enable key value byte stitching in PulsarMessageBatch (#8897)
1bcc032d0b is described below

commit 1bcc032d0b03215e08d5200bd1974af3d00d6476
Author: icefury71 <[email protected]>
AuthorDate: Thu Jul 7 03:49:10 2022 -0700

    Enable key value byte stitching in PulsarMessageBatch (#8897)
    
    * Adding ability to stitch key and value bytes when retrieving single message in PulsarMessageBatch
    
    * Adding unit test for key value stitching in PulsarMessageBatch class
    
    * Fixing checkstyle errors
    
    * Bug fix in clearing byte buffer
    
    * Addressing review comments
---
 .../pinot/plugin/stream/pulsar/PulsarConfig.java   |  14 +-
 .../plugin/stream/pulsar/PulsarMessageBatch.java   |  47 ++++-
 .../pulsar/PulsarPartitionLevelConsumer.java       |  14 +-
 .../stream/pulsar/PulsarMessageBatchTest.java      | 199 +++++++++++++++++++++
 4 files changed, 265 insertions(+), 9 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 78db1b766d..c0f210cbbe 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -36,6 +36,7 @@ public class PulsarConfig {
   public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
   public static final String AUTHENTICATION_TOKEN = "authenticationToken";
   public static final String TLS_TRUST_CERTS_FILE_PATH = 
"tlsTrustCertsFilePath";
+  public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
 
   private String _pulsarTopicName;
   private String _subscriberId;
@@ -44,6 +45,7 @@ public class PulsarConfig {
   private SubscriptionInitialPosition _subscriptionInitialPosition;
   private String _authenticationToken;
   private String _tlsTrustCertsFilePath;
+  private boolean _enableKeyValueStitch;
 
   public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
     Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
@@ -56,9 +58,13 @@ public class PulsarConfig {
     _authenticationToken = streamConfigMap.get(authenticationTokenKey);
 
     String tlsTrustCertsFilePathKey = StreamConfigProperties.
-            constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
+        constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
     _tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
 
+    String enableKeyValueStitchKey = StreamConfigProperties.
+        constructStreamProperty(STREAM_TYPE, ENABLE_KEY_VALUE_STITCH);
+    _enableKeyValueStitch = 
Boolean.parseBoolean(streamConfigMap.get(enableKeyValueStitchKey));
+
     Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the 
config");
 
     OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
@@ -84,7 +90,7 @@ public class PulsarConfig {
   }
 
   public SubscriptionInitialPosition getInitialSubscriberPosition() {
-   return _subscriptionInitialPosition;
+    return _subscriptionInitialPosition;
   }
 
   public String getAuthenticationToken() {
@@ -94,4 +100,8 @@ public class PulsarConfig {
   public String getTlsTrustCertsFilePath() {
     return _tlsTrustCertsFilePath;
   }
+
+  public boolean getEnableKeyValueStitch() {
+    return _enableKeyValueStitch;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
index 514773719f..9655990721 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
@@ -21,6 +21,7 @@ package org.apache.pinot.plugin.stream.pulsar;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pulsar.client.api.Message;
@@ -28,17 +29,26 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * A {@link MessageBatch} for collecting messages from pulsar topic
+ *
+ * When 'enableKeyValueStitch' flag is enabled, existing {@link org.apache.pinot.spi.stream.StreamMessageDecoder}
+ * plugins will not work. A custom decoder will be needed to unpack key and value byte arrays and decode
+ * them independently.
  */
 public class PulsarMessageBatch implements MessageBatch<byte[]> {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarMessageBatch.class);
   private List<Message<byte[]>> _messageList = new ArrayList<>();
+  private static ByteBuffer _lengthBuf = ByteBuffer.allocate(4);
+  private final boolean _enableKeyValueStitch;
 
-  public PulsarMessageBatch(Iterable<Message<byte[]>> iterable) {
+  public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean 
enableKeyValueStitch) {
     iterable.forEach(_messageList::add);
+    _enableKeyValueStitch = enableKeyValueStitch;
   }
 
   @Override
@@ -48,7 +58,11 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
 
   @Override
   public byte[] getMessageAtIndex(int index) {
-    return _messageList.get(index).getData();
+    Message<byte[]> msg = _messageList.get(index);
+    if (_enableKeyValueStitch) {
+      return stitchKeyValue(msg.getKeyBytes(), msg.getData());
+    }
+    return msg.getData();
   }
 
   @Override
@@ -58,6 +72,10 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
 
   @Override
   public int getMessageLengthAtIndex(int index) {
+    if (_enableKeyValueStitch) {
+      Message<byte[]> msg = _messageList.get(index);
+      return 8 + msg.getKeyBytes().length + msg.getData().length;
+    }
     return _messageList.get(index).getData().length;
   }
 
@@ -105,4 +123,27 @@ public class PulsarMessageBatch implements MessageBatch<byte[]> {
   public long getNextStreamMessageOffsetAtIndex(int index) {
     throw new UnsupportedOperationException("Pulsar does not support long 
stream offsets");
   }
+
+  /**
+   * Stitch key and value bytes together using a simple format:
+   * 4 bytes for key length + key bytes + 4 bytes for value length + value bytes
+   */
+  private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    int keyLen = keyBytes.length;
+    int valueLen = valueBytes.length;
+    int totalByteArrayLength = 8 + keyLen + valueLen;
+    try {
+      ByteArrayOutputStream bos = new 
ByteArrayOutputStream(totalByteArrayLength);
+      _lengthBuf.clear();
+      bos.write(_lengthBuf.putInt(keyLen).array());
+      bos.write(keyBytes);
+      _lengthBuf.clear();
+      bos.write(_lengthBuf.putInt(valueLen).array());
+      bos.write(valueBytes);
+      return bos.toByteArray();
+    } catch (Exception e) {
+      LOGGER.error("Unable to stitch key and value bytes together", e);
+    }
+    return null;
+  }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
index 548f90beea..25a7d298bb 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
@@ -46,11 +46,13 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
     implements PartitionGroupConsumer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
   private final ExecutorService _executorService;
+  private boolean _enableKeyValueStitch = false;
 
   public PulsarPartitionLevelConsumer(String clientId, StreamConfig 
streamConfig,
       PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
     super(clientId, streamConfig, 
partitionGroupConsumptionStatus.getPartitionGroupId());
     _executorService = Executors.newSingleThreadExecutor();
+    _enableKeyValueStitch = _config.getEnableKeyValueStitch();
   }
 
   /**
@@ -76,10 +78,12 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
       // The fetchMessages has thrown an exception. Most common cause is the 
timeout.
       // We return the records fetched till now along with the next start 
offset.
       pulsarResultFuture.cancel(true);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId));
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
+          _enableKeyValueStitch);
     } catch (Exception e) {
       LOGGER.warn("Error while fetching records from Pulsar", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId));
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
+          _enableKeyValueStitch);
     }
   }
 
@@ -103,10 +107,12 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
         }
       }
 
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId));
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
+          _enableKeyValueStitch);
     } catch (PulsarClientException e) {
       LOGGER.warn("Error consuming records from Pulsar topic", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId));
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
+          _enableKeyValueStitch);
     }
   }
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
new file mode 100644
index 0000000000..15cef93ee3
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.junit.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PulsarMessageBatchTest {
+  private Random _random = new Random();
+  private DummyPulsarMessage _msgWithKeyAndValue;
+  private byte[] _expectedValueBytes;
+  private byte[] _expectedKeyBytes;
+  private List<Message<byte[]>> _messageList;
+
+  class DummyPulsarMessage implements Message<byte[]> {
+    private final byte[] _keyData;
+    private final byte[] _valueData;
+
+    public DummyPulsarMessage(byte[] key, byte[] value) {
+      _keyData = key;
+      _valueData = value;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      return null;
+    }
+
+    @Override
+    public boolean hasProperty(String name) {
+      return false;
+    }
+
+    @Override
+    public String getProperty(String name) {
+      return null;
+    }
+
+    @Override
+    public byte[] getData() {
+      return _valueData;
+    }
+
+    @Override
+    public byte[] getValue() {
+      return _valueData;
+    }
+
+    @Override
+    public MessageId getMessageId() {
+      return null;
+    }
+
+    @Override
+    public long getPublishTime() {
+      return 0;
+    }
+
+    @Override
+    public long getEventTime() {
+      return 0;
+    }
+
+    @Override
+    public long getSequenceId() {
+      return 0;
+    }
+
+    @Override
+    public String getProducerName() {
+      return null;
+    }
+
+    @Override
+    public boolean hasKey() {
+      return _keyData == null ? false : true;
+    }
+
+    @Override
+    public String getKey() {
+      return _keyData.toString();
+    }
+
+    @Override
+    public boolean hasBase64EncodedKey() {
+      return false;
+    }
+
+    @Override
+    public byte[] getKeyBytes() {
+      return _keyData;
+    }
+
+    @Override
+    public boolean hasOrderingKey() {
+      return false;
+    }
+
+    @Override
+    public byte[] getOrderingKey() {
+      return new byte[0];
+    }
+
+    @Override
+    public String getTopicName() {
+      return null;
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+      return Optional.empty();
+    }
+
+    @Override
+    public int getRedeliveryCount() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getSchemaVersion() {
+      return new byte[0];
+    }
+
+    @Override
+    public boolean isReplicated() {
+      return false;
+    }
+
+    @Override
+    public String getReplicatedFrom() {
+      return null;
+    }
+  }
+
+  @BeforeClass
+  public void setup() {
+    _expectedValueBytes = new byte[10];
+    _expectedKeyBytes = new byte[10];
+    _random.nextBytes(_expectedValueBytes);
+    _random.nextBytes(_expectedKeyBytes);
+    _msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes, 
_expectedValueBytes);
+    _messageList = new ArrayList<>();
+    _messageList.add(_msgWithKeyAndValue);
+  }
+
+  @Test
+  public void testMessageBatchNoStitching() {
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, 
false);
+    byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+    Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
+  }
+
+  @Test
+  public void testMessageBatchWithStitching() {
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, 
true);
+    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+    Assert.assertEquals(keyValueBytes.length, 8 + _expectedKeyBytes.length + 
_expectedValueBytes.length);
+    try {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
+      int keyLength = byteBuffer.getInt();
+      byte[] keyBytes = new byte[keyLength];
+      byteBuffer.get(keyBytes);
+      Assert.assertArrayEquals(_expectedKeyBytes, keyBytes);
+      int valueLength = byteBuffer.getInt();
+      byte[] valueBytes = new byte[valueLength];
+      byteBuffer.get(valueBytes);
+      Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
+    } catch (Exception e) {
+      Assert.fail("Could not parse key and value bytes because of exception: " 
+ e.getMessage());
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to