This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/paimon-trino.git
The following commit(s) were added to refs/heads/main by this push: new 5144ad9 Update Paimon core to 1.0-SNAPSHOT 5144ad9 is described below commit 5144ad9087e38a3165ae3b160560f1edd2b29156 Author: Jingsong <jingsongl...@gmail.com> AuthorDate: Tue Nov 26 15:18:25 2024 +0800 Update Paimon core to 1.0-SNAPSHOT fix --- .../org/apache/paimon/trino/TrinoMetadata.java | 35 +++++++------ .../trino/TrinoNodePartitioningProvider.java | 10 +--- .../apache/paimon/trino/TrinoPageSinkProvider.java | 13 ++--- .../paimon/trino/TrinoPageSourceProvider.java | 2 +- .../paimon/trino/TrinoPartitioningHandle.java | 16 +----- .../org/apache/paimon/trino/TrinoSplitManager.java | 2 +- .../apache/paimon/trino/TrinoTableOptionUtils.java | 3 -- .../apache/paimon/trino/catalog/TrinoCatalog.java | 61 +++++++++------------- .../paimon/trino/TestTrinoPartitioningHandle.java | 4 +- pom.xml | 2 +- 10 files changed, 55 insertions(+), 93 deletions(-) diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java index 359bad3..8fac45c 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoMetadata.java @@ -93,8 +93,6 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.paimon.table.BucketMode.FIXED; -import static org.apache.paimon.table.BucketMode.UNAWARE; import static org.apache.paimon.trino.TrinoColumnHandle.TRINO_ROW_ID_NAME; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -124,19 +122,18 @@ public class TrinoMetadata implements ConnectorMetadata { FileStoreTable storeTable = (FileStoreTable) table; BucketMode bucketMode = storeTable.bucketMode(); switch (bucketMode) { - case FIXED: + case HASH_FIXED: try { return Optional.of( new ConnectorTableLayout( new TrinoPartitioningHandle( - InstantiationUtil.serializeObject(storeTable.schema()), - FIXED), + InstantiationUtil.serializeObject(storeTable.schema())), storeTable.schema().bucketKeys(), false)); } catch (IOException e) { throw new RuntimeException(e); } - case UNAWARE: + case BUCKET_UNAWARE: return Optional.empty(); default: throw new IllegalArgumentException("Unknown table bucket mode: " + bucketMode); @@ -230,7 +227,7 @@ public class TrinoMetadata implements ConnectorMetadata { } FileStoreTable storeTable = (FileStoreTable) table; BucketMode bucketMode = storeTable.bucketMode(); - if (bucketMode != FIXED) { + if (bucketMode != BucketMode.HASH_FIXED) { throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode); } Set<String> pkSet = new HashSet<>(table.primaryKeys()); @@ -252,13 +249,13 @@ public class TrinoMetadata implements ConnectorMetadata { } FileStoreTable storeTable = (FileStoreTable) table; BucketMode bucketMode = storeTable.bucketMode(); - if (bucketMode != FIXED) { + if (bucketMode != BucketMode.HASH_FIXED) { throw new IllegalArgumentException("Unsupported table bucket mode: " + bucketMode); } try { return Optional.of( new TrinoPartitioningHandle( - InstantiationUtil.serializeObject(storeTable.schema()), FIXED)); + InstantiationUtil.serializeObject(storeTable.schema()))); } catch (IOException e) { throw new RuntimeException(e); } @@ -282,7 +279,12 @@ public class TrinoMetadata implements ConnectorMetadata { @Override public boolean schemaExists(ConnectorSession session, String schemaName) { catalog.initSession(session); - return catalog.databaseExists(schemaName); + try { + catalog.getDatabase(schemaName); + return true; + } catch (Catalog.DatabaseNotExistException e) { + return false; + } } @Override @@ -427,11 +429,14 @@ public class TrinoMetadata implements ConnectorMetadata { SchemaTableName tableName, Map<String, String> dynamicOptions) { catalog.initSession(session); - return catalog.tableExists( - Identifier.create(tableName.getSchemaName(), tableName.getTableName())) - ? new TrinoTableHandle( - tableName.getSchemaName(), tableName.getTableName(), dynamicOptions) - : null; + try { + catalog.getTable( + Identifier.create(tableName.getSchemaName(), tableName.getTableName())); + return new TrinoTableHandle( + tableName.getSchemaName(), tableName.getTableName(), dynamicOptions); + } catch (Catalog.TableNotExistException e) { + return null; + } } @Override diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java index 0aed279..e7cf7eb 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoNodePartitioningProvider.java @@ -18,8 +18,6 @@ package org.apache.paimon.trino; -import org.apache.paimon.table.BucketMode; - import com.google.inject.Inject; import io.trino.spi.connector.BucketFunction; import io.trino.spi.connector.ConnectorNodePartitioningProvider; @@ -46,11 +44,7 @@ public class TrinoNodePartitioningProvider implements ConnectorNodePartitioningP // todo support dynamic bucket tables TrinoPartitioningHandle trinoPartitioningHandle = (TrinoPartitioningHandle) partitioningHandle; - if (trinoPartitioningHandle.getBucketMode() == BucketMode.FIXED) { - return new FixedBucketTableShuffleFunction( - partitionChannelTypes, trinoPartitioningHandle, workerCount); - } - throw new UnsupportedOperationException( - "Unsupported table bucket mode: " + trinoPartitioningHandle.getBucketMode()); + return new FixedBucketTableShuffleFunction( + partitionChannelTypes, trinoPartitioningHandle, workerCount); } } diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java index 1ee2601..af52d5f 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSinkProvider.java @@ -92,18 +92,11 @@ public class TrinoPageSinkProvider implements ConnectorPageSinkProvider { BucketMode mode = table instanceof FileStoreTable ? ((FileStoreTable) table).bucketMode() - : BucketMode.FIXED; + : BucketMode.HASH_FIXED; switch (mode) { - case FIXED: - case UNAWARE: + case HASH_FIXED: + case BUCKET_UNAWARE: break; - case DYNAMIC: - case GLOBAL_DYNAMIC: - if (table.primaryKeys().isEmpty()) { - throw new IllegalArgumentException( - "Only primary-key table can support dynamic bucket."); - } - throw new IllegalArgumentException("Global dynamic bucket mode are not supported"); default: throw new IllegalArgumentException("Unknown bucket mode: " + mode); } diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java index 41bb874..171dfd8 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java @@ -205,7 +205,7 @@ public class TrinoPageSourceProvider implements ConnectorPageSourceProvider { new Path(indexFile.path()), ((FileStoreTable) table).fileIO(), rowType)) { - if (!fileIndexPredicate.testPredicate(paimonFilter.get())) { + if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) { continue; } } diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java index 0da69d8..224adc0 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoPartitioningHandle.java @@ -19,7 +19,6 @@ package org.apache.paimon.trino; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.InstantiationUtil; import com.fasterxml.jackson.annotation.JsonCreator; @@ -33,14 +32,10 @@ import java.util.Arrays; public class TrinoPartitioningHandle implements ConnectorPartitioningHandle { private final byte[] schema; - private final BucketMode bucketMode; @JsonCreator - public TrinoPartitioningHandle( - @JsonProperty("schema") byte[] schema, - @JsonProperty("bucketMode") BucketMode bucketMode) { + public TrinoPartitioningHandle(@JsonProperty("schema") byte[] schema) { this.schema = schema; - this.bucketMode = bucketMode; } @JsonProperty @@ -48,17 +43,10 @@ public class TrinoPartitioningHandle implements ConnectorPartitioningHandle { return schema; } - @JsonProperty - public BucketMode getBucketMode() { - return bucketMode; - } - public TableSchema getOriginalSchema() { try { return InstantiationUtil.deserializeObject(this.schema, getClass().getClassLoader()); - } catch (IOException e) { - throw new RuntimeException(e); - } catch (ClassNotFoundException e) { + } catch (IOException | ClassNotFoundException e) { throw new RuntimeException(e); } } diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java index 038e522..120b0e9 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoSplitManager.java @@ -72,7 +72,7 @@ public class TrinoSplitManager implements ConnectorSplitManager { .convert(tableHandle.getFilter()) .ifPresent(readBuilder::withFilter); tableHandle.getLimit().ifPresent(limit -> readBuilder.withLimit((int) limit)); - List<Split> splits = readBuilder.newScan().plan().splits(); + List<Split> splits = readBuilder.dropStats().newScan().plan().splits(); long maxRowCount = splits.stream().mapToLong(Split::rowCount).max().orElse(0L); double minimumSplitWeight = TrinoSessionProperties.getMinimumSplitWeight(session); diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java index c0b9344..3504fb5 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/TrinoTableOptionUtils.java @@ -93,7 +93,6 @@ public class TrinoTableOptionUtils { private static boolean isEnum(String className) { switch (className) { - case "FileFormatType": case "StartupMode": case "MergeEngine": case "ChangelogProducer": @@ -108,8 +107,6 @@ public class TrinoTableOptionUtils { private static Class<?> buildClass(String className) { switch (className) { - case "FileFormatType": - return CoreOptions.FileFormatType.class; case "MergeEngine": return CoreOptions.MergeEngine.class; case "ChangelogProducer": diff --git a/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java b/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java index 2caab17..aab9057 100644 --- a/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java +++ b/paimon-trino-440/src/main/java/org/apache/paimon/trino/catalog/TrinoCatalog.java @@ -20,14 +20,10 @@ package org.apache.paimon.trino.catalog; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.CatalogLockContext; -import org.apache.paimon.catalog.CatalogLockFactory; -import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.catalog.*; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; @@ -43,7 +39,6 @@ import org.apache.hadoop.conf.Configuration; import java.util.List; import java.util.Map; -import java.util.Optional; /** Trino catalog, use it after set session. */ public class TrinoCatalog implements Catalog { @@ -120,21 +115,11 @@ public class TrinoCatalog implements Catalog { return current.fileIO(); } - @Override - public Optional<CatalogLockFactory> lockFactory() { - return current.lockFactory(); - } - @Override public List<String> listDatabases() { return current.listDatabases(); } - @Override - public boolean databaseExists(String s) { - return current.databaseExists(s); - } - @Override public void createDatabase(String s, boolean b, Map<String, String> map) throws DatabaseAlreadyExistException { @@ -142,8 +127,8 @@ public class TrinoCatalog implements Catalog { } @Override - public Map<String, String> loadDatabaseProperties(String s) throws DatabaseNotExistException { - return current.loadDatabaseProperties(s); + public Database getDatabase(String name) throws DatabaseNotExistException { + return current.getDatabase(name); } @Override @@ -157,6 +142,11 @@ public class TrinoCatalog implements Catalog { return current.getTable(identifier); } + @Override + public Path getTableLocation(Identifier identifier) { + return current.getTableLocation(identifier); + } + @Override public List<String> listTables(String s) throws DatabaseNotExistException { return current.listTables(s); @@ -185,12 +175,24 @@ public class TrinoCatalog implements Catalog { current.alterTable(identifier, list, ignoreIfExists); } + @Override + public void createPartition(Identifier identifier, Map<String, String> map) + throws TableNotExistException { + current.createPartition(identifier, map); + } + @Override public void dropPartition(Identifier identifier, Map<String, String> partitions) throws TableNotExistException, PartitionNotExistException { current.dropPartition(identifier, partitions); } + @Override + public List<PartitionEntry> listPartitions(Identifier identifier) + throws TableNotExistException { + return current.listPartitions(identifier); + } + @Override public void close() throws Exception { if (current != null) { @@ -198,27 +200,12 @@ public class TrinoCatalog implements Catalog { } } - @Override - public Optional<CatalogLockContext> lockContext() { - return current.lockContext(); - } - - @Override - public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier identifier) { - return current.metastoreClientFactory(identifier); - } - @Override public void createDatabase(String name, boolean ignoreIfExists) throws DatabaseAlreadyExistException { current.createDatabase(name, ignoreIfExists); } - @Override - public boolean tableExists(Identifier identifier) { - return current.tableExists(identifier); - } - @Override public void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists) throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { @@ -226,7 +213,7 @@ public class TrinoCatalog implements Catalog { } @Override - public boolean caseSensitive() { - return current.caseSensitive(); + public boolean allowUpperCase() { + return current.allowUpperCase(); } } diff --git a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java index d23a21e..7d97c7c 100644 --- a/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java +++ b/paimon-trino-440/src/test/java/org/apache/paimon/trino/TestTrinoPartitioningHandle.java @@ -18,7 +18,6 @@ package org.apache.paimon.trino; -import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.InstantiationUtil; import io.airlift.json.JsonCodec; @@ -35,8 +34,7 @@ public class TestTrinoPartitioningHandle { @Test public void testTrinoPartitioningHandle() throws Exception { byte[] schemaData = InstantiationUtil.serializeObject("test_schema"); - TrinoPartitioningHandle expected = - new TrinoPartitioningHandle(schemaData, BucketMode.FIXED); + TrinoPartitioningHandle expected = new TrinoPartitioningHandle(schemaData); testRoundTrip(expected); } diff --git a/pom.xml b/pom.xml index 77cdcb2..d137d6f 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ under the License. </scm> <properties> - <paimon.version>0.8.0</paimon.version> + <paimon.version>1.0-SNAPSHOT</paimon.version> <target.java.version>11</target.java.version> <junit5.version>5.8.1</junit5.version> <slf4j.version>2.0.7</slf4j.version>