This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-presto.git
The following commit(s) were added to refs/heads/main by this push:
new 767c328 [presto] Support time travel scan for paimon-presto connector (#48)
767c328 is described below
commit 767c328778d8fdcff0b7ea9fd28f36fd5e8567a7
Author: xiangyu0xf <[email protected]>
AuthorDate: Thu Jan 9 13:57:31 2025 +0800
[presto] Support time travel scan for paimon-presto connector (#48)
---
.../org/apache/paimon/presto/TestPrestoITCase.java | 26 +++++++++++++++++++++-
.../org/apache/paimon/presto/PrestoMetadata.java | 17 +++++++++++---
.../paimon/presto/PrestoSessionProperties.java | 16 ++++++++++++-
.../org/apache/paimon/presto/TestPrestoITCase.java | 13 ++++++++++-
4 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 344ed55..d57d9db 100644
--- a/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ b/paimon-presto-0.236/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -37,6 +37,7 @@ import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import com.facebook.presto.Session;
import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
@@ -49,6 +50,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import static com.facebook.airlift.testing.Closeables.closeAllSuppress;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
@@ -212,7 +214,18 @@ public class TestPrestoITCase {
@Test
public void testFilter() throws Exception {
- assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
+ assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 7"))
+ .isEqualTo("[[1, 1], [3, 2], [5, 3]]");
+ }
+
+ @Test
+ public void testFilterWithTimeTravel() throws Exception {
+ // Time travel table t2 to first commit.
+ assertThat(
+ sql(
+ "SELECT a, aCa FROM paimon.default.t2 WHERE a
< 7",
+ PrestoSessionProperties.SCAN_VERSION,
+ "1"))
.isEqualTo("[[1, 1], [3, 2]]");
}
@@ -224,6 +237,17 @@ public class TestPrestoITCase {
.isEqualTo("[[1, 1, 3, 3], [2, 3, 3, 3]]");
}
+ private String sql(String sql, String key, String value) throws Exception {
+ Session session =
+ testSessionBuilder().setCatalogSessionProperty("paimon", key,
value).build();
+ MaterializedResult result = queryRunner.execute(session, sql);
+ return result.getMaterializedRows().stream()
+ .map(Object::toString)
+ .sorted()
+ .collect(Collectors.toList())
+ .toString();
+ }
+
private String sql(String sql) throws Exception {
MaterializedResult result = queryRunner.execute(sql);
return result.getMaterializedRows().toString();
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
index 80463b7..b1a3aeb 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoMetadata.java
@@ -27,6 +27,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.security.SecurityContext;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.StringUtils;
@@ -130,14 +131,24 @@ public class PrestoMetadata implements ConnectorMetadata {
@Override
public PrestoTableHandle getTableHandle(ConnectorSession session,
SchemaTableName tableName) {
- return getTableHandle(tableName);
+ return getTableHandle(tableName,
PrestoSessionProperties.getScanVersion(session));
}
- public PrestoTableHandle getTableHandle(SchemaTableName tableName) {
+ public PrestoTableHandle getTableHandle(SchemaTableName tableName, String
scanVersion) {
Identifier tablePath = new Identifier(tableName.getSchemaName(),
tableName.getTableName());
byte[] serializedTable;
try {
- serializedTable =
InstantiationUtil.serializeObject(catalog.getTable(tablePath));
+ Table table = catalog.getTable(tablePath);
+ if (!StringUtils.isBlank(scanVersion)) {
+ table =
+ table.copy(
+ new HashMap<String, String>() {
+ {
+ put(CoreOptions.SCAN_VERSION.key(),
scanVersion);
+ }
+ });
+ }
+ serializedTable = InstantiationUtil.serializeObject(table);
} catch (Catalog.TableNotExistException e) {
return null;
} catch (IOException e) {
diff --git a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
index 3888cb9..2d4704f 100644
--- a/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
+++ b/paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoSessionProperties.java
@@ -27,12 +27,16 @@ import javax.inject.Inject;
import java.util.List;
import static com.facebook.presto.spi.session.PropertyMetadata.booleanProperty;
+import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
/** Presto {@link PrestoSessionProperties}. */
public class PrestoSessionProperties {
public static final String QUERY_PUSHDOWN_ENABLED =
"query_pushdown_enabled";
public static final String PARTITION_PRUNE_ENABLED =
"partition_prune_enabled";
+ public static final String RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED =
+ "range_filters_on_subscripts_enabled";
+ public static final String SCAN_VERSION = "scan_version";
private final List<PropertyMetadata<?>> sessionProperties;
@@ -49,7 +53,13 @@ public class PrestoSessionProperties {
PARTITION_PRUNE_ENABLED,
"Enable paimon query partition prune",
config.isPaimonPartitionPruningEnabled(),
- false));
+ false),
+ booleanProperty(
+ RANGE_FILTERS_ON_SUBSCRIPTS_ENABLED,
+ "Whether to enable pushdown of range filters
on subscripts like (a[2] = 5)",
+ false,
+ false),
+ stringProperty(SCAN_VERSION, "Paimon table scan
version", null, false));
}
public List<PropertyMetadata<?>> getSessionProperties() {
@@ -63,4 +73,8 @@ public class PrestoSessionProperties {
public static boolean isPartitionPruneEnabled(ConnectorSession session) {
return session.getProperty(PARTITION_PRUNE_ENABLED, Boolean.class);
}
+
+ public static String getScanVersion(ConnectorSession session) {
+ return session.getProperty(SCAN_VERSION, String.class);
+ }
}
diff --git a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
index 03678f2..8643fc3 100644
--- a/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
+++ b/paimon-presto-common/src/test/java/org/apache/paimon/presto/TestPrestoITCase.java
@@ -355,7 +355,18 @@ public class TestPrestoITCase {
@Test
public void testFilter() throws Exception {
- assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 4"))
+ assertThat(sql("SELECT a, aCa FROM paimon.default.t2 WHERE a < 7"))
+ .isEqualTo("[[1, 1], [3, 2], [5, 3]]");
+ }
+
+ @Test
+ public void testFilterWithTimeTravel() throws Exception {
+ // Time travel table t2 to first commit.
+ assertThat(
+ sql(
+ "SELECT a, aCa FROM paimon.default.t2 WHERE a
< 7",
+ PrestoSessionProperties.SCAN_VERSION,
+ "1"))
.isEqualTo("[[1, 1], [3, 2]]");
}