This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 93b74b5 [Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
93b74b5 is described below
commit 93b74b5498ced9e75512c593e6e5a9f5a6c8f26b
Author: ran <[email protected]>
AuthorDate: Mon Dec 13 11:20:47 2021 +0800
[Pulsar SQL] Support query chunked messages feature in Pulsar SQL (#12720)
### Motivation
Currently, Pulsar SQL does not support querying chunked messages.
### Modifications
Add a chunked message map in `PulsarRecordCursor` to maintain incomplete chunked messages. Once all chunks of a message have been received, the reassembled message is offered to the message queue to await deserialization.
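
To illustrate the idea only (this is not the actual `PulsarRecordCursor` code, which works on `RawMessage` and Netty `ByteBuf` objects), a minimal self-contained sketch of the reassembly logic could look like the following; the `ChunkReassembler` class and its `accept` method are hypothetical names used for the example:

```java
import java.io.ByteArrayOutputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Minimal sketch of chunk reassembly: buffer chunks per UUID and hand the
 * complete payload downstream once the final chunk arrives.
 */
public class ChunkReassembler {

    private static final class ChunkContext {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int lastChunkId = -1;
    }

    // Incomplete chunked messages, keyed by the producer-assigned chunk UUID.
    private final Map<String, ChunkContext> pending = new ConcurrentHashMap<>();

    /** Returns the full payload when the final chunk arrives, otherwise null. */
    public byte[] accept(String uuid, int chunkId, int numChunks, byte[] chunkPayload) {
        if (numChunks <= 1) {
            return chunkPayload; // not a chunked message, pass through unchanged
        }
        ChunkContext ctx = pending.computeIfAbsent(uuid, key -> new ChunkContext());
        if (chunkId != ctx.lastChunkId + 1) {
            // A chunk was lost or arrived out of order (e.g. the first chunk
            // belonged to a different split): drop the partial context.
            pending.remove(uuid);
            return null;
        }
        ctx.buffer.write(chunkPayload, 0, chunkPayload.length);
        ctx.lastChunkId = chunkId;
        if (chunkId == numChunks - 1) {
            pending.remove(uuid);
            return ctx.buffer.toByteArray(); // complete message, ready for deserialization
        }
        return null; // still waiting for the remaining chunks
    }
}
```

In the actual change, `PulsarRecordCursor` also keeps reading past the split end position while `chunkedMessagesMap` is non-empty, so chunks that spill beyond the split boundary can still complete an in-progress message.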
---
.../apache/pulsar/client/impl/ProducerImpl.java | 9 +
.../apache/pulsar/common/api/raw/RawMessage.java | 29 +++
.../pulsar/common/api/raw/RawMessageImpl.java | 44 +++++
.../pulsar/sql/presto/PulsarRecordCursor.java | 141 ++++++++++++--
.../pulsar/sql/presto/TestReadChunkedMessages.java | 214 +++++++++++++++++++++
5 files changed, 424 insertions(+), 13 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 1652113..dbebfb9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -478,7 +478,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             sequenceId = msgMetadata.getSequenceId();
         }
         String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+        byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
+                msg.getMessageBuilder().getSchemaVersion() : null;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+            // Reset the schemaVersion before re-serializing: the schemaVersion in `MessageMetadata` is
+            // backed by a ByteBuf, so re-serializing the `SEND` command with the same `MessageMetadata`
+            // requires resetting that ByteBuf. Ideally the `ByteBuf` objects in `MessageMetadata` should
+            // be reset after calling `MessageMetadata#writeTo()`.
+            if (chunkId > 0 && schemaVersion != null) {
+                msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+            }
             serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks,
                     readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                     compressedPayload.readableBytes(), uncompressedSize, callback);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index d093628..483b5a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -121,4 +121,33 @@ public interface RawMessage {
* @return true if the key is base64 encoded, false otherwise
*/
boolean hasBase64EncodedKey();
+
+    /**
+     * Get the UUID of the chunked message.
+     *
+     * @return the chunk message UUID
+     */
+    String getUUID();
+
+    /**
+     * Get the chunk ID of the chunked message.
+     *
+     * @return the chunk ID
+     */
+    int getChunkId();
+
+    /**
+     * Get the number of chunks of the chunked message.
+     *
+     * @return the number of chunks
+     */
+    int getNumChunksFromMsg();
+
+    /**
+     * Get the total size of the chunked message in bytes.
+     *
+     * @return the chunked message total size in bytes
+     */
+    int getTotalChunkMsgSize();
+
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index defc1b4..3aa0cbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -81,6 +81,14 @@ public class RawMessageImpl implements RawMessage {
return msg;
}
+    public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {
+        if (!msgMetadata.getMetadata().hasNumChunksFromMsg() || msgMetadata.getMetadata().getNumChunksFromMsg() <= 1) {
+            throw new RuntimeException("The update payload operation only supports multi-chunk messages.");
+        }
+        payload = chunkedTotalPayload;
+        return this;
+    }
+
     @Override
     public Map<String, String> getProperties() {
         if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
@@ -170,6 +178,42 @@ public class RawMessageImpl implements RawMessage {
return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
}
+ @Override
+ public String getUUID() {
+ if (msgMetadata.getMetadata().hasUuid()) {
+ return msgMetadata.getMetadata().getUuid();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getChunkId() {
+ if (msgMetadata.getMetadata().hasChunkId()) {
+ return msgMetadata.getMetadata().getChunkId();
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int getNumChunksFromMsg() {
+ if (msgMetadata.getMetadata().hasNumChunksFromMsg()) {
+ return msgMetadata.getMetadata().getNumChunksFromMsg();
+ } else {
+ return -1;
+ }
+ }
+
+ @Override
+ public int getTotalChunkMsgSize() {
+ if (msgMetadata.getMetadata().hasTotalChunkMsgSize()) {
+ return msgMetadata.getMetadata().getTotalChunkMsgSize();
+ } else {
+ return -1;
+ }
+ }
+
public int getBatchSize() {
return msgMetadata.getMetadata().getNumMessagesInBatch();
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index b1230d3..558b87b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -29,6 +29,9 @@ import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.block.Block;
@@ -58,6 +61,8 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.api.raw.RawMessageIdImpl;
+import org.apache.pulsar.common.api.raw.RawMessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
@@ -112,6 +118,8 @@ public class PulsarRecordCursor implements RecordCursor {
PulsarDispatchingRowDecoderFactory decoderFactory;
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+
private static final Logger log = Logger.get(PulsarRecordCursor.class);
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles,
PulsarSplit pulsarSplit,
@@ -265,7 +273,8 @@ public class PulsarRecordCursor implements RecordCursor {
metricsTracker.register_BYTES_READ(bytes);
                             // check if we have processed all entries in this split
-                            if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                            // and no incomplete chunked messages exist
+                            if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
                                 return;
                             }
@@ -279,15 +288,25 @@ public class PulsarRecordCursor implements RecordCursor {
                             // start time for message queue read
                             metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
-                            while (true) {
-                                if (!haveAvailableCacheSize(
-                                        messageQueueCacheSizeAllocator, messageQueue)
-                                        || !messageQueue.offer(message)) {
-                                    Thread.sleep(1);
-                                } else {
-                                    messageQueueCacheSizeAllocator.allocate(
-                                            message.getData().readableBytes());
-                                    break;
+                            if (message.getNumChunksFromMsg() > 1) {
+                                message = processChunkedMessages(message);
+                            } else if (entryExceedSplitEndPosition(entry)) {
+                                // skip non-chunked messages that
+                                // exceed the split end position
+                                message.release();
+                                message = null;
+                            }
+                            if (message != null) {
+                                while (true) {
+                                    if (!haveAvailableCacheSize(
+                                            messageQueueCacheSizeAllocator, messageQueue)
+                                            || !messageQueue.offer(message)) {
+                                        Thread.sleep(1);
+                                    } else {
+                                        messageQueueCacheSizeAllocator.allocate(
+                                                message.getData().readableBytes());
+                                        break;
+                                    }
                                 }
                             }
@@ -328,6 +347,10 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
+    private boolean entryExceedSplitEndPosition(Entry entry) {
+        return ((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
+    }
+
@VisibleForTesting
class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
@@ -341,8 +364,9 @@ public class PulsarRecordCursor implements RecordCursor {
public void run() {
if (outstandingReadsRequests.get() > 0) {
-            if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
-                    .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+            if (!cursor.hasMoreEntries() ||
+                    (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
+                            && chunkedMessagesMap.isEmpty())) {
isDone = true;
} else {
@@ -408,7 +432,7 @@ public class PulsarRecordCursor implements RecordCursor {
public boolean hasFinished() {
         return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
-                && splitSize <= entriesProcessed;
+                && splitSize <= entriesProcessed && chunkedMessagesMap.isEmpty();
}
@Override
@@ -732,4 +756,95 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
+ private RawMessage processChunkedMessages(RawMessage message) {
+ final String uuid = message.getUUID();
+ final int chunkId = message.getChunkId();
+ final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+ final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+ if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+ && !chunkedMessagesMap.containsKey(uuid)) {
+            // If the message is out of the split range, we only care about the incomplete chunked messages.
+ message.release();
+ return null;
+ }
+ if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+ }
+
+ ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // This means we lost the first chunk; it can happen when the beginning chunk did not belong to this split.
+ log.info("Received unexpected chunk. messageId: %s, last-chunk-id:
%s chunkId: %s, totalChunks: %s",
+ message.getMessageId(),
+ (chunkedMsgCtx != null ?
chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+ numChunks);
+ if (chunkedMsgCtx != null) {
+ if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+ }
+ chunkedMsgCtx.recycle();
+ }
+ chunkedMessagesMap.remove(uuid);
+ message.release();
+ return null;
+ }
+
+ // append the chunked payload and update lastChunkedMessage-id
+ chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+ chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+ // if final chunk is not received yet then release payload and return
+ if (chunkId != (numChunks - 1)) {
+ message.release();
+ return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Chunked message completed. chunkId: %s, totalChunks:
%s, msgId: %s, sequenceId: %s",
+ chunkId, numChunks, rawMessageId, message.getSequenceId());
+ }
+ chunkedMessagesMap.remove(uuid);
+ ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+ chunkedMsgCtx.recycle();
+        // The chunked message is complete; use the entire payload instead of the last chunk's payload.
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+ }
+
+ static class ChunkedMessageCtx {
+
+ protected int totalChunks = -1;
+ protected ByteBuf chunkedMsgBuffer;
+ protected int lastChunkedMessageId = -1;
+
+        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+ ChunkedMessageCtx ctx = RECYCLER.get();
+ ctx.totalChunks = numChunksFromMsg;
+ ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+ return ctx;
+ }
+
+ private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
+
+        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+ return new ChunkedMessageCtx(handle);
+ }
+ };
+
+ public void recycle() {
+ this.totalChunks = -1;
+ this.chunkedMsgBuffer = null;
+ this.lastChunkedMessageId = -1;
+ recyclerHandle.recycle(this);
+ }
+ }
+
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
new file mode 100644
index 0000000..0a02dc3
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.google.common.collect.Sets;
+import io.prestosql.spi.connector.ConnectorContext;
+import io.prestosql.spi.predicate.TupleDomain;
+import io.prestosql.testing.TestingConnectorContext;
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test read chunked messages.
+ */
+@Test
+@Slf4j
+public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest {
+
+ private final static int MAX_MESSAGE_SIZE = 1024 * 1024;
+
+ @EqualsAndHashCode
+ @Data
+ static class Movie {
+ private String name;
+ private Long publishTime;
+ private byte[] binaryData;
+ }
+
+ @EqualsAndHashCode
+ @Data
+ static class MovieMessage {
+ private Movie movie;
+ private String messageId;
+ }
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ conf.setMaxMessageSize(MAX_MESSAGE_SIZE);
+ conf.setManagedLedgerMaxEntriesPerLedger(5);
+ conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+ internalSetup();
+
+        admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+
+        // so that clients can test short names
+        admin.tenants().createTenant("public",
+                new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("public/default");
+        admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test"));
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Test
+ public void queryTest() throws Exception {
+ String topic = "chunk-topic";
+ TopicName topicName = TopicName.get(topic);
+ int messageCnt = 20;
+ Set<MovieMessage> messageSet = prepareChunkedData(topic, messageCnt);
+ SchemaInfo schemaInfo = Schema.AVRO(Movie.class).getSchemaInfo();
+
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+ connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress());
+        PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig);
+ Collection<PulsarSplit> splits = pulsarSplitManager.getSplitsForTopic(
+ topicName.getPersistenceNamingEncoding(),
+ pulsar.getManagedLedgerFactory(),
+ new ManagedLedgerConfig(),
+ 3,
+ new PulsarTableHandle("1", topicName.getNamespace(), topic,
topic),
+ schemaInfo,
+ topic,
+ TupleDomain.all(),
+ null);
+
+ List<PulsarColumnHandle> columnHandleList =
TestPulsarConnector.getColumnColumnHandles(
+ topicName, schemaInfo,
PulsarColumnHandle.HandleKeyValueType.NONE, true);
+ ConnectorContext prestoConnectorContext = new
TestingConnectorContext();
+
+ for (PulsarSplit split : splits) {
+            queryAndCheck(columnHandleList, split, connectorConfig, prestoConnectorContext, messageSet);
+ }
+ Assert.assertTrue(messageSet.isEmpty());
+ }
+
+ private Set<MovieMessage> prepareChunkedData(String topic, int messageCnt)
throws PulsarClientException, InterruptedException {
+ pulsarClient.newConsumer(Schema.AVRO(Movie.class))
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribe()
+ .close();
+ Producer<Movie> producer =
pulsarClient.newProducer(Schema.AVRO(Movie.class))
+ .topic(topic)
+ .enableBatching(false)
+ .enableChunking(true)
+ .create();
+ Set<MovieMessage> messageSet = new LinkedHashSet<>();
+ CountDownLatch countDownLatch = new CountDownLatch(messageCnt);
+ for (int i = 0; i < messageCnt; i++) {
+ final double dataTimes = (i % 5) * 0.5;
+            byte[] movieBinaryData = RandomUtils.nextBytes((int) (MAX_MESSAGE_SIZE * dataTimes));
+ final int length = movieBinaryData.length;
+ final int index = i;
+
+ Movie movie = new Movie();
+ movie.setName("movie-" + i);
+ movie.setPublishTime(System.currentTimeMillis());
+ movie.setBinaryData(movieBinaryData);
+ producer.newMessage().value(movie).sendAsync()
+ .whenComplete((msgId, throwable) -> {
+ if (throwable != null) {
+ log.error("Failed to produce message.", throwable);
+ countDownLatch.countDown();
+ return;
+ }
+ MovieMessage movieMessage = new MovieMessage();
+ movieMessage.setMovie(movie);
+ MessageIdImpl messageId = (MessageIdImpl) msgId;
+ movieMessage.setMessageId("(" +
messageId.getLedgerId() + "," + messageId.getEntryId() + ",0)");
+ messageSet.add(movieMessage);
+ countDownLatch.countDown();
+ });
+ }
+ countDownLatch.await();
+ Assert.assertEquals(messageCnt, messageSet.size());
+ producer.close();
+ return messageSet;
+ }
+
+ private void queryAndCheck(List<PulsarColumnHandle> columnHandleList,
+ PulsarSplit split,
+ PulsarConnectorConfig connectorConfig,
+ ConnectorContext prestoConnectorContext,
+ Set<MovieMessage> messageSet) {
+        PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor(
+                columnHandleList, split, connectorConfig, pulsar.getManagedLedgerFactory(),
+                new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()),
+                new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager()));
+
+ AtomicInteger receiveMsgCnt = new AtomicInteger(messageSet.size());
+ while (pulsarRecordCursor.advanceNextPosition()) {
+ Movie movie = new Movie();
+ MovieMessage movieMessage = new MovieMessage();
+ movieMessage.setMovie(movie);
+ for (int i = 0; i < columnHandleList.size(); i++) {
+ switch (columnHandleList.get(i).getName()) {
+ case "binaryData":
+                        movie.setBinaryData(pulsarRecordCursor.getSlice(i).getBytes());
+ break;
+ case "name":
+                        movie.setName(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+ break;
+ case "publishTime":
+ movie.setPublishTime(pulsarRecordCursor.getLong(i));
+ break;
+ case "__message_id__":
+                        movieMessage.setMessageId(new String(pulsarRecordCursor.getSlice(i).getBytes()));
+ default:
+ // do nothing
+ break;
+ }
+ }
+
+ Assert.assertTrue(messageSet.contains(movieMessage));
+ messageSet.remove(movieMessage);
+ receiveMsgCnt.decrementAndGet();
+ }
+ }
+
+}