This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9db66c31370c0b2a693f92ea8fba6d02e4a9bdd4
Author: ran <[email protected]>
AuthorDate: Mon Dec 13 11:20:47 2021 +0800

    [Pulsar SQL] Support querying chunked messages in Pulsar SQL (#12720)

    ### Motivation

    Currently, Pulsar SQL does not support querying chunked messages.

    ### Modifications

    Add a chunked message map to `PulsarRecordCursor` to track incomplete chunked messages.
    Once all chunks of a message have been received, the reassembled message is offered to the
    message queue to wait for deserialization.

    (cherry picked from commit 93b74b5498ced9e75512c593e6e5a9f5a6c8f26b)
---
 .../apache/pulsar/client/impl/ProducerImpl.java     |   9 +
 .../apache/pulsar/common/api/raw/RawMessage.java    |  29 +++
 .../pulsar/common/api/raw/RawMessageImpl.java       |  44 +++++
 .../pulsar/sql/presto/PulsarRecordCursor.java       | 141 ++++++++++++--
 .../pulsar/sql/presto/TestReadChunkedMessages.java  | 214 +++++++++++++++++++++
 5 files changed, 424 insertions(+), 13 deletions(-)
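As an illustrative aside before the diff itself: the reassembly added to `PulsarRecordCursor` boils down to buffering chunk payloads per message UUID and handing the complete payload to the decoder once the last chunk arrives. A minimal, self-contained sketch of that idea follows; the `ChunkReassembler` class and its names are invented for illustration and are not the actual Pulsar code, which also has to handle missing or out-of-order chunks and split boundaries.

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: buffer chunk payloads per message UUID and return the
    // complete payload once the final chunk has been appended.
    class ChunkReassembler {
        private final Map<String, ByteBuf> pending = new HashMap<>();

        ByteBuf append(String uuid, int chunkId, int numChunks, int totalSize, ByteBuf chunkPayload) {
            ByteBuf buffer = pending.computeIfAbsent(uuid, k -> Unpooled.buffer(totalSize, totalSize));
            buffer.writeBytes(chunkPayload);
            if (chunkId == numChunks - 1) {
                return pending.remove(uuid);   // complete: ready for deserialization
            }
            return null;                       // still waiting for the remaining chunks
        }
    }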
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index cc978a1..a3ca386 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -463,7 +463,16 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             sequenceId = msgMetadata.getSequenceId();
         }
         String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
+        byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
+                msg.getMessageBuilder().getSchemaVersion() : null;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
+            // The schemaVersion has to be reset for every chunk after the first one: the schema version
+            // in `MessageMetadata` is backed by a ByteBuf, so re-serializing the `SEND` command with the
+            // same `MessageMetadata` consumes it. Ideally the ByteBuf-backed fields in `MessageMetadata`
+            // would be reset after each call to `MessageMetadata#writeTo()`.
+            if (chunkId > 0 && schemaVersion != null) {
+                msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+            }
             serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex,
                     ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
                     compressedPayload.readableBytes(), uncompressedSize, callback);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
index d093628..483b5a3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessage.java
@@ -121,4 +121,33 @@ public interface RawMessage {
      * @return true if the key is base64 encoded, false otherwise
      */
     boolean hasBase64EncodedKey();
+
+    /**
+     * Get uuid of chunked message.
+     *
+     * @return uuid
+     */
+    String getUUID();
+
+    /**
+     * Get chunkId of chunked message.
+     *
+     * @return chunkId
+     */
+    int getChunkId();
+
+    /**
+     * Get chunk num of chunked message.
+     *
+     * @return chunk num
+     */
+    int getNumChunksFromMsg();
+
+    /**
+     * Get chunked message total size in bytes.
+     *
+     * @return chunked message total size in bytes
+     */
+    int getTotalChunkMsgSize();
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
index defc1b4..3aa0cbc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/RawMessageImpl.java
@@ -81,6 +81,14 @@ public class RawMessageImpl implements RawMessage {
         return msg;
     }
 
+    public RawMessage updatePayloadForChunkedMessage(ByteBuf chunkedTotalPayload) {
+        if (!msgMetadata.getMetadata().hasNumChunksFromMsg() || msgMetadata.getMetadata().getNumChunksFromMsg() <= 1) {
+            throw new RuntimeException("The update payload operation only supports messages with multiple chunks.");
+        }
+        payload = chunkedTotalPayload;
+        return this;
+    }
+
     @Override
     public Map<String, String> getProperties() {
         if (singleMessageMetadata != null && singleMessageMetadata.getPropertiesCount() > 0) {
@@ -170,6 +178,42 @@ public class RawMessageImpl implements RawMessage {
         return msgMetadata.getMetadata().isPartitionKeyB64Encoded();
     }
 
+    @Override
+    public String getUUID() {
+        if (msgMetadata.getMetadata().hasUuid()) {
+            return msgMetadata.getMetadata().getUuid();
+        } else {
+            return null;
+        }
+    }
+
+    @Override
+    public int getChunkId() {
+        if (msgMetadata.getMetadata().hasChunkId()) {
+            return msgMetadata.getMetadata().getChunkId();
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int getNumChunksFromMsg() {
+        if (msgMetadata.getMetadata().hasNumChunksFromMsg()) {
+            return msgMetadata.getMetadata().getNumChunksFromMsg();
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int getTotalChunkMsgSize() {
+        if (msgMetadata.getMetadata().hasTotalChunkMsgSize()) {
+            return msgMetadata.getMetadata().getTotalChunkMsgSize();
+        } else {
+            return -1;
+        }
+    }
+
     public int getBatchSize() {
         return msgMetadata.getMetadata().getNumMessagesInBatch();
     }
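A note on the new `RawMessage` accessors above: a chunked message is identified by `getNumChunksFromMsg() > 1`, grouped by `getUUID()`, and ordered by `getChunkId()`; the numeric getters return -1 (and `getUUID()` returns null) when the corresponding metadata field is absent. A small sketch of how a caller might use them, assuming pulsar-common is on the classpath; the `ChunkMetadataInspector` helper is hypothetical, not part of this commit:

    import org.apache.pulsar.common.api.raw.RawMessage;

    // Hypothetical helper built only on the accessors introduced in this commit.
    final class ChunkMetadataInspector {

        private ChunkMetadataInspector() {
        }

        // A message carries chunk metadata only when it was split into more than one chunk.
        static boolean isChunk(RawMessage message) {
            return message.getNumChunksFromMsg() > 1;
        }

        // The last chunk signals that the reassembled payload can be handed to the decoder.
        static boolean isLastChunk(RawMessage message) {
            return isChunk(message) && message.getChunkId() == message.getNumChunksFromMsg() - 1;
        }
    }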
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index b1230d3..558b87b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -29,6 +29,9 @@ import com.google.common.annotations.VisibleForTesting;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.ReferenceCountUtil;
 import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import io.prestosql.spi.block.Block;
@@ -58,6 +61,8 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.common.api.raw.MessageParser;
 import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.api.raw.RawMessageIdImpl;
+import org.apache.pulsar.common.api.raw.RawMessageImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
@@ -66,6 +71,7 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
 import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
@@ -112,6 +118,8 @@ public class PulsarRecordCursor implements RecordCursor {
 
     PulsarDispatchingRowDecoderFactory decoderFactory;
 
+    protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap = new ConcurrentOpenHashMap<>();
+
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
@@ -265,7 +273,8 @@ public class PulsarRecordCursor implements RecordCursor {
                     metricsTracker.register_BYTES_READ(bytes);
 
                     // check if we have processed all entries in this split
-                    if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                    // and no incomplete chunked messages exist
+                    if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) {
                         return;
                     }
@@ -279,15 +288,25 @@ public class PulsarRecordCursor implements RecordCursor {
                         // start time for message queue read
                         metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
 
-                        while (true) {
-                            if (!haveAvailableCacheSize(
-                                    messageQueueCacheSizeAllocator, messageQueue)
-                                    || !messageQueue.offer(message)) {
-                                Thread.sleep(1);
-                            } else {
-                                messageQueueCacheSizeAllocator.allocate(
-                                        message.getData().readableBytes());
-                                break;
+                        if (message.getNumChunksFromMsg() > 1) {
+                            message = processChunkedMessages(message);
+                        } else if (entryExceedSplitEndPosition(entry)) {
+                            // skip a non-chunked message
+                            // that is past the split end position
+                            message.release();
+                            message = null;
+                        }
+                        if (message != null) {
+                            while (true) {
+                                if (!haveAvailableCacheSize(
+                                        messageQueueCacheSizeAllocator, messageQueue)
+                                        || !messageQueue.offer(message)) {
+                                    Thread.sleep(1);
+                                } else {
+                                    messageQueueCacheSizeAllocator.allocate(
+                                            message.getData().readableBytes());
+                                    break;
+                                }
                             }
                         }
 
@@ -328,6 +347,10 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    private boolean entryExceedSplitEndPosition(Entry entry) {
+        return ((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0;
+    }
+
     @VisibleForTesting
     class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
 
@@ -341,8 +364,9 @@ public class PulsarRecordCursor implements RecordCursor {
 
         public void run() {
             if (outstandingReadsRequests.get() > 0) {
-                if (!cursor.hasMoreEntries() || ((PositionImpl) cursor.getReadPosition())
-                        .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                if (!cursor.hasMoreEntries() ||
+                        (((PositionImpl) cursor.getReadPosition()).compareTo(pulsarSplit.getEndPosition()) >= 0
+                        && chunkedMessagesMap.isEmpty())) {
                     isDone = true;
 
                 } else {
@@ -408,7 +432,7 @@ public class PulsarRecordCursor implements RecordCursor {
 
         public boolean hasFinished() {
             return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
-                    && splitSize <= entriesProcessed;
+                    && splitSize <= entriesProcessed && chunkedMessagesMap.isEmpty();
         }
 
         @Override
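The three changes above share one invariant: a split is only exhausted once the read position is past the split's end position and no partially assembled chunked message is still waiting for its remaining chunks, since the trailing chunks of a message that starts inside the split may live in entries beyond it. A runnable toy illustration of that condition follows; the `SplitDrainCheck` class is invented for illustration and is not part of Pulsar:

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of the new hasFinished()/ReadEntries gating: keep reading while any
    // chunked message in the pending map is still incomplete, even past the split end.
    class SplitDrainCheck {

        static boolean splitExhausted(boolean readPositionPastSplitEnd, Map<String, ?> pendingChunks) {
            return readPositionPastSplitEnd && pendingChunks.isEmpty();
        }

        public static void main(String[] args) {
            Map<String, Object> pendingChunks = new HashMap<>();
            pendingChunks.put("producer-1-42", new Object());          // one message still incomplete
            System.out.println(splitExhausted(true, pendingChunks));   // false: keep reading entries
            pendingChunks.clear();
            System.out.println(splitExhausted(true, pendingChunks));   // true: this split is done
        }
    }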
@@ -732,4 +756,95 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
+    private RawMessage processChunkedMessages(RawMessage message) {
+        final String uuid = message.getUUID();
+        final int chunkId = message.getChunkId();
+        final int totalChunkMsgSize = message.getTotalChunkMsgSize();
+        final int numChunks = message.getNumChunksFromMsg();
+
+        RawMessageIdImpl rawMessageId = (RawMessageIdImpl) message.getMessageId();
+        if (rawMessageId.getLedgerId() > pulsarSplit.getEndPositionLedgerId()
+                && !chunkedMessagesMap.containsKey(uuid)) {
+            // For messages beyond the split range, we only care about already-started chunked messages.
+            message.release();
+            return null;
+        }
+        if (chunkId == 0) {
+            ByteBuf chunkedMsgBuffer = Unpooled.directBuffer(totalChunkMsgSize, totalChunkMsgSize);
+            chunkedMessagesMap.computeIfAbsent(uuid, (key) -> ChunkedMessageCtx.get(numChunks, chunkedMsgBuffer));
+        }
+
+        ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(uuid);
+        if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
+                || chunkId != (chunkedMsgCtx.lastChunkedMessageId + 1) || chunkId >= numChunks) {
+            // The first chunk was missed; this happens when the beginning chunk does not belong to this split.
+            log.info("Received unexpected chunk. messageId: %s, last-chunk-id: %s chunkId: %s, totalChunks: %s",
+                    message.getMessageId(),
+                    (chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), chunkId,
+                    numChunks);
+            if (chunkedMsgCtx != null) {
+                if (chunkedMsgCtx.chunkedMsgBuffer != null) {
+                    ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
+                }
+                chunkedMsgCtx.recycle();
+            }
+            chunkedMessagesMap.remove(uuid);
+            message.release();
+            return null;
+        }
+
+        // append the chunked payload and update the last chunked message id
+        chunkedMsgCtx.chunkedMsgBuffer.writeBytes(message.getData());
+        chunkedMsgCtx.lastChunkedMessageId = chunkId;
+
+        // if the final chunk has not been received yet, release the payload and return
+        if (chunkId != (numChunks - 1)) {
+            message.release();
+            return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Chunked message completed. chunkId: %s, totalChunks: %s, msgId: %s, sequenceId: %s",
+                    chunkId, numChunks, rawMessageId, message.getSequenceId());
+        }
+        chunkedMessagesMap.remove(uuid);
+        ByteBuf unCompressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
+        chunkedMsgCtx.recycle();
+        // The chunked message is complete; use the entire payload instead of the last chunk's payload.
+        return ((RawMessageImpl) message).updatePayloadForChunkedMessage(unCompressedPayload);
+    }
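The `ChunkedMessageCtx` defined just below pools its instances with Netty's `Recycler`, a common object-pooling idiom in Netty-based code. For readers unfamiliar with the pattern, here is a minimal generic sketch of it; the `PooledCtx` class is made up for illustration:

    import io.netty.util.Recycler;

    // Minimal Recycler idiom: instances come from a thread-local pool via get() and are
    // returned to it through the handle captured at construction time.
    final class PooledCtx {

        private static final Recycler<PooledCtx> RECYCLER = new Recycler<PooledCtx>() {
            @Override
            protected PooledCtx newObject(Handle<PooledCtx> handle) {
                return new PooledCtx(handle);
            }
        };

        private final Recycler.Handle<PooledCtx> handle;
        int value;

        private PooledCtx(Recycler.Handle<PooledCtx> handle) {
            this.handle = handle;
        }

        static PooledCtx get() {
            return RECYCLER.get();
        }

        void recycle() {
            value = 0;                 // reset mutable state before returning to the pool
            handle.recycle(this);
        }
    }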
+
+    static class ChunkedMessageCtx {
+
+        protected int totalChunks = -1;
+        protected ByteBuf chunkedMsgBuffer;
+        protected int lastChunkedMessageId = -1;
+
+        static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
+            ChunkedMessageCtx ctx = RECYCLER.get();
+            ctx.totalChunks = numChunksFromMsg;
+            ctx.chunkedMsgBuffer = chunkedMsgBuffer;
+            return ctx;
+        }
+
+        private final Recycler.Handle<ChunkedMessageCtx> recyclerHandle;
+
+        private ChunkedMessageCtx(Recycler.Handle<ChunkedMessageCtx> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
+            protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
+                return new ChunkedMessageCtx(handle);
+            }
+        };
+
+        public void recycle() {
+            this.totalChunks = -1;
+            this.chunkedMsgBuffer = null;
+            this.lastChunkedMessageId = -1;
+            recyclerHandle.recycle(this);
+        }
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
new file mode 100644
index 0000000..0a02dc3
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestReadChunkedMessages.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.pulsar.sql.presto; + +import com.google.common.collect.Sets; +import io.prestosql.spi.connector.ConnectorContext; +import io.prestosql.spi.predicate.TupleDomain; +import io.prestosql.testing.TestingConnectorContext; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test read chunked messages. + */ +@Test +@Slf4j +public class TestReadChunkedMessages extends MockedPulsarServiceBaseTest { + + private final static int MAX_MESSAGE_SIZE = 1024 * 1024; + + @EqualsAndHashCode + @Data + static class Movie { + private String name; + private Long publishTime; + private byte[] binaryData; + } + + @EqualsAndHashCode + @Data + static class MovieMessage { + private Movie movie; + private String messageId; + } + + @BeforeClass + @Override + protected void setup() throws Exception { + conf.setMaxMessageSize(MAX_MESSAGE_SIZE); + conf.setManagedLedgerMaxEntriesPerLedger(5); + conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + internalSetup(); + + admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build()); + + // so that clients can test short names + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test"))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet("test")); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + internalCleanup(); + } + + @Test + public void queryTest() throws Exception { + String topic = "chunk-topic"; + TopicName topicName = TopicName.get(topic); + int messageCnt = 20; + Set<MovieMessage> messageSet = prepareChunkedData(topic, messageCnt); + SchemaInfo schemaInfo = Schema.AVRO(Movie.class).getSchemaInfo(); + + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + connectorConfig.setWebServiceUrl(pulsar.getWebServiceAddress()); + PulsarSplitManager pulsarSplitManager = new PulsarSplitManager(new PulsarConnectorId("1"), connectorConfig); + Collection<PulsarSplit> splits = pulsarSplitManager.getSplitsForTopic( + topicName.getPersistenceNamingEncoding(), + pulsar.getManagedLedgerFactory(), + new ManagedLedgerConfig(), + 3, + new PulsarTableHandle("1", topicName.getNamespace(), topic, topic), + schemaInfo, + topic, + TupleDomain.all(), + null); + + List<PulsarColumnHandle> columnHandleList = TestPulsarConnector.getColumnColumnHandles( + topicName, schemaInfo, 
PulsarColumnHandle.HandleKeyValueType.NONE, true); + ConnectorContext prestoConnectorContext = new TestingConnectorContext(); + + for (PulsarSplit split : splits) { + queryAndCheck(columnHandleList, split, connectorConfig, prestoConnectorContext, messageSet); + } + Assert.assertTrue(messageSet.isEmpty()); + } + + private Set<MovieMessage> prepareChunkedData(String topic, int messageCnt) throws PulsarClientException, InterruptedException { + pulsarClient.newConsumer(Schema.AVRO(Movie.class)) + .topic(topic) + .subscriptionName("sub") + .subscribe() + .close(); + Producer<Movie> producer = pulsarClient.newProducer(Schema.AVRO(Movie.class)) + .topic(topic) + .enableBatching(false) + .enableChunking(true) + .create(); + Set<MovieMessage> messageSet = new LinkedHashSet<>(); + CountDownLatch countDownLatch = new CountDownLatch(messageCnt); + for (int i = 0; i < messageCnt; i++) { + final double dataTimes = (i % 5) * 0.5; + byte[] movieBinaryData = RandomUtils.nextBytes((int) (MAX_MESSAGE_SIZE * dataTimes)); + final int length = movieBinaryData.length; + final int index = i; + + Movie movie = new Movie(); + movie.setName("movie-" + i); + movie.setPublishTime(System.currentTimeMillis()); + movie.setBinaryData(movieBinaryData); + producer.newMessage().value(movie).sendAsync() + .whenComplete((msgId, throwable) -> { + if (throwable != null) { + log.error("Failed to produce message.", throwable); + countDownLatch.countDown(); + return; + } + MovieMessage movieMessage = new MovieMessage(); + movieMessage.setMovie(movie); + MessageIdImpl messageId = (MessageIdImpl) msgId; + movieMessage.setMessageId("(" + messageId.getLedgerId() + "," + messageId.getEntryId() + ",0)"); + messageSet.add(movieMessage); + countDownLatch.countDown(); + }); + } + countDownLatch.await(); + Assert.assertEquals(messageCnt, messageSet.size()); + producer.close(); + return messageSet; + } + + private void queryAndCheck(List<PulsarColumnHandle> columnHandleList, + PulsarSplit split, + PulsarConnectorConfig connectorConfig, + ConnectorContext prestoConnectorContext, + Set<MovieMessage> messageSet) { + PulsarRecordCursor pulsarRecordCursor = new PulsarRecordCursor( + columnHandleList, split, connectorConfig, pulsar.getManagedLedgerFactory(), + new ManagedLedgerConfig(), new PulsarConnectorMetricsTracker(new NullStatsProvider()), + new PulsarDispatchingRowDecoderFactory(prestoConnectorContext.getTypeManager())); + + AtomicInteger receiveMsgCnt = new AtomicInteger(messageSet.size()); + while (pulsarRecordCursor.advanceNextPosition()) { + Movie movie = new Movie(); + MovieMessage movieMessage = new MovieMessage(); + movieMessage.setMovie(movie); + for (int i = 0; i < columnHandleList.size(); i++) { + switch (columnHandleList.get(i).getName()) { + case "binaryData": + movie.setBinaryData(pulsarRecordCursor.getSlice(i).getBytes()); + break; + case "name": + movie.setName(new String(pulsarRecordCursor.getSlice(i).getBytes())); + break; + case "publishTime": + movie.setPublishTime(pulsarRecordCursor.getLong(i)); + break; + case "__message_id__": + movieMessage.setMessageId(new String(pulsarRecordCursor.getSlice(i).getBytes())); + default: + // do nothing + break; + } + } + + Assert.assertTrue(messageSet.contains(movieMessage)); + messageSet.remove(movieMessage); + receiveMsgCnt.decrementAndGet(); + } + } + +}
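For completeness, the test above relies on the standard client-side chunking behavior: chunking only applies when batching is disabled, and a payload is split once it exceeds the broker's max message size. A minimal standalone producer that would generate chunked messages for such a topic might look like the following sketch; the service URL, topic name, and payload size are placeholders to adapt to your environment:

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    public class ChunkedProducerExample {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")                 // placeholder broker URL
                    .build()) {
                Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                        .topic("persistent://public/default/chunk-topic")  // placeholder topic
                        .enableBatching(false)   // chunking requires batching to be disabled
                        .enableChunking(true)    // split payloads larger than maxMessageSize into chunks
                        .create();
                // A payload larger than the broker's maxMessageSize is sent as multiple chunks.
                producer.send(new byte[10 * 1024 * 1024]);
                producer.close();
            }
        }
    }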
