sijie closed pull request #2564:  optimizing throughput in Pulsar Presto 
connector
URL: https://github.com/apache/incubator-pulsar/pull/2564
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/presto/catalog/pulsar.properties 
b/conf/presto/catalog/pulsar.properties
index 23b945e3b3..5f922e5071 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -26,4 +26,8 @@ pulsar.zookeeper-uri=localhost:2181
 # minimum number of entries to read at a single time
 pulsar.entry-read-batch-size=100
 # default number of splits to use per query
-pulsar.target-num-splits=4
+pulsar.target-num-splits=2
+# max message queue size
+pulsar.max-split-message-queue-size=10000
+# max entry queue size
+pulsar.max-split-entry-queue-size = 1000
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
index 1d89b519c2..498583d3c6 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java
@@ -86,6 +86,11 @@ public final void shutdown() {
         } catch (Exception e) {
             log.error(e, "Failed to close pulsar connector");
         }
+        try {
+            PulsarConnectorCache.shutdown();
+        } catch (Exception e) {
+            log.error("Failed to shutdown pulsar connector cache");
+        }
         try {
             lifeCycleManager.stop();
         } catch (Exception e) {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
new file mode 100644
index 0000000000..d13ddcd84c
--- /dev/null
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
+
+public class PulsarConnectorCache {
+
+    private static PulsarConnectorCache instance;
+
+    private final ManagedLedgerFactory managedLedgerFactory;
+
+    private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) 
throws Exception {
+        this.managedLedgerFactory = 
initManagedLedgerFactory(pulsarConnectorConfig);
+    }
+
+    public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig 
pulsarConnectorConfig) throws Exception {
+        synchronized (PulsarConnectorCache.class) {
+            if (instance == null) {
+                instance = new PulsarConnectorCache(pulsarConnectorConfig);
+            }
+        }
+        return instance;
+    }
+
+    private static ManagedLedgerFactory 
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws 
Exception {
+        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+                .setAllowShadedLedgerManagerFactoryClass(true)
+                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.")
+                .setReadEntryTimeout(60);
+        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+    }
+
+    public ManagedLedgerFactory getManagedLedgerFactory() {
+        return managedLedgerFactory;
+    }
+
+    public static void shutdown() throws ManagedLedgerException, 
InterruptedException {
+        if (instance != null) {
+            instance.managedLedgerFactory.shutdown();
+            instance = null;
+        }
+    }
+}
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index 1f574c6c02..482fab3378 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -29,7 +29,9 @@
     private String brokerServiceUrl = "http://localhost:8080";;
     private String zookeeperUri = "localhost:2181";
     private int entryReadBatchSize = 100;
-    private int targetNumSplits = 4;
+    private int targetNumSplits = 2;
+    private int maxSplitMessageQueueSize = 10000;
+    private int maxSplitEntryQueueSize = 1000;
     private PulsarAdmin pulsarAdmin;
 
     @NotNull
@@ -76,6 +78,28 @@ public PulsarConnectorConfig setTargetNumSplits(int 
targetNumSplits) {
         return this;
     }
 
+    @NotNull
+    public int getMaxSplitMessageQueueSize() {
+        return this.maxSplitMessageQueueSize;
+    }
+
+    @Config("pulsar.max-split-message-queue-size")
+    public PulsarConnectorConfig setMaxSplitMessageQueueSize(int 
maxSplitMessageQueueSize) {
+        this.maxSplitMessageQueueSize = maxSplitMessageQueueSize;
+        return this;
+    }
+
+    @NotNull
+    public int getMaxSplitEntryQueueSize() {
+        return this.maxSplitEntryQueueSize;
+    }
+
+    @Config("pulsar.max-split-entry-queue-size")
+    public PulsarConnectorConfig setMaxSplitEntryQueueSize(int 
maxSplitEntryQueueSize) {
+        this.maxSplitEntryQueueSize = maxSplitEntryQueueSize;
+        return this;
+    }
+
     @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
diff --git 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index ef56e6ccc1..c8106aa736 100644
--- 
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -28,6 +28,7 @@
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import org.apache.avro.Schema;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -44,10 +45,11 @@
 import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
 
 import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.facebook.presto.spi.type.BigintType.BIGINT;
@@ -62,35 +64,47 @@
 import static com.facebook.presto.spi.type.TinyintType.TINYINT;
 import static com.google.common.base.Preconditions.checkArgument;
 
