This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef7aca   optimizing throughput in Pulsar Presto connector (#2564)
6ef7aca is described below

commit 6ef7acaf37d57769c4d9dbf7558ef627ce061339
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Thu Sep 13 14:50:01 2018 -0700

     optimizing throughput in Pulsar Presto connector (#2564)
    
    ### Motivation
    
    1. Currently, the Presto Pulsar connector reads synchronously from 
BookKeeper when it has run out of entries to process.  Basically, we process a 
batch of entries and then we read more.  Ideally, we should be reading and 
processing in parallel to increase throughput.
    
    2. Each split initializes its own ManagedLedgerFactory/BookKeeper client. 
 We really just need one BookKeeper client to be shared among threads.
    
    ### Modifications
    1. Rewrote the logic in the Presto Pulsar connector to read async and 
process in parallel
    
    2. Cache ManagedLedgerFactory to be used across splits
    
    ### Result
    
    I see about a 2X throughput improvement on a single node as well as on a 
cluster (2 brokers, 3 bookies, 4 Presto workers including the coordinator) on AWS
---
 conf/presto/catalog/pulsar.properties              |   6 +-
 .../apache/pulsar/sql/presto/PulsarConnector.java  |   5 +
 .../pulsar/sql/presto/PulsarConnectorCache.java    |  64 +++++++
 .../pulsar/sql/presto/PulsarConnectorConfig.java   |  26 ++-
 .../pulsar/sql/presto/PulsarRecordCursor.java      | 201 +++++++++++++++------
 .../pulsar/sql/presto/TestPulsarConnector.java     | 146 ++++++++-------
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |   1 +
 7 files changed, 326 insertions(+), 123 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties 
b/conf/presto/catalog/pulsar.properties
index 23b945e..5f922e5 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181
 # minimum number of entries to read at a single time
 pulsar.entry-read-batch-size=100
 # default number of splits to use per query
-pulsar.target-num-splits=4
+pulsar.target-num-splits=2
+# max message queue size
+pulsar.max-split-message-queue-size=10000
+# max entry queue size
+pulsar.max-split-entry-queue-size = 1000
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 1d89b51..498583d 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -87,6 +87,11 @@ public class PulsarConnector implements Connector {
             log.error(e, "Failed to close pulsar connector");
         }
         try {
+            PulsarConnectorCache.shutdown();
+        } catch (Exception e) {
+            log.error("Failed to shutdown pulsar connector cache");
+        }
+        try {
             lifeCycleManager.stop();
         } catch (Exception e) {
             log.error(e, "Error shutting down connector");
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
new file mode 100644
index 0000000..d13ddcd
--- /dev/null
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+
+public class PulsarConnectorCache {
+
+    private static PulsarConnectorCache instance;
+
+    private final ManagedLedgerFactory managedLedgerFactory;
+
+    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) 
throws Exception {
+        this.managedLedgerFactory = 
initManagedLedgerFactory(pulsarConnectorConfig);
+    }
+
+    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig 
pulsarConnectorConfig) throws Exception {
+        synchronized (PulsarConnectorCache.class) {
+            if (instance == null) {
+                instance = new PulsarConnectorCache(pulsarConnectorConfig);
+            }
+        }
+        return instance;
+    }
+
+    private static ManagedLedgerFactory 
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws 
Exception {
+        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+                .setAllowShadedLedgerManagerFactoryClass(true)
+                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")
+                .setReadEntryTimeout(60);
+        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+    }
+
+    public ManagedLedgerFactory getManagedLedgerFactory() {
+        return managedLedgerFactory;
+    }
+
+    public static void shutdown() throws ManagedLedgerException, 
InterruptedException {
+        if (instance != null) {
+            instance.managedLedgerFactory.shutdown();
+            instance = null;
+        }
+    }
+}
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 1f574c6..482fab3 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -29,7 +29,9 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private String brokerServiceUrl = "http://localhost:8080";;
     private String zookeeperUri = "localhost:2181";
     private int entryReadBatchSize = 100;
-    private int targetNumSplits = 4;
+    private int targetNumSplits = 2;
+    private int maxSplitMessageQueueSize = 10000;
+    private int maxSplitEntryQueueSize = 1000;
     private PulsarAdmin pulsarAdmin;
 
     @NotNull
@@ -77,6 +79,28 @@ public class PulsarConnectorConfig implements AutoCloseable {
     }
 
     @NotNull
+    public int getMaxSplitMessageQueueSize() {
+        return this.maxSplitMessageQueueSize;
+    }
+
+    @Config("pulsar.max-split-message-queue-size")
+    public PulsarConnectorConfig setMaxSplitMessageQueueSize(int 
maxSplitMessageQueueSize) {
+        this.maxSplitMessageQueueSize = maxSplitMessageQueueSize;
+        return this;
+    }
+
+    @NotNull
+    public int getMaxSplitEntryQueueSize() {
+        return this.maxSplitEntryQueueSize;
+    }
+
+    @Config("pulsar.max-split-entry-queue-size")
+    public PulsarConnectorConfig setMaxSplitEntryQueueSize(int 
maxSplitEntryQueueSize) {
+        this.maxSplitEntryQueueSize = maxSplitEntryQueueSize;
+        return this;
+    }
+
+    @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
             this.pulsarAdmin = 
PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index ef56e6c..c8106aa 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -28,6 +28,7 @@ import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import org.apache.avro.Schema;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -44,10 +45,11 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
@@ -62,35 +64,47 @@ import static 
com.facebook.presto.spi.type.TimestampWithTimeZoneType.TIMESTAMP_W
 import static com.facebook.presto.spi.type.TinyintType.TINYINT;
 import static com.google.common.base.Preconditions.checkArgument;
 
+
 public class PulsarRecordCursor implements RecordCursor {
 
     private List<PulsarColumnHandle> columnHandles;
     private PulsarSplit pulsarSplit;
     private PulsarConnectorConfig pulsarConnectorConfig;
-    private ManagedLedgerFactory managedLedgerFactory;
     private ReadOnlyCursor cursor;
-    private Queue<Message> messageQueue = new LinkedList<>();
+    private ArrayBlockingQueue<Message> messageQueue;
+    private ArrayBlockingQueue<Entry> entryQueue;
     private Object currentRecord;
     private Message currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = 
PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
     private int batchSize;
-    private long completedBytes = 0L;
+    private AtomicLong completedBytes = new AtomicLong(0L);
+    private ReadEntries readEntries;
+    private DeserializeEntries deserializeEntries;
+    private TopicName topicName;
 
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
+    private static ManagedLedgerFactory 
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws 
Exception {
+        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+                .setAllowShadedLedgerManagerFactoryClass(true)
+                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
+        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+    }
+
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, 
PulsarSplit pulsarSplit,
                               PulsarConnectorConfig pulsarConnectorConfig) {
-
-        ManagedLedgerFactory managedLedgerFactory;
+        PulsarConnectorCache pulsarConnectorCache;
         try {
-            managedLedgerFactory = 
getManagedLedgerFactory(pulsarConnectorConfig);
+            pulsarConnectorCache = 
PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
         } catch (Exception e) {
-            log.error(e, "Failed to initialize managed ledger factory");
+            log.error(e, "Failed to initialize Pulsar connector cache");
             close();
             throw new RuntimeException(e);
         }
-        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, 
managedLedgerFactory);
+        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
+                pulsarConnectorCache.getManagedLedgerFactory());
     }
 
     // Exposed for testing purposes
