This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-presto.git


The following commit(s) were added to refs/heads/main by this push:
     new aad39a2  Split page to small pages (#17)
aad39a2 is described below

commit aad39a237504b4761eef9a9256e8f8f2fb1ef549
Author: humengyu <[email protected]>
AuthorDate: Wed Nov 22 20:05:27 2023 +0800

    Split page to small pages (#17)
---
 .../org/apache/paimon/presto/TestPrestoITCase.java |  7 ++++
 .../org/apache/paimon/presto/TestPrestoITCase.java |  7 ++++
 .../apache/paimon/presto/PrestoPageSourceBase.java | 43 ++++++++++++++++------
 .../org/apache/paimon/presto/TestPrestoITCase.java |  7 ++++
 4 files changed, 53 insertions(+), 11 deletions(-)

diff --git 
a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
 
b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- 
a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ 
b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
                 .isEqualTo("[[1, 0, user, 0, APPEND]]");
     }
 
+    @Test
+    public void testLimit() throws Exception {
+        assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 
1")).isEqualTo("[[1, 2, 1, 1]]");
+        assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+                .isEqualTo("[[5, 6, 3, 3]]");
+    }
+
     @Test
     public void testProjection() throws Exception {
         assertThat(sql("SELECT * FROM paimon.default.t1"))
diff --git 
a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
 
b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- 
a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ 
b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
                 .isEqualTo("[[1, 0, user, 0, APPEND]]");
     }
 
+    @Test
+    public void testLimit() throws Exception {
+        assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 
1")).isEqualTo("[[1, 2, 1, 1]]");
+        assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+                .isEqualTo("[[5, 6, 3, 3]]");
+    }
+
     @Test
     public void testProjection() throws Exception {
         assertThat(sql("SELECT * FROM paimon.default.t1"))
diff --git 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
index 22b5185..a99e886 100644
--- 
a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
+++ 
b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.shade.guava30.com.google.common.base.Verify;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.InternalRowUtils;
 
 import com.facebook.presto.common.Page;
@@ -48,6 +49,8 @@ import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.PrestoException;
 import io.airlift.slice.Slice;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
@@ -72,16 +75,19 @@ import static java.lang.String.format;
 /** Presto {@link ConnectorPageSource}. */
 public abstract class PrestoPageSourceBase implements ConnectorPageSource {
 
-    private final RecordReader<InternalRow> reader;
+    private static final int ROWS_PER_REQUEST = 4096;
+
+    private final CloseableIterator<InternalRow> iterator;
     private final PageBuilder pageBuilder;
     private final List<Type> prestoColumnTypes;
     private final List<DataType> paimonColumnTypes;
 
     private boolean isFinished = false;
+    private long numReturn = 0;
 
     public PrestoPageSourceBase(
             RecordReader<InternalRow> reader, List<ColumnHandle> 
projectedColumns) {
-        this.reader = reader;
+        this.iterator = reader.toCloseableIterator();
         this.prestoColumnTypes = new ArrayList<>();
         this.paimonColumnTypes = new ArrayList<>();
         for (ColumnHandle handle : projectedColumns) {
@@ -131,15 +137,18 @@ public abstract class PrestoPageSourceBase implements 
ConnectorPageSource {
         return 0;
     }
 
+    @Nullable
     private Page nextPage() throws IOException {
-        RecordReader.RecordIterator<InternalRow> batch = reader.readBatch();
-        if (batch == null) {
-            isFinished = true;
-            return null;
-        }
-        InternalRow row;
-        while ((row = batch.next()) != null) {
+        int count = 0;
+        while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
+            if (!iterator.hasNext()) {
+                isFinished = true;
+                return returnPage(count);
+            }
+
+            InternalRow row = iterator.next();
             pageBuilder.declarePosition();
+            count++;
             for (int i = 0; i < prestoColumnTypes.size(); i++) {
                 BlockBuilder output = pageBuilder.getBlockBuilder(i);
                 appendTo(
@@ -149,7 +158,15 @@ public abstract class PrestoPageSourceBase implements 
ConnectorPageSource {
                         output);
             }
         }
-        batch.releaseBatch();
+
+        return returnPage(count);
+    }
+
+    private Page returnPage(int count) {
+        if (count == 0) {
+            return null;
+        }
+        numReturn += count;
         Page page = pageBuilder.build();
         pageBuilder.reset();
         return page;
@@ -157,7 +174,11 @@ public abstract class PrestoPageSourceBase implements 
ConnectorPageSource {
 
     @Override
     public void close() throws IOException {
-        this.reader.close();
+        try {
+            this.iterator.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     private void appendTo(Type prestoType, DataType paimonType, Object value, 
BlockBuilder output) {
diff --git 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- 
a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ 
b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
                 .isEqualTo("[[1, 0, user, 0, APPEND]]");
     }
 
+    @Test
+    public void testLimit() throws Exception {
+        assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 
1")).isEqualTo("[[1, 2, 1, 1]]");
+        assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+                .isEqualTo("[[5, 6, 3, 3]]");
+    }
+
     @Test
     public void testProjection() throws Exception {
         assertThat(sql("SELECT * FROM paimon.default.t1"))

Reply via email to