+
 public class PulsarRecordCursor implements RecordCursor {
 
     private List<PulsarColumnHandle> columnHandles;
     private PulsarSplit pulsarSplit;
     private PulsarConnectorConfig pulsarConnectorConfig;
-    private ManagedLedgerFactory managedLedgerFactory;
     private ReadOnlyCursor cursor;
-    private Queue<Message> messageQueue = new LinkedList<>();
+    private ArrayBlockingQueue<Message> messageQueue;
+    private ArrayBlockingQueue<Entry> entryQueue;
     private Object currentRecord;
     private Message currentMessage;
     private Map<String, PulsarInternalColumn> internalColumnMap = 
PulsarInternalColumn.getInternalFieldsMap();
     private SchemaHandler schemaHandler;
     private int batchSize;
-    private long completedBytes = 0L;
+    private AtomicLong completedBytes = new AtomicLong(0L);
+    private ReadEntries readEntries;
+    private DeserializeEntries deserializeEntries;
+    private TopicName topicName;
 
     private static final Logger log = Logger.get(PulsarRecordCursor.class);
 
+    private static ManagedLedgerFactory 
initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws 
Exception {
+        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
+                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
+                .setAllowShadedLedgerManagerFactoryClass(true)
+                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
+        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
+    }
+
     public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, 
PulsarSplit pulsarSplit,
                               PulsarConnectorConfig pulsarConnectorConfig) {
-
-        ManagedLedgerFactory managedLedgerFactory;
+        PulsarConnectorCache pulsarConnectorCache;
         try {
-            managedLedgerFactory = 
getManagedLedgerFactory(pulsarConnectorConfig);
+            pulsarConnectorCache = 
PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
         } catch (Exception e) {
-            log.error(e, "Failed to initialize managed ledger factory");
+            log.error(e, "Failed to initialize Pulsar connector cache");
             close();
             throw new RuntimeException(e);
         }
-        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, 
managedLedgerFactory);
+        initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
+                pulsarConnectorCache.getManagedLedgerFactory());
     }
 
     // Exposed for testing purposes
