This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 107dfa0104 [core] Extract loadTable in CatalogUtils (#4904)
107dfa0104 is described below
commit 107dfa0104728362ed838f9dcc8263084dd9772e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 14 15:54:17 2025 +0800
[core] Extract loadTable in CatalogUtils (#4904)
---
.../java/org/apache/paimon/options/Options.java | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 122 +++----------------
.../java/org/apache/paimon/catalog/Catalog.java | 4 +
.../org/apache/paimon/catalog/CatalogUtils.java | 135 ++++++++++++++-------
.../org/apache/paimon/catalog/DelegateCatalog.java | 6 +
.../apache/paimon/catalog/FileSystemCatalog.java | 2 +-
.../org/apache/paimon/catalog/TableMetadata.java | 49 ++++++++
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 5 +-
.../java/org/apache/paimon/rest/RESTCatalog.java | 103 +++++-----------
.../apache/paimon/table/object/ObjectTable.java | 2 +
.../org/apache/paimon/catalog/CatalogTestBase.java | 4 +-
.../org/apache/paimon/rest/TestRESTCatalog.java | 12 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 82 +++++--------
.../org/apache/paimon/hive/HiveTableUtils.java | 34 +++---
.../org/apache/paimon/hive/HiveCatalogTest.java | 4 -
15 files changed, 261 insertions(+), 307 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
index d292fef3bf..46e70e22af 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/Options.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/Options.java
@@ -149,8 +149,8 @@ public class Options implements Serializable {
return new Options(convertToPropertiesPrefixKey(data, prefix));
}
- public synchronized void remove(String key) {
- data.remove(key);
+ public synchronized String remove(String key) {
+ return data.remove(key);
}
public synchronized void remove(ConfigOption<?> option) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index dedfd5f112..f766df0c5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,7 +19,6 @@
package org.apache.paimon.catalog;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -32,18 +31,13 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
-
-import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -55,13 +49,12 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
+import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static
org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
-import static org.apache.paimon.catalog.CatalogUtils.getTableType;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
@@ -69,7 +62,6 @@ import static
org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
@@ -100,6 +92,11 @@ public abstract class AbstractCatalog implements Catalog {
return fileIO;
}
+ @Override
+ public FileIO fileIO(Path path) {
+ return fileIO;
+ }
+
public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
@@ -370,67 +367,9 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
- if (isSystemDatabase(identifier.getDatabaseName())) {
- return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
- } else if (identifier.isSystemTable()) {
- Table originTable =
- getDataOrFormatTable(
- new Identifier(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- identifier.getBranchName(),
- null));
- return CatalogUtils.createSystemTable(identifier, originTable);
- } else {
- return getDataOrFormatTable(identifier);
- }
- }
-
- // hive override this method.
- protected Table getDataOrFormatTable(Identifier identifier) throws
TableNotExistException {
- Preconditions.checkArgument(identifier.getSystemTableName() == null);
- TableMeta tableMeta = getDataTableMeta(identifier);
- TableType tableType = getTableType(tableMeta.schema().options());
- if (tableType == TableType.FORMAT_TABLE) {
- TableSchema schema = tableMeta.schema();
- return buildFormatTableByTableSchema(
- identifier,
- schema.options(),
- schema.logicalRowType(),
- schema.partitionKeys(),
- schema.comment());
- }
- FileStoreTable table =
- FileStoreTableFactory.create(
- fileIO,
- getTableLocation(identifier),
- tableMeta.schema,
- new CatalogEnvironment(
- identifier,
- tableMeta.uuid,
- Lock.factory(
- lockFactory().orElse(null),
- lockContext().orElse(null),
- identifier),
- catalogLoader()));
- if (tableType == TableType.OBJECT_TABLE) {
- String objectLocation = table.coreOptions().objectLocation();
- checkNotNull(objectLocation, "Object location should not be null
for object table.");
- table =
- ObjectTable.builder()
- .underlyingTable(table)
- .objectLocation(objectLocation)
- .objectFileIO(objectFileIO(objectLocation))
- .build();
- }
- return table;
- }
-
- /**
- * Catalog implementation may override this method to provide {@link
FileIO} to object table.
- */
- protected FileIO objectFileIO(String objectLocation) {
- return fileIO;
+ Lock.Factory lockFactory =
+ Lock.factory(lockFactory().orElse(null),
lockContext().orElse(null), identifier);
+ return CatalogUtils.loadTable(this, identifier,
this::loadTableMetadata, lockFactory);
}
/**
@@ -455,11 +394,11 @@ public abstract class AbstractCatalog implements Catalog {
return newDatabasePath(warehouse(), database);
}
- protected TableMeta getDataTableMeta(Identifier identifier) throws
TableNotExistException {
- return new TableMeta(getDataTableSchema(identifier), null);
+ protected TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
+ return new TableMetadata(loadTableSchema(identifier), null);
}
- protected abstract TableSchema getDataTableSchema(Identifier identifier)
+ protected abstract TableSchema loadTableSchema(Identifier identifier)
throws TableNotExistException;
public Path getTableLocation(Identifier identifier) {
@@ -537,38 +476,17 @@ public abstract class AbstractCatalog implements Catalog {
}
public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath,
String branchName) {
- return new SchemaManager(fileIO, tablePath, branchName)
- .latest()
- .map(
- s -> {
- if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath,
branchName).latest();
+ if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
+ schema =
+ schema.map(
+ s -> {
Options branchOptions = new
Options(s.options());
branchOptions.set(CoreOptions.BRANCH,
branchName);
return s.copy(branchOptions.toMap());
- } else {
- return s;
- }
- });
- }
-
- /** Table metadata. */
- protected static class TableMeta {
-
- private final TableSchema schema;
- @Nullable private final String uuid;
-
- public TableMeta(TableSchema schema, @Nullable String uuid) {
- this.schema = schema;
- this.uuid = uuid;
- }
-
- public TableSchema schema() {
- return schema;
- }
-
- @Nullable
- public String uuid() {
- return uuid;
+ });
}
+ schema.ifPresent(s -> s.options().put(PATH.key(),
tablePath.toString()));
+ return schema;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d0ad86c224..a0d78b2688 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -20,6 +20,7 @@ package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -374,6 +375,9 @@ public interface Catalog extends AutoCloseable {
/** {@link FileIO} of this catalog. It can access {@link #warehouse()}
path. */
FileIO fileIO();
+ /** {@link FileIO} of this catalog. */
+ FileIO fileIO(Path path);
+
/** Catalog options for re-creating this catalog. */
Map<String, String> options();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 3c245040cc..3ceef461e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -20,18 +20,23 @@ package org.apache.paimon.catalog;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
@@ -42,6 +47,7 @@ import java.util.Map;
import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME;
import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
+import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -129,7 +135,76 @@ public class CatalogUtils {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}
- public static Table createGlobalSystemTable(String tableName, Catalog
catalog)
+ public static List<Partition> listPartitionsFromFileSystem(Table table) {
+ Options options = Options.fromMap(table.options());
+ InternalRowPartitionComputer computer =
+ new InternalRowPartitionComputer(
+ options.get(PARTITION_DEFAULT_NAME),
+ table.rowType().project(table.partitionKeys()),
+ table.partitionKeys().toArray(new String[0]),
+ options.get(PARTITION_GENERATE_LEGCY_NAME));
+ List<PartitionEntry> partitionEntries =
+ table.newReadBuilder().newScan().listPartitionEntries();
+ List<Partition> partitions = new ArrayList<>(partitionEntries.size());
+ for (PartitionEntry entry : partitionEntries) {
+ partitions.add(
+ new Partition(
+ computer.generatePartValues(entry.partition()),
+ entry.recordCount(),
+ entry.fileSizeInBytes(),
+ entry.fileCount(),
+ entry.lastFileCreationTime()));
+ }
+ return partitions;
+ }
+
+ /**
+ * Load table from {@link Catalog}, this table can be:
+ *
+ * <ul>
+ * <li>1. Global System table: contains the statistical information of
all the tables exists.
+ * <li>2. Format table: refers to a directory that contains multiple
files of the same format.
+ * <li>3. Data table: Normal {@link FileStoreTable}, primary key table
or append table.
+ * <li>4. Object table: provides metadata indexes for unstructured data
in the location.
+ * <li>5. System table: wraps Data table or Object table, such as the
snapshots created.
+ * </ul>
+ */
+ public static Table loadTable(
+ Catalog catalog,
+ Identifier identifier,
+ TableMetadata.Loader metadataLoader,
+ Lock.Factory lockFactory)
+ throws Catalog.TableNotExistException {
+ if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
+ return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
+ }
+
+ TableMetadata metadata = metadataLoader.load(identifier);
+ TableSchema schema = metadata.schema();
+ CoreOptions options = CoreOptions.fromMap(schema.options());
+ if (options.type() == TableType.FORMAT_TABLE) {
+ return toFormatTable(identifier, schema);
+ }
+
+ CatalogEnvironment catalogEnv =
+ new CatalogEnvironment(
+ identifier, metadata.uuid(), lockFactory,
catalog.catalogLoader());
+ Path path = new Path(schema.options().get(PATH.key()));
+ FileStoreTable table =
+ FileStoreTableFactory.create(catalog.fileIO(), path, schema,
catalogEnv);
+
+ if (options.type() == TableType.OBJECT_TABLE) {
+ table = toObjectTable(catalog, table);
+ }
+
+ if (identifier.isSystemTable()) {
+ return CatalogUtils.createSystemTable(identifier, table);
+ }
+
+ return table;
+ }
+
+ private static Table createGlobalSystemTable(String tableName, Catalog
catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
@@ -154,7 +229,7 @@ public class CatalogUtils {
}
}
- public static Table createSystemTable(Identifier identifier, Table
originTable)
+ private static Table createSystemTable(Identifier identifier, Table
originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
@@ -172,41 +247,8 @@ public class CatalogUtils {
return table;
}
- public static List<Partition> listPartitionsFromFileSystem(Table table) {
- Options options = Options.fromMap(table.options());
- InternalRowPartitionComputer computer =
- new InternalRowPartitionComputer(
- options.get(PARTITION_DEFAULT_NAME),
- table.rowType().project(table.partitionKeys()),
- table.partitionKeys().toArray(new String[0]),
- options.get(PARTITION_GENERATE_LEGCY_NAME));
- List<PartitionEntry> partitionEntries =
- table.newReadBuilder().newScan().listPartitionEntries();
- List<Partition> partitions = new ArrayList<>(partitionEntries.size());
- for (PartitionEntry entry : partitionEntries) {
- partitions.add(
- new Partition(
- computer.generatePartValues(entry.partition()),
- entry.recordCount(),
- entry.fileSizeInBytes(),
- entry.fileCount(),
- entry.lastFileCreationTime()));
- }
- return partitions;
- }
-
- public static TableType getTableType(Map<String, String> options) {
- return options.containsKey(CoreOptions.TYPE.key())
- ? TableType.fromString(options.get(CoreOptions.TYPE.key()))
- : CoreOptions.TYPE.defaultValue();
- }
-
- public static FormatTable buildFormatTableByTableSchema(
- Identifier identifier,
- Map<String, String> options,
- RowType rowType,
- List<String> partitionKeys,
- String comment) {
+ private static FormatTable toFormatTable(Identifier identifier,
TableSchema schema) {
+ Map<String, String> options = schema.options();
FormatTable.Format format =
FormatTable.parseFormat(
options.getOrDefault(
@@ -215,12 +257,23 @@ public class CatalogUtils {
String location = options.get(CoreOptions.PATH.key());
return FormatTable.builder()
.identifier(identifier)
- .rowType(rowType)
- .partitionKeys(partitionKeys)
+ .rowType(schema.logicalRowType())
+ .partitionKeys(schema.partitionKeys())
.location(location)
.format(format)
.options(options)
- .comment(comment)
+ .comment(schema.comment())
+ .build();
+ }
+
+ private static ObjectTable toObjectTable(Catalog catalog, FileStoreTable
underlyingTable) {
+ CoreOptions options = underlyingTable.coreOptions();
+ String objectLocation = options.objectLocation();
+ FileIO objectFileIO = catalog.fileIO(new Path(objectLocation));
+ return ObjectTable.builder()
+ .underlyingTable(underlyingTable)
+ .objectLocation(objectLocation)
+ .objectFileIO(objectFileIO)
.build();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index aa7852456e..847485a7a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -61,6 +62,11 @@ public abstract class DelegateCatalog implements Catalog {
return wrapped.fileIO();
}
+ @Override
+ public FileIO fileIO(Path path) {
+ return wrapped.fileIO(path);
+ }
+
@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 254826b91d..f8e9a2aabc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -103,7 +103,7 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ public TableSchema loadTableSchema(Identifier identifier) throws
TableNotExistException {
return tableSchemaInFileSystem(
getTableLocation(identifier),
identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java
new file mode 100644
index 0000000000..81904476a2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableMetadata.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.schema.TableSchema;
+
+import javax.annotation.Nullable;
+
+/** Metadata for table. */
+public class TableMetadata {
+
+ private final TableSchema schema;
+ @Nullable private final String uuid;
+
+ public TableMetadata(TableSchema schema, @Nullable String uuid) {
+ this.schema = schema;
+ this.uuid = uuid;
+ }
+
+ public TableSchema schema() {
+ return schema;
+ }
+
+ @Nullable
+ public String uuid() {
+ return uuid;
+ }
+
+ /** Loader to load {@link TableMetadata}. */
+ public interface Loader {
+ TableMetadata load(Identifier identifier) throws
Catalog.TableNotExistException;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 327c6b9676..8db9e723c2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -357,15 +357,14 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ protected TableSchema loadTableSchema(Identifier identifier) throws
TableNotExistException {
assertMainBranch(identifier);
if (!JdbcUtils.tableExists(
connections, catalogKey, identifier.getDatabaseName(),
identifier.getTableName())) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getTableLocation(identifier);
- return new SchemaManager(fileIO, tableLocation)
- .latest()
+ return tableSchemaInFileSystem(tableLocation,
identifier.getBranchNameOrDefault())
.orElseThrow(
() -> new RuntimeException("There is no paimon table
in " + tableLocation));
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 4b4e90fb7c..97f9ffe567 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -18,7 +18,6 @@
package org.apache.paimon.rest;
-import org.apache.paimon.TableType;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogLoader;
@@ -26,6 +25,7 @@ import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreCommit;
@@ -60,14 +60,10 @@ import org.apache.paimon.rest.responses.ListTablesResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -83,20 +79,16 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
-import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static
org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
-import static org.apache.paimon.catalog.CatalogUtils.getTableType;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static
org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.rest.RESTUtil.extractPrefixMap;
import static org.apache.paimon.rest.auth.AuthSession.createAuthSession;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
/** A catalog implementation for REST. */
@@ -171,6 +163,11 @@ public class RESTCatalog implements Catalog {
return fileIO;
}
+ @Override
+ public FileIO fileIO(Path path) {
+ return fileIO;
+ }
+
@Override
public List<String> listDatabases() {
ListDatabasesResponse response =
@@ -281,13 +278,28 @@ public class RESTCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
- if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
- return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
- } else if (identifier.isSystemTable()) {
- return getSystemTable(identifier);
- } else {
- return getDataOrFormatTable(identifier);
+ // TODO add lock from server
+ return CatalogUtils.loadTable(
+ this, identifier, this::loadTableMetadata,
Lock.emptyFactory());
+ }
+
+ private TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
+ GetTableResponse response;
+ try {
+ response =
+ client.get(
+ resourcePaths.table(
+ identifier.getDatabaseName(),
identifier.getTableName()),
+ GetTableResponse.class,
+ headers());
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
}
+
+ TableSchema schema = TableSchema.create(response.getSchemaId(),
response.getSchema());
+ return new TableMetadata(schema, response.getId());
}
@Override
@@ -520,56 +532,6 @@ public class RESTCatalog implements Catalog {
}
}
- private Table getDataOrFormatTable(Identifier identifier) throws
TableNotExistException {
- Preconditions.checkArgument(identifier.getSystemTableName() == null);
-
- GetTableResponse response;
- try {
- response =
- client.get(
- resourcePaths.table(
- identifier.getDatabaseName(),
identifier.getTableName()),
- GetTableResponse.class,
- headers());
- } catch (NoSuchResourceException e) {
- throw new TableNotExistException(identifier);
- } catch (ForbiddenException e) {
- throw new TableNoPermissionException(identifier, e);
- }
- TableType tableType = getTableType(response.getSchema().options());
- if (tableType == TableType.FORMAT_TABLE) {
- Schema schema = response.getSchema();
- return buildFormatTableByTableSchema(
- identifier,
- schema.options(),
- schema.rowType(),
- schema.partitionKeys(),
- schema.comment());
- }
- TableSchema schema = TableSchema.create(response.getSchemaId(),
response.getSchema());
- FileStoreTable table =
- FileStoreTableFactory.create(
- fileIO(),
- new Path(schema.options().get(PATH.key())),
- schema,
- new CatalogEnvironment(
- identifier,
- response.getId(),
- Lock.emptyFactory(),
- catalogLoader()));
- if (tableType == TableType.OBJECT_TABLE) {
- String objectLocation = table.coreOptions().objectLocation();
- checkNotNull(objectLocation, "Object location should not be null
for object table.");
- table =
- ObjectTable.builder()
- .underlyingTable(table)
- .objectLocation(objectLocation)
- .objectFileIO(this.fileIO())
- .build();
- }
- return table;
- }
-
private boolean isMetaStorePartitionedTable(Table table) {
Options options = Options.fromMap(table.options());
return Boolean.TRUE.equals(options.get(METASTORE_PARTITIONED_TABLE));
@@ -579,17 +541,6 @@ public class RESTCatalog implements Catalog {
return catalogAuth.getHeaders();
}
- private Table getSystemTable(Identifier identifier) throws
TableNotExistException {
- Table originTable =
- getDataOrFormatTable(
- new Identifier(
- identifier.getDatabaseName(),
- identifier.getTableName(),
- identifier.getBranchName(),
- null));
- return CatalogUtils.createSystemTable(identifier, originTable);
- }
-
private ScheduledExecutorService tokenRefreshExecutor() {
if (refreshExecutor == null) {
synchronized (this) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 97acfe7299..992425ed52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Map;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* A object table refers to a directory that contains multiple objects
(files), Object table
@@ -122,6 +123,7 @@ public interface ObjectTable extends FileStoreTable {
}
public ObjectTable build() {
+ checkNotNull(objectLocation, "Object location should not be null
for object table.");
return new ObjectTableImpl(underlyingTable, objectFileIO,
objectLocation);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index c8b9192c1c..1c453a1b3b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -409,7 +409,7 @@ public abstract class CatalogTestBase {
catalog.getTable(
Identifier.create(
"test_db",
"non_existing_table$snapshots")))
- .withMessage("Table test_db.non_existing_table does not
exist.");
+ .withMessage("Table test_db.non_existing_table$snapshots does
not exist.");
// Get system table throws TableNotExistException when system table
type does not exist
assertThatExceptionOfType(Catalog.TableNotExistException.class)
@@ -417,7 +417,7 @@ public abstract class CatalogTestBase {
() ->
catalog.getTable(
Identifier.create("test_db",
"non_existing_table$schema1")))
- .withMessage("Table test_db.non_existing_table does not
exist.");
+ .withMessage("Table test_db.non_existing_table$schema1 does
not exist.");
// Get data table throws TableNotExistException when table does not
exist
assertThatExceptionOfType(Catalog.TableNotExistException.class)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
index a0f820e7ad..4f15049311 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestRESTCatalog.java
@@ -18,12 +18,13 @@
package org.apache.paimon.rest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
@@ -166,7 +167,8 @@ public class TestRESTCatalog extends FileSystemCatalog {
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
if (tableFullName2Schema.containsKey(identifier.getFullName())) {
TableSchema schema =
tableFullName2Schema.get(identifier.getFullName());
- if (CatalogUtils.getTableType(schema.options()) ==
TableType.FORMAT_TABLE) {
+ Options options = Options.fromMap(schema.options());
+ if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
throw new UnsupportedOperationException("Only data table
support alter table.");
}
} else {
@@ -189,12 +191,12 @@ public class TestRESTCatalog extends FileSystemCatalog {
}
@Override
- protected TableMeta getDataTableMeta(Identifier identifier) throws
TableNotExistException {
+ protected TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
if (tableFullName2Schema.containsKey(identifier.getFullName())) {
TableSchema tableSchema =
tableFullName2Schema.get(identifier.getFullName());
- return new TableMeta(tableSchema, "uuid");
+ return new TableMetadata(tableSchema, "uuid");
}
- return super.getDataTableMeta(identifier);
+ return super.loadTableMetadata(identifier);
}
private Partition spec2Partition(Map<String, String> spec) {
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 4e2d7b6acb..438c6971aa 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.client.ClientPool;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -40,10 +41,8 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.CatalogTableType;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -51,7 +50,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PartitionPathUtils;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
@@ -115,7 +113,7 @@ import static
org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
-import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
+import static org.apache.paimon.hive.HiveTableUtils.tryToFormatSchema;
import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
@@ -354,7 +352,7 @@ public class HiveCatalog extends AbstractCatalog {
Identifier.create(identifier.getDatabaseName(),
identifier.getTableName());
Table hmsTable = getHmsTable(tableIdentifier);
Path location = getTableLocation(tableIdentifier, hmsTable);
- TableSchema schema = getDataTableSchema(tableIdentifier, hmsTable);
+ TableSchema schema = loadTableSchema(tableIdentifier, hmsTable);
if (!metastorePartitioned(schema)) {
return;
@@ -395,7 +393,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
throws TableNotExistException {
- TableSchema schema = getDataTableSchema(identifier);
+ TableSchema schema = this.loadTableSchema(identifier);
CoreOptions options = CoreOptions.fromMap(schema.options());
boolean tagToPart = options.tagToPartitionField() != null;
if (metastorePartitioned(schema)) {
@@ -429,7 +427,7 @@ public class HiveCatalog extends AbstractCatalog {
public void alterPartitions(
Identifier identifier, List<org.apache.paimon.partition.Partition>
partitions)
throws TableNotExistException {
- TableSchema tableSchema = getDataTableSchema(identifier);
+ TableSchema tableSchema = this.loadTableSchema(identifier);
if (!tableSchema.partitionKeys().isEmpty()
&& new
CoreOptions(tableSchema.options()).partitionedTableInMetastore()) {
for (org.apache.paimon.partition.Partition partition : partitions)
{
@@ -674,32 +672,41 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- protected TableMeta getDataTableMeta(Identifier identifier) throws
TableNotExistException {
- return getDataTableMeta(identifier, getHmsTable(identifier));
+ protected TableMetadata loadTableMetadata(Identifier identifier) throws
TableNotExistException {
+ return loadTableMetadata(identifier, getHmsTable(identifier));
}
- private TableMeta getDataTableMeta(Identifier identifier, Table table)
+ private TableMetadata loadTableMetadata(Identifier identifier, Table table)
throws TableNotExistException {
- return new TableMeta(
- getDataTableSchema(identifier, table),
+ return new TableMetadata(
+ loadTableSchema(identifier, table),
identifier.getFullName() + "." + table.getCreateTime());
}
@Override
- public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
+ public TableSchema loadTableSchema(Identifier identifier) throws
TableNotExistException {
Table table = getHmsTable(identifier);
- return getDataTableSchema(identifier, table);
+ return loadTableSchema(identifier, table);
}
- private TableSchema getDataTableSchema(Identifier identifier, Table table)
+ private TableSchema loadTableSchema(Identifier identifier, Table table)
throws TableNotExistException {
- if (!isPaimonTable(table)) {
- throw new TableNotExistException(identifier);
+ if (isPaimonTable(table)) {
+ return tableSchemaInFileSystem(
+ getTableLocation(identifier, table),
+ identifier.getBranchNameOrDefault())
+ .orElseThrow(() -> new TableNotExistException(identifier));
+ }
+
+ if (!formatTableDisabled()) {
+ try {
+ Schema schema = tryToFormatSchema(table);
+ return TableSchema.create(0, schema);
+ } catch (UnsupportedOperationException ignored) {
+ }
}
- return tableSchemaInFileSystem(
- getTableLocation(identifier, table),
identifier.getBranchNameOrDefault())
- .orElseThrow(() -> new TableNotExistException(identifier));
+ throw new TableNotExistException(identifier);
}
@Override
@@ -835,39 +842,6 @@ public class HiveCatalog extends AbstractCatalog {
renameHiveTable(fromView, toView);
}
- @Override
- public org.apache.paimon.table.Table getDataOrFormatTable(Identifier
identifier)
- throws TableNotExistException {
- Preconditions.checkArgument(identifier.getSystemTableName() == null);
- Table table = getHmsTable(identifier);
- try {
- TableMeta tableMeta = getDataTableMeta(identifier, table);
- return FileStoreTableFactory.create(
- fileIO,
- getTableLocation(identifier, table),
- tableMeta.schema(),
- new CatalogEnvironment(
- identifier,
- tableMeta.uuid(),
- Lock.factory(
- lockFactory().orElse(null),
- lockContext().orElse(null),
- identifier),
- catalogLoader()));
- } catch (TableNotExistException ignore) {
- }
-
- if (formatTableDisabled()) {
- throw new TableNotExistException(identifier);
- }
-
- try {
- return convertToFormatTable(table);
- } catch (UnsupportedOperationException e) {
- throw new TableNotExistException(identifier);
- }
- }
-
@Override
public void createFormatTable(Identifier identifier, Schema schema) {
if (formatTableDisabled()) {
@@ -1278,7 +1252,7 @@ public class HiveCatalog extends AbstractCatalog {
private boolean isFormatTable(Table table) {
try {
- convertToFormatTable(table);
+ tryToFormatSchema(table);
return true;
} catch (UnsupportedOperationException e) {
return false;
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 5e5af75e52..0007247bff 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -18,8 +18,8 @@
package org.apache.paimon.hive;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FormatTable.Format;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
@@ -31,24 +31,25 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
+import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
import static org.apache.paimon.hive.HiveCatalog.isView;
import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
class HiveTableUtils {
- public static FormatTable convertToFormatTable(Table hiveTable) {
+ public static Schema tryToFormatSchema(Table hiveTable) {
if (isView(hiveTable)) {
throw new UnsupportedOperationException("Hive view is not
supported.");
}
- Identifier identifier = new Identifier(hiveTable.getDbName(),
hiveTable.getTableName());
- Map<String, String> options = new HashMap<>(hiveTable.getParameters());
+ Options options = Options.fromMap(hiveTable.getParameters());
List<String> partitionKeys =
getFieldNames(hiveTable.getPartitionKeys());
RowType rowType = createRowType(hiveTable);
String comment = options.remove(COMMENT_PROP);
@@ -65,20 +66,19 @@ class HiveTableUtils {
} else if (inputFormat.contains("Text")) {
format = Format.CSV;
// hive default field delimiter is '\u0001'
- options.put(
- FIELD_DELIMITER.key(),
- serdeInfo.getParameters().getOrDefault(FIELD_DELIM,
"\u0001"));
+ options.set(
+ FIELD_DELIMITER,
serdeInfo.getParameters().getOrDefault(FIELD_DELIM, "\u0001"));
} else {
throw new UnsupportedOperationException("Unsupported table: " +
hiveTable);
}
- return FormatTable.builder()
- .identifier(identifier)
- .rowType(rowType)
- .partitionKeys(partitionKeys)
- .location(location)
- .format(format)
- .options(options)
+ Schema.Builder builder = Schema.newBuilder();
+ rowType.getFields().forEach(f -> builder.column(f.name(), f.type(),
f.description()));
+ options.set(PATH, location);
+ options.set(TYPE, FORMAT_TABLE);
+ options.set(FILE_FORMAT, format.name().toLowerCase());
+ return builder.partitionKeys(partitionKeys)
+ .options(options.toMap())
.comment(comment)
.build();
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index ea669d254f..ff2bd04d1f 100644
---
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -305,8 +305,6 @@ public class HiveCatalogTest extends CatalogTestBase {
Thread thread1 =
new Thread(
() -> {
- System.out.println(
- "First thread started at " +
System.currentTimeMillis());
try {
tables1.addAll(catalog.listTables(databaseName));
} catch (Catalog.DatabaseNotExistException e) {
@@ -316,8 +314,6 @@ public class HiveCatalogTest extends CatalogTestBase {
Thread thread2 =
new Thread(
() -> {
- System.out.println(
- "Second thread started at " +
System.currentTimeMillis());
try {
tables2.addAll(catalog.listTables(databaseName));
} catch (Catalog.DatabaseNotExistException e) {