sijie commented on a change in pull request #9628:
URL: https://github.com/apache/pulsar/pull/9628#discussion_r585897620



##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -654,4 +671,15 @@ private void checkFieldType(int field, Class<?> expected) {
         checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
     }
 
+    private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig) {
+        log.info("Init entry cache size allocator with max split entry queue size bytes {}.",

Review comment:
       Move this log message to the `if` block?
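For illustration, a minimal sketch of that suggestion. The `long` parameter stands in for the limit read from `PulsarConnectorConfig`, and the `>= 0` threshold and the lambda stand-ins for `NoStrictCacheSizeAllocator` / `NullCacheSizeAllocator` are assumptions, not the PR's code. `java.util.logging` replaces the connector's SLF4J logger only so the snippet compiles on its own.

```java
import java.util.logging.Logger;

/** Hypothetical minimal stand-in for the PR's allocator interface. */
interface SizeAllocator {
    long getAvailableCacheSize();
}

class EntryCacheInitSketch {
    private static final Logger LOG = Logger.getLogger(EntryCacheInitSketch.class.getName());

    static SizeAllocator initEntryCacheSizeAllocator(long maxSplitQueueSizeBytes) {
        if (maxSplitQueueSizeBytes >= 0) {
            // The configured size only matters when a limit is in effect,
            // so it is logged inside this branch.
            LOG.info(String.format(
                    "Init entry cache size allocator with max split entry queue size bytes %d.",
                    maxSplitQueueSizeBytes));
            // Placeholder for NoStrictCacheSizeAllocator.
            return () -> maxSplitQueueSizeBytes;
        }
        LOG.info("No max split entry queue size configured; using the no-limit allocator.");
        // Placeholder for the no-limit allocator; Long.MAX_VALUE is an assumption of this sketch.
        return () -> Long.MAX_VALUE;
    }
}
```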

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/util/NoStrictCacheSizeAllocator.java
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto.util;
+
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Cache size allocator.
+ */
+public class NoStrictCacheSizeAllocator implements CacheSizeAllocator {
+
+    private final long maxCacheSize;
+    private final LongAdder availableCacheSize;
+    private final ReentrantReadWriteLock lock;
+
+    public NoStrictCacheSizeAllocator(long maxCacheSize) {
+        this.maxCacheSize = maxCacheSize;
+        this.availableCacheSize = new LongAdder();
+        this.availableCacheSize.add(maxCacheSize);
+        this.lock = new ReentrantReadWriteLock();
+    }
+
+    public long getAvailableCacheSize() {
+        try {
+            lock.readLock().lock();

Review comment:
       Why do we need a lock?
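`LongAdder` is already thread-safe for `add()` and `sum()`, so a non-strict allocator may not need the `ReentrantReadWriteLock` at all. A minimal sketch under that assumption (the `allocate`/`release` method names are hypothetical; only `getAvailableCacheSize()` appears in this diff):

```java
import java.util.concurrent.atomic.LongAdder;

/** Sketch of a lock-free, non-strict cache size allocator (hypothetical API). */
class LockFreeNoStrictCacheSizeAllocator {
    private final LongAdder availableCacheSize = new LongAdder();

    LockFreeNoStrictCacheSizeAllocator(long maxCacheSize) {
        availableCacheSize.add(maxCacheSize);
    }

    long getAvailableCacheSize() {
        // Thread-safe, but not an atomic snapshot while other threads update;
        // a "no strict" allocator can tolerate that approximation.
        return availableCacheSize.sum();
    }

    void allocate(long size) {
        // May drive the value negative; callers treat <= 0 as "no room".
        availableCacheSize.add(-size);
    }

    void release(long size) {
        availableCacheSize.add(size);
    }
}
```

Note that the caller reads the available size and then issues the read as two separate steps, so a lock inside the allocator would not make that check-then-act sequence atomic anyway.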

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -222,7 +228,7 @@ public void setPulsarSqlSchemaInfoProvider(PulsarSqlSchemaInfoProvider schemaInf
     @VisibleForTesting
     class DeserializeEntries implements Runnable {
 
-    protected boolean isRunning = false;
+        protected boolean isRunning = false;

Review comment:
       Was this introduced by mistake?

##########
File path: pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
##########
@@ -347,8 +355,16 @@ public void run() {
 
                             entriesProcessed += entriesToSkip;
                         } else {
-                            outstandingReadsRequests.decrementAndGet();
-                            cursor.asyncReadEntries(batchSize, this, System.nanoTime(), PositionImpl.latest);
+                            long maxSizeBytes = entryCacheSizeAllocator.getAvailableCacheSize();
+                            // if the available size is invalid and the entry queue size is 0, read one entry
+                            if (maxSizeBytes > 0 || entryQueue.size() == 0) {
+                                outstandingReadsRequests.decrementAndGet();
+                                cursor.asyncReadEntries(batchSize, maxSizeBytes,
+                                        this, System.nanoTime(), PositionImpl.latest);
+                            } else {
+                                metricsTracker.incr_READ_ATTEMPTS_FAIL();

Review comment:
       If people don't configure this setting, we initialize a `NullCacheSizeAllocator`, which will cause `metricsTracker` to increment `READ_ATTEMPTS_FAIL` again and again.
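One possible way to address this, sketched as a pure decision function so it can be tested in isolation. The `cacheLimitConfigured` flag and the `ReadDecision` enum are hypothetical; the second branch mirrors the condition in the diff.

```java
/** Outcome of one pass through the read-scheduling branch (hypothetical). */
enum ReadDecision { READ, SKIP_AND_COUNT_FAIL }

final class ReadGateSketch {
    private ReadGateSketch() {}

    /**
     * @param cacheLimitConfigured false when the no-limit allocator is in use
     * @param availableBytes       value returned by getAvailableCacheSize()
     * @param entryQueueSize       current number of queued entries
     */
    static ReadDecision decide(boolean cacheLimitConfigured, long availableBytes, int entryQueueSize) {
        if (!cacheLimitConfigured) {
            // No limit configured: always read and never count a failed attempt.
            return ReadDecision.READ;
        }
        if (availableBytes > 0 || entryQueueSize == 0) {
            // Room in the cache, or the queue is empty so one entry may be read.
            return ReadDecision.READ;
        }
        // Only a real, configured limit can block the read.
        return ReadDecision.SKIP_AND_COUNT_FAIL;
    }
}
```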




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