@@ -105,7 +119,11 @@ private void initialize(List<PulsarColumnHandle> 
columnHandles, PulsarSplit puls
         this.pulsarSplit = pulsarSplit;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
         this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
-        this.managedLedgerFactory = managedLedgerFactory;
+        this.messageQueue = new 
ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
+        this.entryQueue = new 
ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
+        this.topicName = TopicName.get("persistent",
+                NamespaceName.get(pulsarSplit.getSchemaName()),
+                pulsarSplit.getTableName());
 
         Schema schema = 
PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
 
@@ -149,18 +167,9 @@ private ReadOnlyCursor getCursor(TopicName topicName, 
Position startPosition, Ma
         return cursor;
     }
 
-    private ManagedLedgerFactory getManagedLedgerFactory(PulsarConnectorConfig 
pulsarConnectorConfig) throws Exception {
-        ClientConfiguration bkClientConfiguration = new ClientConfiguration()
-                .setZkServers(pulsarConnectorConfig.getZookeeperUri())
-                .setAllowShadedLedgerManagerFactoryClass(true)
-                
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
-        return new ManagedLedgerFactoryImpl(bkClientConfiguration);
-    }
-
-
     @Override
     public long getCompletedBytes() {
-        return this.completedBytes;
+        return this.completedBytes.get();
     }
 
     @Override
@@ -174,39 +183,48 @@ public Type getType(int field) {
         return columnHandles.get(field).getType();
     }
 
-    @Override
-    public boolean advanceNextPosition() {
+    @VisibleForTesting
+    class DeserializeEntries implements Runnable {
 
-        if (this.messageQueue.isEmpty()) {
-            if (!this.cursor.hasMoreEntries()) {
-                return false;
-            }
-            if (((PositionImpl) this.cursor.getReadPosition())
-                    .compareTo(this.pulsarSplit.getEndPosition()) >= 0) {
-                return false;
-            }
+    protected AtomicBoolean isRunning = new AtomicBoolean(false);
 
-            TopicName topicName = TopicName.get("persistent",
-                    NamespaceName.get(this.pulsarSplit.getSchemaName()),
-                    this.pulsarSplit.getTableName());
+        private final Thread thread;
 
-            List<Entry> newEntries;
-            try {
-                newEntries = this.cursor.readEntries(this.batchSize);
-            } catch (InterruptedException | ManagedLedgerException e) {
-                log.error(e, "Failed to read new entries from pulsar topic 
%s", topicName.toString());
-                throw new RuntimeException(e);
-            }
+        public DeserializeEntries() {
+            this.thread = new Thread(this);
+        }
+
+        public void interrupt() {
+            isRunning.set(false);
+            thread.interrupt();
+        }
+
+        public void start() {
+            this.thread.start();
+        }
 
-            newEntries.forEach(entry -> {
+        @Override
+        public void run() {
+            isRunning.set(true);
+            while (isRunning.get()) {
+                Entry entry;
                 try {
-                    completedBytes += entry.getDataBuffer().readableBytes();
+                    entry = entryQueue.take();
+                } catch (InterruptedException e) {
+                    break;
+                }
+                try {
+                    
completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
                     // filter entries that is not part of my split
                     if (((PositionImpl) 
entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
                         try {
                             MessageParser.parseMessage(topicName, 
entry.getLedgerId(), entry.getEntryId(),
                                     entry.getDataBuffer(), (messageId, 
message, byteBuf) -> {
-                                        messageQueue.add(message);
+                                        try {
+                                            messageQueue.put(message);
+                                        } catch (InterruptedException e) {
+                                            //no-op
+                                        }
                                     });
                         } catch (IOException e) {
                             log.error(e, "Failed to parse message from pulsar 
topic %s", topicName.toString());
@@ -216,15 +234,92 @@ public boolean advanceNextPosition() {
                 } finally {
                     entry.release();
                 }
-            });
+            }
         }
+    }
 
-        this.currentMessage = this.messageQueue.poll();
-        currentRecord = 
this.schemaHandler.deserialize(this.currentMessage.getData());
+    @VisibleForTesting
+    class ReadEntries implements AsyncCallbacks.ReadEntriesCallback {
+
+        // indicate whether there are any additional entries left to read
+        private final AtomicBoolean isDone = new AtomicBoolean(false);
+
+        //num of outstanding read requests
+        // set to 1 because we can only read one batch a time
+        private final AtomicLong outstandingReadsRequests = new AtomicLong(1);
+
+        public void run() {
+
+            if (outstandingReadsRequests.get() > 0) {
 
+                if (!cursor.hasMoreEntries() || ((PositionImpl) 
cursor.getReadPosition())
+                        .compareTo(pulsarSplit.getEndPosition()) >= 0) {
+                    isDone.set(true);
+
+                } else if (entryQueue.remainingCapacity() > batchSize) {
+                    outstandingReadsRequests.decrementAndGet();
+                    cursor.asyncReadEntries(batchSize, this, 
System.currentTimeMillis());
+                }
+            }
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> entries, Object ctx) {
+            entryQueue.addAll(entries);
+            outstandingReadsRequests.incrementAndGet();
+        }
+
+        public boolean hashFinished() {
+            return messageQueue.isEmpty() && entryQueue.isEmpty() && 
isDone.get() && outstandingReadsRequests.get() >=1;
+        }
+
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object 
ctx) {
+            log.debug(exception, "Failed to read entries from topic %s", 
topicName.toString());
+            outstandingReadsRequests.incrementAndGet();
+        }
+    }
+
+
+    @Override
+    public boolean advanceNextPosition() {
+
+        if (readEntries == null) {
+            readEntries = new ReadEntries();
+            readEntries.run();
+
+            // start deserialize thread
+            deserializeEntries = new DeserializeEntries();
+            deserializeEntries.start();
+        }
+
+        while(true) {
+            if (readEntries.hashFinished()) {
+                return false;
+            }
+
+            if (messageQueue.remainingCapacity() > 0) {
+                readEntries.run();
+            }
+
+            currentMessage = messageQueue.poll();
+            if (currentMessage != null) {
+                break;
+            } else {
+                try {
+                    Thread.sleep(5);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        currentRecord = 
this.schemaHandler.deserialize(this.currentMessage.getData());
         return true;
     }
 
+
     @VisibleForTesting
     Object getRecord(int fieldIndex) {
         if (this.currentRecord == null) {
@@ -317,17 +412,13 @@ public boolean isNull(int field) {
     @Override
     public void close() {
 
-        if (this.cursor != null) {
-            try {
-                this.cursor.close();
-            } catch (Exception e) {
-                log.error(e);
-            }
+        if (deserializeEntries != null) {
+            deserializeEntries.interrupt();
         }
 
-        if (managedLedgerFactory != null) {
+        if (this.cursor != null) {
             try {
-                managedLedgerFactory.shutdown();
+                this.cursor.close();
             } catch (Exception e) {
                 log.error(e);
             }
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 0882efc4be..5d8472df38 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -27,6 +27,7 @@
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarcharType;
 import io.airlift.log.Logger;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.Position;
@@ -68,6 +69,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -662,6 +664,9 @@ public String apply(TopicName topicName) {
     @BeforeMethod
     public void setup() throws Exception {
         this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
+        this.pulsarConnectorConfig.setEntryReadBatchSize(1);
+        this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
+        this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
 
         Tenants tenants = mock(Tenants.class);
         doReturn(new LinkedList<>(topicNames.stream().map(new 
Function<TopicName, String>() {
@@ -786,77 +791,86 @@ public PositionImpl answer(InvocationOnMock 
invocationOnMock) throws Throwable {
                     }
                 });
 
-                when(readOnlyCursor.readEntries(anyInt())).thenAnswer(new 
Answer<List<Entry>>() {
+                doAnswer(new Answer() {
                     @Override
-                    public List<Entry> answer(InvocationOnMock 
invocationOnMock) throws Throwable {
+                    public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
                         Object[] args = invocationOnMock.getArguments();
                         Integer readEntries = (Integer) args[0];
+                        AsyncCallbacks.ReadEntriesCallback callback = 
(AsyncCallbacks.ReadEntriesCallback) args[1];
+                        Object ctx = args[2];
+
+                        new Thread(new Runnable() {
+                            @Override
+                            public void run() {
+                                List < Entry > entries = new LinkedList<>();
+                                for (int i = 0; i < readEntries; i++) {
+
+                                    Foo.Bar foobar = new Foo.Bar();
+                                    foobar.field1 = (int) 
fooFunctions.get("bar.test.foobar.field1").apply(count);
+
+                                    Boo boo1 = new Boo();
+                                    boo1.field4 = (double) 
fooFunctions.get("bar.test.field4").apply(count);
+                                    boo1.field5 = (boolean) 
fooFunctions.get("bar.test.field5").apply(count);
+                                    boo1.field6 = (long) 
fooFunctions.get("bar.test.field6").apply(count);
+                                    boo1.foo = new Foo();
+                                    boo1.boo = null;
+                                    boo1.bar = new Bar();
+                                    boo1.foobar = foobar;
+
+                                    Boo boo2 = new Boo();
+                                    boo2.field4 = (double) 
fooFunctions.get("bar.test2.field4").apply(count);
+                                    boo2.field5 = (boolean) 
fooFunctions.get("bar.test2.field5").apply(count);
+                                    boo2.field6 = (long) 
fooFunctions.get("bar.test2.field6").apply(count);
+                                    boo2.foo = new Foo();
+                                    boo2.boo = boo1;
+                                    boo2.bar = new Bar();
+                                    boo2.foobar = foobar;
+
+                                    TestPulsarConnector.Bar bar = new 
TestPulsarConnector.Bar();
+                                    bar.field1 = 
fooFunctions.get("bar.field1").apply(count) == null ? null : (int) 
fooFunctions.get("bar.field1").apply(count);
+                                    bar.field2 = 
fooFunctions.get("bar.field2").apply(count) == null ? null : (String) 
fooFunctions.get("bar.field2").apply(count);
+                                    bar.field3 = (float) 
fooFunctions.get("bar.field3").apply(count);
+                                    bar.test = boo1;
+                                    bar.test2 = count % 2 == 0 ? null : boo2;
+
+                                    Foo foo = new Foo();
+                                    foo.field1 = (int) 
fooFunctions.get("field1").apply(count);
+                                    foo.field2 = (String) 
fooFunctions.get("field2").apply(count);
+                                    foo.field3 = (float) 
fooFunctions.get("field3").apply(count);
+                                    foo.field4 = (double) 
fooFunctions.get("field4").apply(count);
+                                    foo.field5 = (boolean) 
fooFunctions.get("field5").apply(count);
+                                    foo.field6 = (long) 
fooFunctions.get("field6").apply(count);
+                                    foo.timestamp = (long) 
fooFunctions.get("timestamp").apply(count);
+                                    foo.time = (int) 
fooFunctions.get("time").apply(count);
+                                    foo.date = (int) 
fooFunctions.get("date").apply(count);
+                                    foo.bar = bar;
+
+                                    PulsarApi.MessageMetadata messageMetadata 
= PulsarApi.MessageMetadata.newBuilder()
+                                            
.setProducerName("test-producer").setSequenceId(positions.get(topic))
+                                            
.setPublishTime(System.currentTimeMillis()).build();
+
+                                    Schema schema = 
topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? 
AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+
+                                    
org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload
+                                            = 
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
+
+                                    ByteBuf byteBuf = 
serializeMetadataAndPayload
+                                            (Commands.ChecksumType.Crc32c, 
messageMetadata, payload);
+
+                                    completedBytes += byteBuf.readableBytes();
+
+                                    entries.add(EntryImpl.create(0, 
positions.get(topic), byteBuf));
+                                    positions.put(topic, positions.get(topic) 
+ 1);
+                                    count++;
+                                }
+
+                                callback.readEntriesComplete(entries, ctx);
+                            }
+                        }).start();
 
-                        List<Entry> entries = new LinkedList<>();
-                        for (int i = 0; i < readEntries; i++) {
-
-                            Foo.Bar foobar = new Foo.Bar();
-                            foobar.field1 = (int) 
fooFunctions.get("bar.test.foobar.field1").apply(count);
-
-                            Boo boo1 = new Boo();
-                            boo1.field4 = (double) 
fooFunctions.get("bar.test.field4").apply(count);
-                            boo1.field5 = (boolean) 
fooFunctions.get("bar.test.field5").apply(count);
-                            boo1.field6 = (long) 
fooFunctions.get("bar.test.field6").apply(count);
-                            boo1.foo = new Foo();
-                            boo1.boo = null;
-                            boo1.bar = new Bar();
-                            boo1.foobar = foobar;
-
-                            Boo boo2 = new Boo();
-                            boo2.field4 = (double) 
fooFunctions.get("bar.test2.field4").apply(count);
-                            boo2.field5 = (boolean) 
fooFunctions.get("bar.test2.field5").apply(count);
-                            boo2.field6 = (long) 
fooFunctions.get("bar.test2.field6").apply(count);
-                            boo2.foo = new Foo();
-                            boo2.boo = boo1;
-                            boo2.bar = new Bar();
-                            boo2.foobar = foobar;
-
-                            TestPulsarConnector.Bar bar = new 
TestPulsarConnector.Bar();
-                            bar.field1 = 
fooFunctions.get("bar.field1").apply(count) == null ? null : (int) 
fooFunctions.get("bar.field1").apply(count);
-                            bar.field2 = 
fooFunctions.get("bar.field2").apply(count) == null ? null : (String) 
fooFunctions.get("bar.field2").apply(count);
-                            bar.field3 = (float) 
fooFunctions.get("bar.field3").apply(count);
-                            bar.test = boo1;
-                            bar.test2 = count % 2 == 0 ? null : boo2;
-
-                            Foo foo = new Foo();
-                            foo.field1 = (int) 
fooFunctions.get("field1").apply(count);
-                            foo.field2 = (String) 
fooFunctions.get("field2").apply(count);
-                            foo.field3 = (float) 
fooFunctions.get("field3").apply(count);
-                            foo.field4 = (double) 
fooFunctions.get("field4").apply(count);
-                            foo.field5 = (boolean) 
fooFunctions.get("field5").apply(count);
-                            foo.field6 = (long) 
fooFunctions.get("field6").apply(count);
-                            foo.timestamp = (long) 
fooFunctions.get("timestamp").apply(count);
-                            foo.time = (int) 
fooFunctions.get("time").apply(count);
-                            foo.date = (int) 
fooFunctions.get("date").apply(count);
-                            foo.bar = bar;
-
-                            PulsarApi.MessageMetadata messageMetadata = 
PulsarApi.MessageMetadata.newBuilder()
-                                    
.setProducerName("test-producer").setSequenceId(positions.get(topic))
-                                    
.setPublishTime(System.currentTimeMillis()).build();
-
-                            Schema schema = 
topicsToSchemas.get(schemaName).getType() == SchemaType.AVRO ? 
AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
-
-                            org.apache.pulsar.shade.io.netty.buffer.ByteBuf 
payload
-                                    = 
org.apache.pulsar.shade.io.netty.buffer.Unpooled.copiedBuffer(schema.encode(foo));
-
-                            ByteBuf byteBuf = serializeMetadataAndPayload
-                                    (Commands.ChecksumType.Crc32c, 
messageMetadata, payload);
-
-                            completedBytes += byteBuf.readableBytes();
-
-                            entries.add(EntryImpl.create(0, 
positions.get(topic), byteBuf));
-                            positions.put(topic, positions.get(topic) + 1);
-                            count++;
-                        }
-
-                        return entries;
+                        return null;
                     }
-                });
+                }).when(readOnlyCursor).asyncReadEntries(anyInt(), any(), 
any());
 
                 when(readOnlyCursor.hasMoreEntries()).thenAnswer(new 
Answer<Boolean>() {
                     @Override
diff --git 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index 1b323a2948..b0fc42a2ac 100644
--- 
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ 
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -122,6 +122,7 @@ public void testTopics() throws Exception {
             Assert.assertEquals(count, 
topicsToNumEntries.get(topicName.getSchemaName()).longValue());
             Assert.assertEquals(pulsarRecordCursor.getCompletedBytes(), 
completedBytes);
             cleanup();
+            pulsarRecordCursor.close();
         }
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to