This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1bcc032d0b Enable key value byte stitching in PulsarMessageBatch
(#8897)
1bcc032d0b is described below
commit 1bcc032d0b03215e08d5200bd1974af3d00d6476
Author: icefury71 <[email protected]>
AuthorDate: Thu Jul 7 03:49:10 2022 -0700
Enable key value byte stitching in PulsarMessageBatch (#8897)
* Adding ability to stitch key and value bytes when retrieving single
message in PulsarMessageBatch
* Adding unit test for key value stitching in PulsarMessageBatch class
* Fixing checkstyle errors
* Bug fix in clearing byte buffer
* Addressing review comments
---
.../pinot/plugin/stream/pulsar/PulsarConfig.java | 14 +-
.../plugin/stream/pulsar/PulsarMessageBatch.java | 47 ++++-
.../pulsar/PulsarPartitionLevelConsumer.java | 14 +-
.../stream/pulsar/PulsarMessageBatchTest.java | 199 +++++++++++++++++++++
4 files changed, 265 insertions(+), 9 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
index 78db1b766d..c0f210cbbe 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarConfig.java
@@ -36,6 +36,7 @@ public class PulsarConfig {
public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
public static final String AUTHENTICATION_TOKEN = "authenticationToken";
public static final String TLS_TRUST_CERTS_FILE_PATH =
"tlsTrustCertsFilePath";
+ public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";
private String _pulsarTopicName;
private String _subscriberId;
@@ -44,6 +45,7 @@ public class PulsarConfig {
private SubscriptionInitialPosition _subscriptionInitialPosition;
private String _authenticationToken;
private String _tlsTrustCertsFilePath;
+ private boolean _enableKeyValueStitch;
public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
@@ -56,9 +58,13 @@ public class PulsarConfig {
_authenticationToken = streamConfigMap.get(authenticationTokenKey);
String tlsTrustCertsFilePathKey = StreamConfigProperties.
- constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
+ constructStreamProperty(STREAM_TYPE, TLS_TRUST_CERTS_FILE_PATH);
_tlsTrustCertsFilePath = streamConfigMap.get(tlsTrustCertsFilePathKey);
+ String enableKeyValueStitchKey = StreamConfigProperties.
+ constructStreamProperty(STREAM_TYPE, ENABLE_KEY_VALUE_STITCH);
+ _enableKeyValueStitch =
Boolean.parseBoolean(streamConfigMap.get(enableKeyValueStitchKey));
+
Preconditions.checkNotNull(_bootstrapServers, "No brokers provided in the
config");
OffsetCriteria offsetCriteria = streamConfig.getOffsetCriteria();
@@ -84,7 +90,7 @@ public class PulsarConfig {
}
public SubscriptionInitialPosition getInitialSubscriberPosition() {
- return _subscriptionInitialPosition;
+ return _subscriptionInitialPosition;
}
public String getAuthenticationToken() {
@@ -94,4 +100,8 @@ public class PulsarConfig {
public String getTlsTrustCertsFilePath() {
return _tlsTrustCertsFilePath;
}
+
+  public boolean getEnableKeyValueStitch() {
+    return this._enableKeyValueStitch;
+  }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
index 514773719f..9655990721 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatch.java
@@ -21,6 +21,7 @@ package org.apache.pinot.plugin.stream.pulsar;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pulsar.client.api.Message;
@@ -28,17 +29,26 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link MessageBatch} for collecting messages from pulsar topic
+ *
+ * When 'enableKeyValueStitch' flag is enabled, existing {@link
org.apache.pinot.spi.stream.StreamMessageDecoder}
+ * plugins will not work. A custom decoder will be needed to unpack key and
value byte arrays and decode
+ * them independently.
*/
public class PulsarMessageBatch implements MessageBatch<byte[]> {
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarMessageBatch.class);
private List<Message<byte[]>> _messageList = new ArrayList<>();
+ private static ByteBuffer _lengthBuf = ByteBuffer.allocate(4);
+ private final boolean _enableKeyValueStitch;
- public PulsarMessageBatch(Iterable<Message<byte[]>> iterable) {
+ public PulsarMessageBatch(Iterable<Message<byte[]>> iterable, boolean
enableKeyValueStitch) {
iterable.forEach(_messageList::add);
+ _enableKeyValueStitch = enableKeyValueStitch;
}
@Override
@@ -48,7 +58,11 @@ public class PulsarMessageBatch implements
MessageBatch<byte[]> {
@Override
public byte[] getMessageAtIndex(int index) {
- return _messageList.get(index).getData();
+ Message<byte[]> msg = _messageList.get(index);
+ if (_enableKeyValueStitch) {
+ return stitchKeyValue(msg.getKeyBytes(), msg.getData());
+ }
+ return msg.getData();
}
@Override
@@ -58,6 +72,10 @@ public class PulsarMessageBatch implements
MessageBatch<byte[]> {
@Override
public int getMessageLengthAtIndex(int index) {
+    if (_enableKeyValueStitch) { // stitched layout: two 4-byte length prefixes + key + value
+      byte[] keyBytes = _messageList.get(index).getKeyBytes(); // null-checked: keyless messages may yield null
+      return 8 + (keyBytes == null ? 0 : keyBytes.length) + _messageList.get(index).getData().length;
+    }
return _messageList.get(index).getData().length;
}
@@ -105,4 +123,27 @@ public class PulsarMessageBatch implements
MessageBatch<byte[]> {
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException("Pulsar does not support long
stream offsets");
}
+
+  /**
+   * Stitches key and value bytes into a single array with the layout
+   * [4-byte key length][key bytes][4-byte value length][value bytes].
+   * Lengths use ByteBuffer's default big-endian order, so a reader can unpack
+   * them with ByteBuffer.getInt(). A null key is written as a zero-length key.
+   *
+   * <p>A fresh buffer is allocated per call; no state is shared between calls or threads.
+   *
+   * @param keyBytes message key bytes, possibly null
+   * @param valueBytes message payload bytes
+   * @return the stitched bytes, never null
+   */
+  private byte[] stitchKeyValue(byte[] keyBytes, byte[] valueBytes) {
+    byte[] key = keyBytes == null ? new byte[0] : keyBytes;
+    ByteBuffer buffer = ByteBuffer.allocate(8 + key.length + valueBytes.length);
+    buffer.putInt(key.length);
+    buffer.put(key);
+    buffer.putInt(valueBytes.length);
+    buffer.put(valueBytes);
+    // Sized exactly, so the backing array is the complete result.
+    return buffer.array();
+  }
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
index 548f90beea..25a7d298bb 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java
@@ -46,11 +46,13 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
implements PartitionGroupConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
private final ExecutorService _executorService;
+ private boolean _enableKeyValueStitch = false;
public PulsarPartitionLevelConsumer(String clientId, StreamConfig
streamConfig,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
super(clientId, streamConfig,
partitionGroupConsumptionStatus.getPartitionGroupId());
_executorService = Executors.newSingleThreadExecutor();
+ _enableKeyValueStitch = _config.getEnableKeyValueStitch();
}
/**
@@ -76,10 +78,12 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
// The fetchMessages has thrown an exception. Most common cause is the
timeout.
// We return the records fetched till now along with the next start
offset.
pulsarResultFuture.cancel(true);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId));
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId),
+ _enableKeyValueStitch);
} catch (Exception e) {
LOGGER.warn("Error while fetching records from Pulsar", e);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId));
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId),
+ _enableKeyValueStitch);
}
}
@@ -103,10 +107,12 @@ public class PulsarPartitionLevelConsumer extends
PulsarPartitionLevelConnection
}
}
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId));
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId),
+ _enableKeyValueStitch);
} catch (PulsarClientException e) {
LOGGER.warn("Error consuming records from Pulsar topic", e);
- return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId));
+ return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList,
startMessageId, endMessageId),
+ _enableKeyValueStitch);
}
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
new file mode 100644
index 0000000000..15cef93ee3
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarMessageBatchTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.pulsar;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.EncryptionContext;
+import org.junit.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PulsarMessageBatchTest {
+  private final Random _random = new Random();
+  private DummyPulsarMessage _msgWithKeyAndValue;
+  private byte[] _expectedValueBytes;
+  private byte[] _expectedKeyBytes;
+  private List<Message<byte[]>> _messageList;
+
+  /** Minimal {@link Message} stub; byte keys are reported Base64-encoded, as Pulsar does. */
+  static class DummyPulsarMessage implements Message<byte[]> {
+    private final byte[] _keyData;
+    private final byte[] _valueData;
+
+    public DummyPulsarMessage(byte[] key, byte[] value) {
+      _keyData = key;
+      _valueData = value;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+      return null;
+    }
+
+    @Override
+    public boolean hasProperty(String name) {
+      return false;
+    }
+
+    @Override
+    public String getProperty(String name) {
+      return null;
+    }
+
+    @Override
+    public byte[] getData() {
+      return _valueData;
+    }
+
+    @Override
+    public byte[] getValue() {
+      return _valueData;
+    }
+
+    @Override
+    public MessageId getMessageId() {
+      return null;
+    }
+
+    @Override
+    public long getPublishTime() {
+      return 0;
+    }
+
+    @Override
+    public long getEventTime() {
+      return 0;
+    }
+
+    @Override
+    public long getSequenceId() {
+      return 0;
+    }
+
+    @Override
+    public String getProducerName() {
+      return null;
+    }
+
+    @Override
+    public boolean hasKey() {
+      return _keyData != null;
+    }
+
+    @Override
+    public String getKey() {
+      return _keyData == null ? null : java.util.Base64.getEncoder().encodeToString(_keyData);
+    }
+
+    @Override
+    public boolean hasBase64EncodedKey() {
+      return _keyData != null;
+    }
+
+    @Override
+    public byte[] getKeyBytes() {
+      return _keyData;
+    }
+
+    @Override
+    public boolean hasOrderingKey() {
+      return false;
+    }
+
+    @Override
+    public byte[] getOrderingKey() {
+      return new byte[0];
+    }
+
+    @Override
+    public String getTopicName() {
+      return null;
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+      return Optional.empty();
+    }
+
+    @Override
+    public int getRedeliveryCount() {
+      return 0;
+    }
+
+    @Override
+    public byte[] getSchemaVersion() {
+      return new byte[0];
+    }
+
+    @Override
+    public boolean isReplicated() {
+      return false;
+    }
+
+    @Override
+    public String getReplicatedFrom() {
+      return null;
+    }
+  }
+
+  @BeforeClass
+  public void setup() {
+    _expectedValueBytes = new byte[10];
+    _expectedKeyBytes = new byte[10];
+    _random.nextBytes(_expectedValueBytes);
+    _random.nextBytes(_expectedKeyBytes);
+    _msgWithKeyAndValue = new DummyPulsarMessage(_expectedKeyBytes, _expectedValueBytes);
+    _messageList = new ArrayList<>();
+    _messageList.add(_msgWithKeyAndValue);
+  }
+
+  @Test
+  public void testMessageBatchNoStitching() {
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, false);
+    byte[] valueBytes = messageBatch.getMessageAtIndex(0);
+    Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
+    Assert.assertEquals(_expectedValueBytes.length, messageBatch.getMessageLengthAtIndex(0));
+  }
+
+  @Test
+  public void testMessageBatchWithStitching() {
+    PulsarMessageBatch messageBatch = new PulsarMessageBatch(_messageList, true);
+    byte[] keyValueBytes = messageBatch.getMessageAtIndex(0);
+    Assert.assertEquals(8 + _expectedKeyBytes.length + _expectedValueBytes.length, keyValueBytes.length);
+    Assert.assertEquals(keyValueBytes.length, messageBatch.getMessageLengthAtIndex(0));
+    ByteBuffer byteBuffer = ByteBuffer.wrap(keyValueBytes);
+    int keyLength = byteBuffer.getInt();
+    byte[] keyBytes = new byte[keyLength];
+    byteBuffer.get(keyBytes);
+    Assert.assertArrayEquals(_expectedKeyBytes, keyBytes);
+    int valueLength = byteBuffer.getInt();
+    byte[] valueBytes = new byte[valueLength];
+    byteBuffer.get(valueBytes);
+    Assert.assertArrayEquals(_expectedValueBytes, valueBytes);
+    Assert.assertFalse(byteBuffer.hasRemaining());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]