This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push:
new d8584ab add tag query support by FOR VERSION AS OF (#78)
d8584ab is described below
commit d8584ab9763f861d4c748b03bde3e12b55dda07f
Author: chun.ji <[email protected]>
AuthorDate: Sun Aug 11 22:00:50 2024 +0800
add tag query support by FOR VERSION AS OF (#78)
---
.../org/apache/paimon/trino/TrinoMetadata.java | 51 ++++++++++++++++++++--
.../apache/paimon/trino/SimpleTableTestHelper.java | 6 +++
.../org/apache/paimon/trino/TestTrinoITCase.java | 14 ++++++
3 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
index 926ca64..468d8aa 100644
--- a/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
+++ b/paimon-trino-common/src/main/java/org/apache/paimon/trino/TrinoMetadata.java
@@ -21,12 +21,15 @@ package org.apache.paimon.trino;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.trino.catalog.TrinoCatalog;
import org.apache.paimon.utils.StringUtils;
+import io.airlift.slice.Slice;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.Assignment;
import io.trino.spi.connector.ColumnHandle;
@@ -49,7 +52,9 @@ import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.type.LongTimestampWithTimeZone;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
+import io.trino.spi.type.VarcharType;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -73,6 +78,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Trino {@link ConnectorMetadata}. */
public class TrinoMetadata implements ConnectorMetadata {
+ private static final String TAG_PREFIX = "tag-";
protected final TrinoCatalog catalog;
@@ -167,9 +173,48 @@ public class TrinoMetadata implements ConnectorMetadata {
}
case TARGET_ID:
{
- dynamicOptions.put(
- CoreOptions.SCAN_SNAPSHOT_ID.key(),
- version.getVersion().toString());
+ String tagOrVersion;
+ if (versionType instanceof VarcharType) {
+ tagOrVersion =
+ BinaryString.fromBytes(
+ ((Slice) version.getVersion()).getBytes())
+ .toString();
+ } else {
+ tagOrVersion = version.getVersion().toString();
+ }
+
+ // A non-numeric value can only be a tag name; a numeric value may be a tag name or a snapshot id.
+ boolean isNumber = StringUtils.isNumeric(tagOrVersion);
+ if (!isNumber) {
+ dynamicOptions.put(CoreOptions.SCAN_TAG_NAME.key(), tagOrVersion);
+ } else {
+ try {
+ catalog.initSession(session);
+ String path =
+ catalog.getTable(
+ new Identifier(
+ tableName.getSchemaName(),
+ tableName.getTableName()))
+ .options()
+ .get("path");
+
+ if (catalog.fileIO()
+ .exists(
+ new Path(
+ path
+ + "/tag/"
+ + TAG_PREFIX
+ + tagOrVersion))) {
+ dynamicOptions.put(
+ CoreOptions.SCAN_TAG_NAME.key(), tagOrVersion);
+ } else {
+ dynamicOptions.put(
+ CoreOptions.SCAN_SNAPSHOT_ID.key(), tagOrVersion);
+ }
+ } catch (IOException | Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ }
break;
}
}
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
index 5caef5c..456dcad 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/SimpleTableTestHelper.java
@@ -36,6 +36,7 @@ public class SimpleTableTestHelper {
private final InnerTableWrite writer;
private final InnerTableCommit commit;
+ private final FileStoreTable table;
public SimpleTableTestHelper(Path path, RowType rowType) throws Exception {
new SchemaManager(LocalFileIO.create(), path)
@@ -47,6 +48,7 @@ public class SimpleTableTestHelper {
Collections.singletonMap("bucket", "1"),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+ this.table = table;
String user = "user";
this.writer = table.newWrite(user);
this.commit = table.newCommit(user);
@@ -59,4 +61,8 @@ public class SimpleTableTestHelper {
public void commit() throws Exception {
commit.commit(0, writer.prepareCommit(true, 0));
}
+
+ public void createTag(String name) {
+ table.createTag(name);
+ }
}
diff --git a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
index 59378a6..0258ff8 100644
--- a/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
+++ b/paimon-trino-common/src/test/java/org/apache/paimon/trino/TestTrinoITCase.java
@@ -102,10 +102,12 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
testHelper2.write(GenericRow.of(1, 2L, fromString("1"), fromString("1")));
testHelper2.write(GenericRow.of(3, 4L, fromString("2"), fromString("2")));
testHelper2.commit();
+ testHelper2.createTag("1");
t2FirstCommitTimestamp = System.currentTimeMillis();
testHelper2.write(GenericRow.of(5, 6L, fromString("3"), fromString("3")));
testHelper2.write(GenericRow.of(7, 8L, fromString("4"), fromString("4")));
testHelper2.commit();
+ testHelper2.createTag("tag-2");
{
Path tablePath3 = new Path(warehouse, "default.db/t3");
@@ -795,6 +797,18 @@ public abstract class TestTrinoITCase extends AbstractTestQueryFramework {
.isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2], [5, 6, 3, 3], [7, 8, 4, 4]]");
}
+ @Test
+ public void testTimeTravelWithTag() {
+ // version given as a string literal: either a tag name or a snapshot id
+ assertThat(sql("SELECT * FROM paimon.default.t2 FOR VERSION AS OF '1'"))
+ .isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2]]");
+ assertThat(sql("SELECT * FROM paimon.default.t2 FOR VERSION AS OF 'tag-2'"))
+ .isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2], [5, 6, 3, 3], [7, 8, 4, 4]]");
+ // version given as an integer literal: either a tag name or a snapshot id
+ assertThat(sql("SELECT * FROM paimon.default.t2 FOR VERSION AS OF 1"))
+ .isEqualTo("[[1, 2, 1, 1], [3, 4, 2, 2]]");
+ }
+
@Test
public void testSchemaEvolution() {
assertThat(