@@ -105,7 +119,11 @@ public class PulsarRecordCursor implements RecordCursor {
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
-        this.managedLedgerFactory = managedLedgerFactory;
+        this.messageQueue = new 
ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+        this.entryQueue = new 
ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+        this.topicName = TopicName.get("persistent",
+                NamespaceName.get(pulsarSplit.getSchemaName()),
+                pulsarSplit.getTableName());
 
         Schema schema = 
PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
 
@@ -149,18 +167,9 @@ public class PulsarRecordCursor implements RecordCursor {
         return cursor;
     }
 
-    private ManagedLedgerFactory getManagedLedgerFactory(PulsarConnectorConfig 
pulsarConnectorConfig) throws Exception {
-        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-                .setAllowShadedLedgerManagerFactoryClass(true)
-                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
-        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
-    }
-
-
     @Override
     public long getCompletedBytes() {
-        return this.completedBytes;
+        return this.completedBytes.get();
     }
 
     @Override
@@ -174,39 +183,48 @@ public class PulsarRecordCursor implements RecordCursor {
         return columnHandles.get(field).getType();
     }
 
-    @Override
-    public boolean advanceNextPosition() {
+    @VisibleForTesting
+    class DeserializeEntries implements Runnable {
 
-        if (this.messageQueue.isEmpty()) {
-            if (!this.cursor.hasMoreEntries()) {
-                return false;
-            }
-            if (((PositionImpl) this.cursor.getReadPosition())
-                    .compareTo(this.pulsarSplit.getEndPosition()) >= 0) {
-                return false;
-            }
+    protected AtomicBoolean isRunning = new AtomicBoolean(false);
 
-            TopicName topicName = TopicName.get("persistent",
-                    NamespaceName.get(this.pulsarSplit.getSchemaName()),
-                    this.pulsarSplit.getTableName());
+        private final Thread thread;
 
-            List<Entry> newEntries;
-            try {
-                newEntries = this.cursor.readEntries(this.batchSize);
-            } catch (InterruptedException | ManagedLedgerException e) {
-                log.error(e, "Failed to read new entries from pulsar topic 
%s", topicName.toString());
-                throw new RuntimeException(e);
-            }
+        public DeserializeEntries() {
+            this.thread = new Thread(this);
+        }
+
+        public void interrupt() {
+            isRunning.set(false);
+            thread.interrupt();
+        }
+
+        public void start() {
+            this.thread.start();
+        }
 
-            newEntries.forEach(entry -> {
+        @Override
+        public void run() {
+            isRunning.set(true);
+            while (isRunning.get()) {
+                Entry entry;
                 try {
-                    completedBytes += entry.getDataBuffer().readableBytes();
+                    entry = entryQueue.take();
+                } catch (InterruptedException e) {
+                    break;
+                }
+                try {
+                    
completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
                     // filter entries that is not part of my split
                     if (((PositionImpl) 
entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
                             MessageParser.parseMessage(topicName, 
entry.getLedgerId(), entry.getEntryId(),
                                     entry.getDataBuffer(), (messageId, 
message, byteBuf) -> {
-                                        messageQueue.add(message);
+                                        try {
+                                            messageQueue.put(message);
+                                        } catch (InterruptedException e) {
+                                            //no-op
+                                        }
                                     });
                         } catch (IOException e) {
                             log.error(e, "Failed to parse message from pulsar 
topic %s", topicName.toString());
@@ -216,15 +234,92 @@ public class PulsarRecordCursor implements RecordCursor {
                 } finally {
                     entry.release();
                 }
-            });
+            }
         }
+    }
 
-        this.currentMessage = this.messageQueue.poll();
-        currentRecord = 
this.schemaHandler.deserialize(this.currentMessage.getData());
+    @VisibleForTesting
+    class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
+
+        // indicate whether there are any additional entries left to read
+        private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+        //num of outstanding read requests
+        // set to 1 because we can only read one batch a time
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(1);
+
+        public void run() {
+
+            if (outstandingReadsRequests.get() > 0) {
 
+                if (!cursor.hasMoreEntries() || ((PositionImpl) 
cursor.getReadPosition())
+                        .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                    isDone.set(true);
+
+                } else if (entryQueue.remainingCapacity() > batchSize) {
+                    outstandingReadsRequests.decrementAndGet();
+                    cursor.asyncReadEntries(batchSize, this, 
System.currentTimeMillis());
+                }
+            }
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> entries, Object ctx) {
+            entryQueue.addAll(entries);
+            outstandingReadsRequests.incrementAndGet();
+        }
+
+        public boolean hashFinished() {
+            return messageQueue.isEmpty() && entryQueue.isEmpty() && 
isDone.get() && outstandingReadsRequests.get() >=1;
+        }
+
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            log.debug(exception, "Failed to read entries from topic %s", 
topicName.toString());
+            outstandingReadsRequests.incrementAndGet();
+        }
+    }
+
+
+    @Override
+    public boolean advanceNextPosition() {
+
+        if (readEntries == null) {
+            readEntries = new ReadEntries();
+            readEntries.run();
+
+            // start deserialize thread
+            deserializeEntries = new DeserializeEntries();
+            deserializeEntries.start();
+        }
+
+        while(true) {
+            if (readEntries.hashFinished()) {
+                return false;
+            }
+
+            if (messageQueue.remainingCapacity() > 0) {
+                readEntries.run();
+            }
+
+            currentMessage = messageQueue.poll();
+            if (currentMessage != null) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        currentRecord = 
this.schemaHandler.deserialize(this.currentMessage.getData());
         return true;
     }
 
+
     @VisibleForTesting
     Object getRecord(int fieldIndex) {
         if (this.currentRecord == null) {
@@ -317,17 +412,13 @@ public class PulsarRecordCursor implements RecordCursor {
     @Override
     public void close() {
 
-        if (this.cursor != null) {
-            try {
-                this.cursor.close();
-            } catch (Exception e) {
-                log.error(e);
-            }
+        if (deserializeEntries != null) {
+            deserializeEntries.interrupt();
         }
 
-        if (managedLedgerFactory != null) {
+        if (this.cursor != null) {
             try {
-                managedLedgerFactory.shutdown();
+                this.cursor.close();
             } catch (Exception e) {
                 log.error(e);
             }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 0882efc..5d8472d 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -27,6 +27,7 @@ import com.facebook.presto.spi.type.RealType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
 import io.airlift.log.Logger;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -68,6 +69,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -662,6 +664,9 @@ public abstract class TestPulsarConnector {
     @BeforeMethod
     public void setup() throws Exception {
         this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
+        this.pulsarConnectorConfig.setEntryReadBatchSize(1);
+        this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
+        this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
 
         Tenants tenants = mock(Tenants.class);
         doReturn(new LinkedList<>(topicNames.stream().map(new 
Function<TopicName, String>() {
@@ -786,77 +791,86 @@ public abstract class TestPulsarConnector {
                     }
                 });
 
-                when(readOnlyCursor.readEntries(anyInt())).thenAnswer(new 
Answer<List<Entry>>() {
+                doAnswer(new Answer() {
                     @Override
-                    public List<Entry> answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                    public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                         Object[] args = invocationOnMock.getArguments();
                         Integer readEntries = (Integer) args[0];
+                        AsyncCallbacks.ReadEntriesCallback callback = 
(AsyncCallbacks.ReadEntriesCallback) args[1];
+                        Object ctx = args[2];
+
+                        new Thread(new Runnable() {
+                            @Override
+                            public void run() {
+                                List < Entry > entries = new LinkedList<>();
+                                for (int i = 0; i < readEntries; i++) {
+
+                                    Foo.Bar foobar = new Foo.Bar();
+                                    foobar.field1 = (int) 
fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+                                    Boo boo1 = new Boo();
+                                    boo1.field4 = (double) 
fooFunctions.get("bar.test.field4").apply(count);
+                                    boo1.field5 = (boolean) 
fooFunctions.get("bar.test.field5").apply(count);
+                                    boo1.field6 = (long) 
fooFunctions.get("bar.test.field6").apply(count);
+                                    boo1.foo = new Foo();
+                                    boo1.boo = null;
+                                    boo1.bar = new Bar();
+                                    boo1.foobar = foobar;
+
+                                    Boo boo2 = new Boo();
+                                    boo2.field4 = (double) 
fooFunctions.get("bar.test2.field4").apply(count);
+                                    boo2.field5 = (boolean) 
fooFunctions.get("bar.test2.field5").apply(count);
+                                    boo2.field6 = (long) 
fooFunctions.get("bar.test2.field6").apply(count);
+                                    boo2.foo = new Foo();
+                                    boo2.boo = boo1;
+                                    boo2.bar = new Bar();
+                                    boo2.foobar = foobar;
+
+                                    TestPulsarConnector.Bar bar = new 
TestPulsarConnector.Bar();
+                                    bar.field1 = 
fooFunctions.get("bar.field1").apply(count) == null ? null : (int) 
fooFunctions.get("bar.field1").apply(count);
+                                    bar.field2 = 
fooFunctions.get("bar.field2").apply(count) == null ? null : (String) 
fooFunctions.get("bar.field2").apply(count);
+                                    bar.field3 = (float) 
fooFunctions.get("bar.field3").apply(count);
+                                    bar.test = boo1;
+                                    bar.test2 = count % 2 == 0 ? null : boo2;
+
+                                    Foo foo = new Foo();
+                                    foo.field1 = (int) 
fooFunctions.get("field1").apply(count);
+                                    foo.field2 = (String) 
fooFunctions.get("field2").apply(count);
+                                    foo.field3 = (float) 
fooFunctions.get("field3").apply(count);
+                                    foo.field4 = (double) 
fooFunctions.get("field4").apply(count);
+                                    foo.field5 = (boolean) 
fooFunctions.get("field5").apply(count);
+                                    foo.field6 = (long) 
fooFunctions.get("field6").apply(count);
+                                    foo.timestamp = (long) 
fooFunctions.get("timestamp").apply(count);
+                                    foo.time = (int) 
fooFunctions.get("time").apply(count);
+                                    foo.date = (int) 
fooFunctions.get("date").apply(count);
+                                    foo.bar = bar;
+
+                                    PulsarApi.MessageMetadata messageMetadata 
= PulsarApi.MessageMetadata.newBuilder()
+                                            
.setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                            
.setPublishTime(System.currentTimeMillis()).build();
+
+                                    Schema schema = 
topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? 
AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+
+                                    
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
+                                            = 
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
+
+                                    ByteBuf byteBuf = 
serializeMetadataAndPayload
+                                            (Commands.ChecksumType.Crc32c, 
messageMetadata, payload);
+
+                                    completedBytes += byteBuf.readableBytes();
+
+                                    entries.add(EntryImpl.create(0, 
positions.get(topic), byteBuf));
+                                    positions.put(topic, positions.get(topic) 
+ 1);
+                                    count++;
+                                }
+
+                                callback.readEntriesComplete(entries, ctx);
+                            }
+                        }).start();
 
-                        List<Entry> entries = new LinkedList<>();
-                        for (int i = 0; i < readEntries; i++) {
-
-                            Foo.Bar foobar = new Foo.Bar();
-                            foobar.field1 = (int) 
fooFunctions.get("bar.test.foobar.field1").apply(count);
-
-                            Boo boo1 = new Boo();
-                            boo1.field4 = (double) 
fooFunctions.get("bar.test.field4").apply(count);
-                            boo1.field5 = (boolean) 
fooFunctions.get("bar.test.field5").apply(count);
-                            boo1.field6 = (long) 
fooFunctions.get("bar.test.field6").apply(count);
-                            boo1.foo = new Foo();
-                            boo1.boo = null;
-                            boo1.bar = new Bar();
-                            boo1.foobar = foobar;
-
-                            Boo boo2 = new Boo();
-                            boo2.field4 = (double) 
fooFunctions.get("bar.test2.field4").apply(count);
-                            boo2.field5 = (boolean) 
fooFunctions.get("bar.test2.field5").apply(count);
-                            boo2.field6 = (long) 
fooFunctions.get("bar.test2.field6").apply(count);
-                            boo2.foo = new Foo();
-                            boo2.boo = boo1;
-                            boo2.bar = new Bar();
-                            boo2.foobar = foobar;
-
-                            TestPulsarConnector.Bar bar = new 
TestPulsarConnector.Bar();
-                            bar.field1 = 
fooFunctions.get("bar.field1").apply(count) == null ? null : (int) 
fooFunctions.get("bar.field1").apply(count);
-                            bar.field2 = 
fooFunctions.get("bar.field2").apply(count) == null ? null : (String) 
fooFunctions.get("bar.field2").apply(count);
-                            bar.field3 = (float) 
fooFunctions.get("bar.field3").apply(count);
-                            bar.test = boo1;
-                            bar.test2 = count % 2 == 0 ? null : boo2;
-
-                            Foo foo = new Foo();
-                            foo.field1 = (int) 
fooFunctions.get("field1").apply(count);
-                            foo.field2 = (String) 
fooFunctions.get("field2").apply(count);
-                            foo.field3 = (float) 
fooFunctions.get("field3").apply(count);
-                            foo.field4 = (double) 
fooFunctions.get("field4").apply(count);
-                            foo.field5 = (boolean) 
fooFunctions.get("field5").apply(count);
-                            foo.field6 = (long) 
fooFunctions.get("field6").apply(count);
-                            foo.timestamp = (long) 
fooFunctions.get("timestamp").apply(count);
-                            foo.time = (int) 
fooFunctions.get("time").apply(count);
-                            foo.date = (int) 
fooFunctions.get("date").apply(count);
-                            foo.bar = bar;
-
-                            PulsarApi.MessageMetadata messageMetadata = 
PulsarApi.MessageMetadata.newBuilder()
-                                    
.setProducerName("test-producer").setSequenceId(positions.get(topic))
-                                    
.setPublishTime(System.currentTimeMillis()).build();
-
-                            Schema schema = 
topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? 
AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
-
-                            org.apache.pulsar.shade.io.netty.buffer.ByteBuf 
payload
-                                    = 
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
-
-                            ByteBuf byteBuf = serializeMetadataAndPayload
-                                    (Commands.ChecksumType.Crc32c, 
messageMetadata, payload);
-
-                            completedBytes += byteBuf.readableBytes();
-
-                            entries.add(EntryImpl.create(0, 
positions.get(topic), byteBuf));
-                            positions.put(topic, positions.get(topic) + 1);
-                            count++;
-                        }
-
-                        return entries;
+                        return null;
                     }
-                });
+                }).when(readOnlyCursor).asyncReadEntries(anyInt(), any(), 
any());
 
                 when(readOnlyCursor.hasMoreEntries()).thenAnswer(new 
Answer<Boolean>() {
                     @Override
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 1b323a2..b0fc42a 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -122,6 +122,7 @@ public class TestPulsarRecordCursor extends 
TestPulsarConnector {
             Assert.assertEquals(count, 
topicsToNumEntries.get(topicName.getSchemaName()).longValue());
             Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), 
completedBytes);
             cleanup();
+            pulsarRecordCursor.close();
         }
     }
 }

Reply via email to