This is an automated email from the ASF dual-hosted git repository. rgao pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 86fa49620191fd265dc27055f6840a100253aea8 Author: Jiwei Guo <[email protected]> AuthorDate: Fri Feb 25 16:07:42 2022 +0800 [Pulsar SQL] Fix PulsarRecordCursor deserialize issue. (#14379) (cherry picked from commit a96a1584ba2e9d19c6919b7597a0f344a2af1a35) --- .../pulsar/sql/presto/PulsarRecordCursor.java | 202 +++++++++++---------- 1 file changed, 111 insertions(+), 91 deletions(-) diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index d839e05..56ae17b 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -45,7 +45,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -110,6 +112,7 @@ public class PulsarRecordCursor implements RecordCursor { private final long splitSize; private long entriesProcessed = 0; private int partition = -1; + private volatile Throwable deserializingError; private PulsarSqlSchemaInfoProvider schemaInfoProvider; @@ -236,113 +239,125 @@ public class PulsarRecordCursor implements RecordCursor { } @VisibleForTesting - class DeserializeEntries implements Runnable { + class DeserializeEntries extends Thread { - protected boolean isRunning = false; + private final AtomicBoolean isRunning; - private final Thread thread; + private final CompletableFuture<Void> closeHandle; public DeserializeEntries() { - this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId()); + super("deserialize-thread-split-" + pulsarSplit.getSplitId()); + this.isRunning = new AtomicBoolean(false); + this.closeHandle = new CompletableFuture<>(); } - public void interrupt() { - isRunning = false; - thread.interrupt(); + @Override + public void start() { + if (isRunning.compareAndSet(false, true)) { + super.start(); + } } - public void start() { - this.thread.start(); + public CompletableFuture<Void> close() { + if (isRunning.compareAndSet(true, false)) { + super.interrupt(); + } + return closeHandle; } @Override public void run() { - isRunning = true; - while (isRunning) { - - int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() { - @Override - public void accept(Entry entry) { - - try { - entryQueueCacheSizeAllocator.release(entry.getLength()); - - long bytes = entry.getDataBuffer().readableBytes(); - completedBytes += bytes; - // register stats for bytes read - metricsTracker.register_BYTES_READ(bytes); - - // check if we have processed all entries in this split - // and no incomplete chunked messages exist - if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) { - return; - } - - // set start time for time deserializing entries for stats - metricsTracker.start_ENTRY_DESERIALIZE_TIME(); + try { + while (isRunning.get()) { + int read = entryQueue.drain(new MessagePassingQueue.Consumer<Entry>() { + @Override + public void accept(Entry entry) { try { - MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), - entry.getDataBuffer(), (message) -> { - try { - // start time for message queue read - metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME(); - - if (message.getNumChunksFromMsg() > 1) { - message = processChunkedMessages(message); - } else if (entryExceedSplitEndPosition(entry)) { - // skip no chunk or no multi chunk message - // that exceed split end position - message.release(); - message = null; - } - if (message != null) { - while (true) { - if (!haveAvailableCacheSize( - messageQueueCacheSizeAllocator, messageQueue) - || !messageQueue.offer(message)) { - Thread.sleep(1); - } else { - messageQueueCacheSizeAllocator.allocate( - message.getData().readableBytes()); - break; + entryQueueCacheSizeAllocator.release(entry.getLength()); + + long bytes = entry.getDataBuffer().readableBytes(); + completedBytes += bytes; + // register stats for bytes read + metricsTracker.register_BYTES_READ(bytes); + + // check if we have processed all entries in this split + // and no incomplete chunked messages exist + if (entryExceedSplitEndPosition(entry) && chunkedMessagesMap.isEmpty()) { + return; + } + + // set start time for time deserializing entries for stats + metricsTracker.start_ENTRY_DESERIALIZE_TIME(); + + try { + MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(), + entry.getDataBuffer(), (message) -> { + try { + // start time for message queue read + metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME(); + + if (message.getNumChunksFromMsg() > 1) { + message = processChunkedMessages(message); + } else if (entryExceedSplitEndPosition(entry)) { + // skip no chunk or no multi chunk message + // that exceed split end position + message.release(); + message = null; + } + if (message != null) { + while (true) { + if (!haveAvailableCacheSize( + messageQueueCacheSizeAllocator, messageQueue) + || !messageQueue.offer(message)) { + Thread.sleep(1); + } else { + messageQueueCacheSizeAllocator.allocate( + message.getData().readableBytes()); + break; + } } } - } - // stats for how long a read from message queue took - metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME(); - // stats for number of messages read - metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY(); - - } catch (InterruptedException e) { - //no-op - } - }, pulsarConnectorConfig.getMaxMessageSize()); - } catch (IOException e) { - log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString()); - throw new RuntimeException(e); - } - // stats for time spend deserializing entries - metricsTracker.end_ENTRY_DESERIALIZE_TIME(); + // stats for how long a read from message queue took + metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME(); + // stats for number of messages read + metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY(); - // stats for num messages per entry - metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY(); - - } finally { - entriesProcessed++; - entry.release(); + } catch (InterruptedException e) { + //no-op + } + }, pulsarConnectorConfig.getMaxMessageSize()); + } catch (IOException e) { + log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString()); + throw new RuntimeException(e); + } + // stats for time spend deserializing entries + metricsTracker.end_ENTRY_DESERIALIZE_TIME(); + + // stats for num messages per entry + metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY(); + + } finally { + entriesProcessed++; + entry.release(); + } } - } - }); + }); - if (read <= 0) { - try { - Thread.sleep(1); - } catch (InterruptedException e) { - return; + if (read <= 0) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + return; + } } } + closeHandle.complete(null); + } catch (Throwable ex) { + log.error(ex, "Stop running DeserializeEntries"); + closeHandle.completeExceptionally(ex); + throw ex; } } } @@ -468,6 +483,9 @@ public class PulsarRecordCursor implements RecordCursor { if (readEntries == null) { // start deserialize thread deserializeEntries = new DeserializeEntries(); + deserializeEntries.setUncaughtExceptionHandler((t, ex) -> { + deserializingError = ex; + }); deserializeEntries.start(); readEntries = new ReadEntries(); @@ -492,6 +510,8 @@ public class PulsarRecordCursor implements RecordCursor { if (currentMessage != null) { messageQueueCacheSizeAllocator.release(currentMessage.getData().readableBytes()); break; + } else if (deserializingError != null) { + throw new RuntimeException(deserializingError); } else { try { Thread.sleep(1); @@ -503,7 +523,7 @@ public class PulsarRecordCursor implements RecordCursor { } } - //start time for deseralizing record + //start time for deserializing record metricsTracker.start_RECORD_DESERIALIZE_TIME(); SchemaInfo schemaInfo = getSchemaInfo(pulsarSplit); @@ -714,12 +734,12 @@ public class PulsarRecordCursor implements RecordCursor { messageQueue.drain(RawMessage::release); } - if (entryQueue != null) { - entryQueue.drain(Entry::release); - } - if (deserializeEntries != null) { - deserializeEntries.interrupt(); + deserializeEntries.close().whenComplete((r, t) -> { + if (entryQueue != null) { + entryQueue.drain(Entry::release); + } + }); } if (this.cursor != null) {
