This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-paimon-presto.git
The following commit(s) were added to refs/heads/main by this push:
new aad39a2 Split page to small pages (#17)
aad39a2 is described below
commit aad39a237504b4761eef9a9256e8f8f2fb1ef549
Author: humengyu <[email protected]>
AuthorDate: Wed Nov 22 20:05:27 2023 +0800
Split page to small pages (#17)
---
.../org/apache/paimon/presto/TestPrestoITCase.java | 7 ++++
.../org/apache/paimon/presto/TestPrestoITCase.java | 7 ++++
.../apache/paimon/presto/PrestoPageSourceBase.java | 43 ++++++++++++++++------
.../org/apache/paimon/presto/TestPrestoITCase.java | 7 ++++
4 files changed, 53 insertions(+), 11 deletions(-)
diff --git a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
.isEqualTo("[[1, 0, user, 0, APPEND]]");
}
+ @Test
+ public void testLimit() throws Exception {
+ assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 1")).isEqualTo("[[1, 2, 1, 1]]");
+ assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+ .isEqualTo("[[5, 6, 3, 3]]");
+ }
+
@Test
public void testProjection() throws Exception {
assertThat(sql("SELECT * FROM paimon.default.t1"))
diff --git a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- a/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ b/paimon-presto-0.268/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
.isEqualTo("[[1, 0, user, 0, APPEND]]");
}
+ @Test
+ public void testLimit() throws Exception {
+ assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 1")).isEqualTo("[[1, 2, 1, 1]]");
+ assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+ .isEqualTo("[[5, 6, 3, 3]]");
+ }
+
@Test
public void testProjection() throws Exception {
assertThat(sql("SELECT * FROM paimon.default.t1"))
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
index 22b5185..a99e886 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoPageSourceBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.guava30.com.google.common.base.Verify;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
+import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.InternalRowUtils;
import com.facebook.presto.common.Page;
@@ -48,6 +49,8 @@ import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.PrestoException;
import io.airlift.slice.Slice;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
@@ -72,16 +75,19 @@ import static java.lang.String.format;
/** Presto {@link ConnectorPageSource}. */
public abstract class PrestoPageSourceBase implements ConnectorPageSource {
- private final RecordReader<InternalRow> reader;
+ private static final int ROWS_PER_REQUEST = 4096;
+
+ private final CloseableIterator<InternalRow> iterator;
private final PageBuilder pageBuilder;
private final List<Type> prestoColumnTypes;
private final List<DataType> paimonColumnTypes;
private boolean isFinished = false;
+ private long numReturn = 0;
public PrestoPageSourceBase(
RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
- this.reader = reader;
+ this.iterator = reader.toCloseableIterator();
this.prestoColumnTypes = new ArrayList<>();
this.paimonColumnTypes = new ArrayList<>();
for (ColumnHandle handle : projectedColumns) {
@@ -131,15 +137,18 @@ public abstract class PrestoPageSourceBase implements ConnectorPageSource {
return 0;
}
+ @Nullable
private Page nextPage() throws IOException {
- RecordReader.RecordIterator<InternalRow> batch = reader.readBatch();
- if (batch == null) {
- isFinished = true;
- return null;
- }
- InternalRow row;
- while ((row = batch.next()) != null) {
+ int count = 0;
+ while (count < ROWS_PER_REQUEST && !pageBuilder.isFull()) {
+ if (!iterator.hasNext()) {
+ isFinished = true;
+ return returnPage(count);
+ }
+
+ InternalRow row = iterator.next();
pageBuilder.declarePosition();
+ count++;
for (int i = 0; i < prestoColumnTypes.size(); i++) {
BlockBuilder output = pageBuilder.getBlockBuilder(i);
appendTo(
@@ -149,7 +158,15 @@ public abstract class PrestoPageSourceBase implements ConnectorPageSource {
output);
}
}
- batch.releaseBatch();
+
+ return returnPage(count);
+ }
+
+ private Page returnPage(int count) {
+ if (count == 0) {
+ return null;
+ }
+ numReturn += count;
Page page = pageBuilder.build();
pageBuilder.reset();
return page;
@@ -157,7 +174,11 @@ public abstract class PrestoPageSourceBase implements ConnectorPageSource {
@Override
public void close() throws IOException {
- this.reader.close();
+ try {
+ this.iterator.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
private void appendTo(Type prestoType, DataType paimonType, Object value, BlockBuilder output) {
diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 96e4654..672a402 100644
--- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -186,6 +186,13 @@ public class TestPrestoITCase {
.isEqualTo("[[1, 0, user, 0, APPEND]]");
}
+ @Test
+ public void testLimit() throws Exception {
+ assertThat(sql("SELECT * FROM paimon.default.t1 LIMIT 1")).isEqualTo("[[1, 2, 1, 1]]");
+ assertThat(sql("SELECT * FROM paimon.default.t1 WHERE a = 5 LIMIT 1"))
+ .isEqualTo("[[5, 6, 3, 3]]");
+ }
+
@Test
public void testProjection() throws Exception {
assertThat(sql("SELECT * FROM paimon.default.t1"))