This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b7a76f Fixed refcounting when processing entries from sql (#2316)
6b7a76f is described below
commit 6b7a76fba4b1c08bf845409c6e4cabf48a454a5e
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Aug 7 09:12:47 2018 +0900
Fixed refcounting when processing entries from sql (#2316)
---
.../java/org/apache/pulsar/client/impl/MessageParser.java | 5 -----
.../org/apache/pulsar/sql/presto/PulsarRecordCursor.java | 12 +++++-------
2 files changed, 5 insertions(+), 12 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 8cf0328..b95b22d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -95,15 +95,10 @@ public class MessageParser {
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
final MessageImpl<?> message = new MessageImpl<>(msgId, msgMetadata, uncompressedPayload, null, null);
processor.process(msgId, message, uncompressedPayload);
-
- uncompressedPayload.release();
-
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(msgMetadata, uncompressedPayload, messageId, null, -1, processor);
- uncompressedPayload.release();
}
-
} finally {
if (uncompressedPayload != null) {
uncompressedPayload.release();
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 618b33b..3523228 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -52,7 +52,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
@@ -202,10 +201,9 @@ public class PulsarRecordCursor implements RecordCursor {
throw new RuntimeException(e);
}
- newEntries.forEach(new Consumer<Entry>() {
- @Override
- public void accept(Entry entry) {
- completedBytes += entry.getData().length;
+ newEntries.forEach(entry -> {
+ try {
+ completedBytes += entry.getDataBuffer().readableBytes();
// filter entries that is not part of my split
if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
try {
@@ -217,9 +215,9 @@ public class PulsarRecordCursor implements RecordCursor {
log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
throw new RuntimeException(e);
}
- } else {
- entry.release();
}
+ } finally {
+ entry.release();
}
});
}