This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6fb8deebe [core] Delete useless methods in Catalog (#4468)
6fb8deebe is described below
commit 6fb8deebe8abfff9aa6296ea86a5dce905681f22
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 6 19:31:53 2024 +0800
[core] Delete useless methods in Catalog (#4468)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 57 +++++++++------
.../org/apache/paimon/catalog/CachingCatalog.java | 17 +++--
.../java/org/apache/paimon/catalog/Catalog.java | 71 ++-----------------
.../java/org/apache/paimon/catalog/Database.java | 82 ++++++++++++++++++++++
.../org/apache/paimon/catalog/DelegateCatalog.java | 33 +--------
.../apache/paimon/catalog/FileSystemCatalog.java | 16 +----
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 16 ++---
.../apache/paimon/privilege/PrivilegedCatalog.java | 31 +++++---
.../org/apache/paimon/catalog/CatalogTestBase.java | 54 +++-----------
.../flink/procedure/MigrateFileProcedure.java | 5 +-
.../flink/action/cdc/SyncTableActionBase.java | 4 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 34 +++++++--
.../apache/paimon/flink/log/LogStoreRegister.java | 13 ++--
.../flink/procedure/MigrateFileProcedure.java | 5 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 75 +++++++++-----------
.../apache/paimon/hive/migrate/HiveMigrator.java | 22 +++---
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 18 ++---
.../java/org/apache/paimon/spark/SparkCatalog.java | 21 +++---
.../spark/procedure/MigrateFileProcedure.java | 4 +-
19 files changed, 288 insertions(+), 290 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b3f255f10..8e885b95c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -98,7 +98,6 @@ public abstract class AbstractCatalog implements Catalog {
return fileIO;
}
- @Override
public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
@@ -118,7 +117,6 @@ public abstract class AbstractCatalog implements Catalog {
return Optional.empty();
}
- @Override
public Optional<CatalogLockContext> lockContext() {
return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
}
@@ -136,26 +134,26 @@ public abstract class AbstractCatalog implements Catalog {
public void createDatabase(String name, boolean ignoreIfExists,
Map<String, String> properties)
throws DatabaseAlreadyExistException {
checkNotSystemDatabase(name);
- if (databaseExists(name)) {
+ try {
+ getDatabase(name);
if (ignoreIfExists) {
return;
}
throw new DatabaseAlreadyExistException(name);
+ } catch (DatabaseNotExistException ignored) {
}
createDatabaseImpl(name, properties);
}
@Override
- public Map<String, String> loadDatabaseProperties(String name)
- throws DatabaseNotExistException {
+ public Database getDatabase(String name) throws DatabaseNotExistException {
if (isSystemDatabase(name)) {
- return Collections.emptyMap();
+ return Database.of(name);
}
- return loadDatabasePropertiesImpl(name);
+ return getDatabaseImpl(name);
}
- protected abstract Map<String, String> loadDatabasePropertiesImpl(String
name)
- throws DatabaseNotExistException;
+ protected abstract Database getDatabaseImpl(String name) throws
DatabaseNotExistException;
@Override
public void createPartition(Identifier identifier, Map<String, String>
partitionSpec)
@@ -211,7 +209,9 @@ public abstract class AbstractCatalog implements Catalog {
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
checkNotSystemDatabase(name);
- if (!databaseExists(name)) {
+ try {
+ getDatabase(name);
+ } catch (DatabaseNotExistException e) {
if (ignoreIfNotExists) {
return;
}
@@ -232,9 +232,9 @@ public abstract class AbstractCatalog implements Catalog {
if (isSystemDatabase(databaseName)) {
return SystemTableLoader.loadGlobalTableNames();
}
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(databaseName);
- }
+
+ // check db exists
+ getDatabase(databaseName);
return
listTablesImpl(databaseName).stream().sorted().collect(Collectors.toList());
}
@@ -247,7 +247,9 @@ public abstract class AbstractCatalog implements Catalog {
checkNotBranch(identifier, "dropTable");
checkNotSystemTable(identifier, "dropTable");
- if (!tableExists(identifier)) {
+ try {
+ getTable(identifier);
+ } catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
@@ -268,15 +270,16 @@ public abstract class AbstractCatalog implements Catalog {
validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
validateAutoCreateClose(schema.options());
- if (!databaseExists(identifier.getDatabaseName())) {
- throw new DatabaseNotExistException(identifier.getDatabaseName());
- }
+ // check db exists
+ getDatabase(identifier.getDatabaseName());
- if (tableExists(identifier)) {
+ try {
+ getTable(identifier);
if (ignoreIfExists) {
return;
}
throw new TableAlreadyExistException(identifier);
+ } catch (TableNotExistException ignored) {
}
copyTableDefaultOptions(schema.options());
@@ -299,15 +302,19 @@ public abstract class AbstractCatalog implements Catalog {
checkNotSystemTable(toTable, "renameTable");
validateIdentifierNameCaseInsensitive(toTable);
- if (!tableExists(fromTable)) {
+ try {
+ getTable(fromTable);
+ } catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(fromTable);
}
- if (tableExists(toTable)) {
+ try {
+ getTable(toTable);
throw new TableAlreadyExistException(toTable);
+ } catch (TableNotExistException ignored) {
}
renameTableImpl(fromTable, toTable);
@@ -323,7 +330,9 @@ public abstract class AbstractCatalog implements Catalog {
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);
- if (!tableExists(identifier)) {
+ try {
+ getTable(identifier);
+ } catch (TableNotExistException e) {
if (ignoreIfNotExists) {
return;
}
@@ -452,6 +461,12 @@ public abstract class AbstractCatalog implements Catalog {
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
+ /** Get metastore client factory for the table specified by {@code
identifier}. */
+ protected Optional<MetastoreClient.Factory>
metastoreClientFactory(Identifier identifier)
+ throws TableNotExistException {
+ return Optional.empty();
+ }
+
@Override
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()),
identifier.getTableName());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
index 1a6b23078..003f0edb4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -62,7 +62,7 @@ public class CachingCatalog extends DelegateCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(CachingCatalog.class);
- protected final Cache<String, Map<String, String>> databaseCache;
+ protected final Cache<String, Database> databaseCache;
protected final Cache<Identifier, Table> tableCache;
@Nullable protected final SegmentsCache<Path> manifestCache;
@@ -159,16 +159,15 @@ public class CachingCatalog extends DelegateCatalog {
}
@Override
- public Map<String, String> loadDatabaseProperties(String databaseName)
- throws DatabaseNotExistException {
- Map<String, String> properties =
databaseCache.getIfPresent(databaseName);
- if (properties != null) {
- return properties;
+ public Database getDatabase(String databaseName) throws
DatabaseNotExistException {
+ Database database = databaseCache.getIfPresent(databaseName);
+ if (database != null) {
+ return database;
}
- properties = super.loadDatabaseProperties(databaseName);
- databaseCache.put(databaseName, properties);
- return properties;
+ database = super.getDatabase(databaseName);
+ databaseCache.put(databaseName, database);
+ return database;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index c72c354e4..6a6a047bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -22,7 +22,6 @@ import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
@@ -33,7 +32,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -72,23 +70,6 @@ public interface Catalog extends AutoCloseable {
FileIO fileIO();
- /**
- * Get lock factory from catalog. Lock is used to support multiple
concurrent writes on the
- * object store.
- */
- Optional<CatalogLockFactory> lockFactory();
-
- /** Get lock context for lock factory to create a lock. */
- default Optional<CatalogLockContext> lockContext() {
- return Optional.empty();
- }
-
- /** Get metastore client factory for the table specified by {@code
identifier}. */
- default Optional<MetastoreClient.Factory>
metastoreClientFactory(Identifier identifier)
- throws TableNotExistException {
- return Optional.empty();
- }
-
/**
* Get the names of all databases in this catalog.
*
@@ -96,21 +77,6 @@ public interface Catalog extends AutoCloseable {
*/
List<String> listDatabases();
- /**
- * Check if a database exists in this catalog.
- *
- * @param databaseName Name of the database
- * @return true if the given database exists in the catalog false otherwise
- */
- default boolean databaseExists(String databaseName) {
- try {
- loadDatabaseProperties(databaseName);
- return true;
- } catch (DatabaseNotExistException e) {
- return false;
- }
- }
-
/**
* Create a database, see {@link Catalog#createDatabase(String name,
boolean ignoreIfExists, Map
* properties)}.
@@ -135,13 +101,13 @@ public interface Catalog extends AutoCloseable {
throws DatabaseAlreadyExistException;
/**
- * Load database properties.
+ * Return a {@link Database} identified by the given name.
*
* @param name Database name
- * @return The requested database's properties
+ * @return The requested {@link Database}
* @throws DatabaseNotExistException if the requested database does not
exist
*/
- Map<String, String> loadDatabaseProperties(String name) throws
DatabaseNotExistException;
+ Database getDatabase(String name) throws DatabaseNotExistException;
/**
* Drop a database.
@@ -186,20 +152,6 @@ public interface Catalog extends AutoCloseable {
*/
List<String> listTables(String databaseName) throws
DatabaseNotExistException;
- /**
- * Check if a table exists in this catalog.
- *
- * @param identifier Path of the table
- * @return true if the given table exists in the catalog false otherwise
- */
- default boolean tableExists(Identifier identifier) {
- try {
- return getTable(identifier) != null;
- } catch (TableNotExistException e) {
- return false;
- }
- }
-
/**
* Drop a table.
*
@@ -273,6 +225,9 @@ public interface Catalog extends AutoCloseable {
/**
* Create the partition of the specify table.
*
+ * <p>Only catalog with metastore can support this method, and only table
with
+ * 'metastore.partitioned-table' can support this method.
+ *
* @param identifier path of the table to drop partition
* @param partitionSpec the partition to be created
* @throws TableNotExistException if the table does not exist
@@ -315,20 +270,6 @@ public interface Catalog extends AutoCloseable {
alterTable(identifier, Collections.singletonList(change),
ignoreIfNotExists);
}
- /**
- * Check if a view exists in this catalog.
- *
- * @param identifier Path of the view
- * @return true if the given view exists in the catalog false otherwise
- */
- default boolean viewExists(Identifier identifier) {
- try {
- return getView(identifier) != null;
- } catch (ViewNotExistException e) {
- return false;
- }
- }
-
/**
* Return a {@link View} identified by the given {@link Identifier}.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java
new file mode 100644
index 000000000..f855e57e9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Database.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.annotation.Public;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Interface of a database in a catalog.
+ *
+ * @since 1.0
+ */
+@Public
+public interface Database {
+
+ /** A name to identify this database. */
+ String name();
+
+ /** Options of this database. */
+ Map<String, String> options();
+
+ /** Optional comment of this database. */
+ Optional<String> comment();
+
+ static Database of(String name, Map<String, String> options, @Nullable
String comment) {
+ return new DatabaseImpl(name, options, comment);
+ }
+
+ static Database of(String name) {
+ return new DatabaseImpl(name, new HashMap<>(), null);
+ }
+
+ /** Implementation of {@link Database}. */
+ class DatabaseImpl implements Database {
+
+ private final String name;
+ private final Map<String, String> options;
+ @Nullable private final String comment;
+
+ public DatabaseImpl(String name, Map<String, String> options,
@Nullable String comment) {
+ this.name = name;
+ this.options = options;
+ this.comment = comment;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Map<String, String> options() {
+ return options;
+ }
+
+ @Override
+ public Optional<String> comment() {
+ return Optional.ofNullable(comment);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 01719e590..ec14d53a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -21,7 +21,6 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
@@ -29,7 +28,6 @@ import org.apache.paimon.view.View;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
public class DelegateCatalog implements Catalog {
@@ -64,22 +62,6 @@ public class DelegateCatalog implements Catalog {
return wrapped.fileIO();
}
- @Override
- public Optional<CatalogLockFactory> lockFactory() {
- return wrapped.lockFactory();
- }
-
- @Override
- public Optional<CatalogLockContext> lockContext() {
- return wrapped.lockContext();
- }
-
- @Override
- public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier
identifier)
- throws TableNotExistException {
- return wrapped.metastoreClientFactory(identifier);
- }
-
@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
@@ -92,9 +74,8 @@ public class DelegateCatalog implements Catalog {
}
@Override
- public Map<String, String> loadDatabaseProperties(String name)
- throws DatabaseNotExistException {
- return wrapped.loadDatabaseProperties(name);
+ public Database getDatabase(String name) throws DatabaseNotExistException {
+ return wrapped.getDatabase(name);
}
@Override
@@ -138,16 +119,6 @@ public class DelegateCatalog implements Catalog {
return wrapped.getTable(identifier);
}
- @Override
- public boolean tableExists(Identifier identifier) {
- return wrapped.tableExists(identifier);
- }
-
- @Override
- public boolean viewExists(Identifier identifier) {
- return wrapped.viewExists(identifier);
- }
-
@Override
public View getView(Identifier identifier) throws ViewNotExistException {
return wrapped.getView(identifier);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 450f78873..9264a5464 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -30,7 +30,6 @@ import org.apache.paimon.schema.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -81,12 +80,11 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public Map<String, String> loadDatabasePropertiesImpl(String name)
- throws DatabaseNotExistException {
+ public Database getDatabaseImpl(String name) throws
DatabaseNotExistException {
if (!uncheck(() -> fileIO.exists(newDatabasePath(name)))) {
throw new DatabaseNotExistException(name);
}
- return Collections.emptyMap();
+ return Database.of(name);
}
@Override
@@ -99,16 +97,6 @@ public class FileSystemCatalog extends AbstractCatalog {
return uncheck(() ->
listTablesInFileSystem(newDatabasePath(databaseName)));
}
- @Override
- public boolean tableExists(Identifier identifier) {
- if (isTableInSystemDatabase(identifier)) {
- return super.tableExists(identifier);
- }
-
- return tableExistsInFileSystem(
- getTableLocation(identifier),
identifier.getBranchNameOrDefault());
- }
-
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
return tableSchemaInFileSystem(
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index e52bc9d6b..c80f8e3a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogLockContext;
import org.apache.paimon.catalog.CatalogLockFactory;
+import org.apache.paimon.catalog.Database;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -159,18 +160,17 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected Map<String, String> loadDatabasePropertiesImpl(String
databaseName)
- throws DatabaseNotExistException {
+ protected Database getDatabaseImpl(String databaseName) throws
DatabaseNotExistException {
if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
throw new DatabaseNotExistException(databaseName);
}
- Map<String, String> properties = Maps.newHashMap();
- properties.putAll(fetchProperties(databaseName));
- if (!properties.containsKey(DB_LOCATION_PROP)) {
- properties.put(DB_LOCATION_PROP,
newDatabasePath(databaseName).getName());
+ Map<String, String> options = Maps.newHashMap();
+ options.putAll(fetchProperties(databaseName));
+ if (!options.containsKey(DB_LOCATION_PROP)) {
+ options.put(DB_LOCATION_PROP,
newDatabasePath(databaseName).getName());
}
- properties.remove(DATABASE_EXISTS_PROPERTY);
- return ImmutableMap.copyOf(properties);
+ options.remove(DATABASE_EXISTS_PROPERTY);
+ return Database.of(databaseName, options, null);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index bda06a08d..c9b9c2193 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -102,12 +102,16 @@ public class PrivilegedCatalog extends DelegateCatalog {
throws TableNotExistException, TableAlreadyExistException {
privilegeManager.getPrivilegeChecker().assertCanAlterTable(fromTable);
wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
- Preconditions.checkState(
- wrapped.tableExists(toTable),
- "Table "
- + toTable
- + " does not exist. There might be concurrent
renaming. "
- + "Aborting updates in privilege system.");
+
+ try {
+ getTable(toTable);
+ } catch (TableNotExistException e) {
+ throw new IllegalStateException(
+ "Table "
+ + toTable
+ + " does not exist. There might be concurrent
renaming. "
+ + "Aborting updates in privilege system.");
+ }
privilegeManager.objectRenamed(fromTable.getFullName(),
toTable.getFullName());
}
@@ -157,8 +161,11 @@ public class PrivilegedCatalog extends DelegateCatalog {
Preconditions.checkArgument(
privilege.canGrantOnDatabase(),
"Privilege " + privilege + " can't be granted on a database");
- Preconditions.checkArgument(
- databaseExists(databaseName), "Database " + databaseName + "
does not exist");
+ try {
+ getDatabase(databaseName);
+ } catch (DatabaseNotExistException e) {
+ throw new IllegalArgumentException("Database " + databaseName + "
does not exist");
+ }
privilegeManager.grant(user, databaseName, privilege);
}
@@ -166,8 +173,12 @@ public class PrivilegedCatalog extends DelegateCatalog {
Preconditions.checkArgument(
privilege.canGrantOnTable(),
"Privilege " + privilege + " can't be granted on a table");
- Preconditions.checkArgument(
- tableExists(identifier), "Table " + identifier + " does not
exist");
+
+ try {
+ getTable(identifier);
+ } catch (TableNotExistException e) {
+ throw new IllegalArgumentException("Table " + identifier + " does
not exist");
+ }
privilegeManager.grant(user, identifier.getFullName(), privilege);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 1a087f6b4..27992b56f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -110,24 +110,11 @@ public abstract class CatalogTestBase {
assertThat(databases).contains("db1", "db2", "db3");
}
- @Test
- public void testDatabaseExistsWhenExists() throws Exception {
- // Database exists returns true when the database exists
- catalog.createDatabase("test_db", false);
- boolean exists = catalog.databaseExists("test_db");
- assertThat(exists).isTrue();
-
- // Database exists returns false when the database does not exist
- exists = catalog.databaseExists("non_existing_db");
- assertThat(exists).isFalse();
- }
-
@Test
public void testCreateDatabase() throws Exception {
// Create database creates a new database when it does not exist
catalog.createDatabase("new_db", false);
- boolean exists = catalog.databaseExists("new_db");
- assertThat(exists).isTrue();
+ catalog.getDatabase("new_db");
catalog.createDatabase("existing_db", false);
@@ -148,8 +135,8 @@ public abstract class CatalogTestBase {
// Drop database deletes the database when it exists and there are no
tables
catalog.createDatabase("db_to_drop", false);
catalog.dropDatabase("db_to_drop", false, false);
- boolean exists = catalog.databaseExists("db_to_drop");
- assertThat(exists).isFalse();
+ assertThatThrownBy(() -> catalog.getDatabase("db_to_drop"))
+ .isInstanceOf(Catalog.DatabaseNotExistException.class);
// Drop database does not throw exception when database does not exist
and ignoreIfNotExists
// is true
@@ -162,8 +149,8 @@ public abstract class CatalogTestBase {
catalog.createTable(Identifier.create("db_to_drop", "table2"),
DEFAULT_TABLE_SCHEMA, false);
catalog.dropDatabase("db_to_drop", false, true);
- exists = catalog.databaseExists("db_to_drop");
- assertThat(exists).isFalse();
+ assertThatThrownBy(() -> catalog.getDatabase("db_to_drop"))
+ .isInstanceOf(Catalog.DatabaseNotExistException.class);
// Drop database throws DatabaseNotEmptyException when cascade is
false and there are tables
// in the database
@@ -192,21 +179,6 @@ public abstract class CatalogTestBase {
assertThat(tables).containsExactlyInAnyOrder("table1", "table2",
"table3");
}
- @Test
- public void testTableExists() throws Exception {
- // Table exists returns true when the table exists in the database
- catalog.createDatabase("test_db", false);
- Identifier identifier = Identifier.create("test_db", "test_table");
- catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
-
- boolean exists = catalog.tableExists(identifier);
- assertThat(exists).isTrue();
-
- // Table exists returns false when the table does not exist in the
database
- exists = catalog.tableExists(Identifier.create("non_existing_db",
"non_existing_table"));
- assertThat(exists).isFalse();
- }
-
@Test
public void testCreateTable() throws Exception {
catalog.createDatabase("test_db", false);
@@ -238,8 +210,7 @@ public abstract class CatalogTestBase {
schema.options().remove(CoreOptions.AUTO_CREATE.key());
catalog.createTable(identifier, schema, false);
- boolean exists = catalog.tableExists(identifier);
- assertThat(exists).isTrue();
+ catalog.getTable(identifier);
// Create table throws Exception when table is system table
assertThatExceptionOfType(IllegalArgumentException.class)
@@ -381,8 +352,8 @@ public abstract class CatalogTestBase {
Identifier identifier = Identifier.create("test_db", "table_to_drop");
catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
catalog.dropTable(identifier, false);
- boolean exists = catalog.tableExists(identifier);
- assertThat(exists).isFalse();
+ assertThatThrownBy(() -> catalog.getTable(identifier))
+ .isInstanceOf(Catalog.TableNotExistException.class);
// Drop table throws Exception when table is system table
assertThatExceptionOfType(IllegalArgumentException.class)
@@ -414,8 +385,9 @@ public abstract class CatalogTestBase {
catalog.createTable(fromTable, DEFAULT_TABLE_SCHEMA, false);
Identifier toTable = Identifier.create("test_db", "new_table");
catalog.renameTable(fromTable, toTable, false);
- assertThat(catalog.tableExists(fromTable)).isFalse();
- assertThat(catalog.tableExists(toTable)).isTrue();
+ assertThatThrownBy(() -> catalog.getTable(fromTable))
+ .isInstanceOf(Catalog.TableNotExistException.class);
+ catalog.getTable(toTable);
// Rename table throws Exception when original or target table is
system table
assertThatExceptionOfType(IllegalArgumentException.class)
@@ -885,8 +857,6 @@ public abstract class CatalogTestBase {
catalog.createView(identifier, view, false);
- assertThat(catalog.viewExists(identifier)).isTrue();
-
View catalogView = catalog.getView(identifier);
assertThat(catalogView.fullName()).isEqualTo(view.fullName());
assertThat(catalogView.rowType()).isEqualTo(view.rowType());
@@ -911,8 +881,6 @@ public abstract class CatalogTestBase {
catalog.renameView(identifier, newIdentifier, false);
catalog.dropView(newIdentifier, false);
- assertThat(catalog.viewExists(newIdentifier)).isFalse();
-
catalog.dropView(newIdentifier, true);
assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
.isInstanceOf(Catalog.ViewNotExistException.class);
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 5d68cc0f5..1e581c38c 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
@@ -88,7 +89,9 @@ public class MigrateFileProcedure extends ProcedureBase {
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
- if (!(catalog.tableExists(targetTableId))) {
+ try {
+ catalog.getTable(targetTableId);
+ } catch (Catalog.TableNotExistException e) {
throw new IllegalArgumentException(
"Target paimon table does not exist: " +
targetPaimonTablePath);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index 4c7db6d28..87efeb2a1 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -122,7 +122,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
protected void beforeBuildingSourceSink() throws Exception {
Identifier identifier = new Identifier(database, table);
// Check if table exists before trying to get or create it
- if (catalog.tableExists(identifier)) {
+ try {
fileStoreTable = (FileStoreTable) catalog.getTable(identifier);
fileStoreTable = alterTableOptions(identifier, fileStoreTable);
try {
@@ -146,7 +146,7 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
// check partition keys and primary keys in case that user
specified them
checkConstraints();
}
- } else {
+ } catch (Catalog.TableNotExistException e) {
Schema retrievedSchema = retrieveSchema();
computedColumns = buildComputedColumns(computedColumnArgs,
retrievedSchema.fields());
Schema paimonSchema = buildPaimonSchema(retrievedSchema);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 194d73213..ec485d2eb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -187,7 +187,9 @@ public class FlinkCatalog extends AbstractCatalog {
this.logStoreAutoRegisterTimeout = options.get(REGISTER_TIMEOUT);
this.disableCreateTableInDefaultDatabase =
options.get(DISABLE_CREATE_TABLE_IN_DEFAULT_DB);
if (!disableCreateTableInDefaultDatabase) {
- if (!catalog.databaseExists(defaultDatabase)) {
+ try {
+ getDatabase(defaultDatabase);
+ } catch (DatabaseNotExistException e) {
try {
catalog.createDatabase(defaultDatabase, true);
} catch (Catalog.DatabaseAlreadyExistException ignore) {
@@ -212,7 +214,12 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public boolean databaseExists(String databaseName) throws CatalogException
{
- return catalog.databaseExists(databaseName);
+ try {
+ catalog.getDatabase(databaseName);
+ return true;
+ } catch (Catalog.DatabaseNotExistException e) {
+ return false;
+ }
}
@Override
@@ -346,26 +353,41 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
Identifier identifier = toIdentifier(tablePath);
- return catalog.tableExists(identifier) ||
catalog.viewExists(identifier);
+ try {
+ catalog.getTable(identifier);
+ return true;
+ } catch (Catalog.TableNotExistException e) {
+ try {
+ catalog.getView(identifier);
+ return true;
+ } catch (Catalog.ViewNotExistException ex) {
+ return false;
+ }
+ }
}
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
Identifier identifier = toIdentifier(tablePath);
- if (catalog.viewExists(identifier)) {
+ try {
+ catalog.getView(identifier);
try {
catalog.dropView(identifier, ignoreIfNotExists);
return;
} catch (Catalog.ViewNotExistException e) {
throw new RuntimeException("Unexpected exception.", e);
}
+ } catch (Catalog.ViewNotExistException ignored) {
}
try {
Table table = null;
- if (logStoreAutoRegister && catalog.tableExists(identifier)) {
- table = catalog.getTable(identifier);
+ if (logStoreAutoRegister) {
+ try {
+ table = catalog.getTable(identifier);
+ } catch (Catalog.TableNotExistException ignored) {
+ }
}
catalog.dropTable(toIdentifier(tablePath), ignoreIfNotExists);
if (logStoreAutoRegister && table != null) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
index b730d289b..ad501e204 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java
@@ -46,11 +46,14 @@ public interface LogStoreRegister {
ClassLoader classLoader) {
Options tableOptions = Options.fromMap(options);
String logStore = tableOptions.get(LOG_SYSTEM);
- if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)
- && !catalog.tableExists(identifier)) {
- LogStoreRegister logStoreRegister =
- getLogStoreRegister(identifier, classLoader, tableOptions,
logStore);
- options.putAll(logStoreRegister.registerTopic());
+ if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) {
+ try {
+ catalog.getTable(identifier);
+ } catch (Catalog.TableNotExistException e) {
+ LogStoreRegister logStoreRegister =
+ getLogStoreRegister(identifier, classLoader,
tableOptions, logStore);
+ options.putAll(logStoreRegister.registerTopic());
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 34b016fe0..f2f10d087 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
@@ -77,7 +78,9 @@ public class MigrateFileProcedure extends ProcedureBase {
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
- if (!(catalog.tableExists(targetTableId))) {
+ try {
+ catalog.getTable(targetTableId);
+ } catch (Catalog.TableNotExistException e) {
throw new IllegalArgumentException(
"Target paimon table does not exist: " +
targetPaimonTablePath);
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 8f2211041..ce1607e8d 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -297,10 +297,18 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public Map<String, String> loadDatabasePropertiesImpl(String name)
+ public org.apache.paimon.catalog.Database getDatabaseImpl(String name)
throws DatabaseNotExistException {
try {
- return convertToProperties(clients.run(client ->
client.getDatabase(name)));
+ Database database = clients.run(client ->
client.getDatabase(name));
+ Map<String, String> options = new
HashMap<>(database.getParameters());
+ if (database.getLocationUri() != null) {
+ options.put(DB_LOCATION_PROP, database.getLocationUri());
+ }
+ if (database.getDescription() != null) {
+ options.put(COMMENT_PROP, database.getDescription());
+ }
+ return org.apache.paimon.catalog.Database.of(name, options,
database.getDescription());
} catch (NoSuchObjectException e) {
throw new DatabaseNotExistException(name);
} catch (TException e) {
@@ -312,17 +320,6 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- private Map<String, String> convertToProperties(Database database) {
- Map<String, String> properties = new
HashMap<>(database.getParameters());
- if (database.getLocationUri() != null) {
- properties.put(DB_LOCATION_PROP, database.getLocationUri());
- }
- if (database.getDescription() != null) {
- properties.put(COMMENT_PROP, database.getDescription());
- }
- return properties;
- }
-
@Override
public void dropPartition(Identifier identifier, Map<String, String>
partitionSpec)
throws TableNotExistException {
@@ -395,11 +392,16 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected List<String> listTablesImpl(String databaseName) {
try {
- return clients.run(
- client ->
- client.getAllTables(databaseName).stream()
- .filter(t -> tableExists(new
Identifier(databaseName, t)))
- .collect(Collectors.toList()));
+ List<String> allTables = clients.run(client ->
client.getAllTables(databaseName));
+ List<String> result = new ArrayList<>(allTables.size());
+ for (String table : allTables) {
+ try {
+ getTable(new Identifier(databaseName, table));
+ result.add(table);
+ } catch (TableNotExistException ignored) {
+ }
+ }
+ return result;
} catch (TException e) {
throw new RuntimeException("Failed to list all tables in database
" + databaseName, e);
} catch (InterruptedException e) {
@@ -408,21 +410,6 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- @Override
- public boolean tableExists(Identifier identifier) {
- if (isTableInSystemDatabase(identifier)) {
- return super.tableExists(identifier);
- }
-
- try {
- Table table = getHmsTable(identifier);
- return isPaimonTable(identifier, table)
- || (formatTableEnabled() && isFormatTable(table));
- } catch (TableNotExistException e) {
- return false;
- }
- }
-
@Override
public TableSchema getDataTableSchema(Identifier identifier) throws
TableNotExistException {
Table table = getHmsTable(identifier);
@@ -462,9 +449,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void createView(Identifier identifier, View view, boolean
ignoreIfExists)
throws ViewAlreadyExistException, DatabaseNotExistException {
- if (!databaseExists(identifier.getDatabaseName())) {
- throw new DatabaseNotExistException(identifier.getDatabaseName());
- }
+ getDatabase(identifier.getDatabaseName());
try {
getView(identifier);
@@ -541,9 +526,7 @@ public class HiveCatalog extends AbstractCatalog {
if (isSystemDatabase(databaseName)) {
return Collections.emptyList();
}
- if (!databaseExists(databaseName)) {
- throw new DatabaseNotExistException(databaseName);
- }
+ getDatabase(databaseName);
try {
List<String> tables = clients.run(client ->
client.getAllTables(databaseName));
@@ -571,15 +554,21 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void renameView(Identifier fromView, Identifier toView, boolean
ignoreIfNotExists)
throws ViewNotExistException, ViewAlreadyExistException {
- if (!viewExists(fromView)) {
+ try {
+ getView(fromView);
+ } catch (ViewNotExistException e) {
if (ignoreIfNotExists) {
return;
}
throw new ViewNotExistException(fromView);
}
- if (viewExists(toView)) {
+
+ try {
+ getView(toView);
throw new ViewAlreadyExistException(toView);
+ } catch (ViewNotExistException ignored) {
}
+
try {
String fromDB = fromView.getDatabaseName();
String fromViewName = fromView.getTableName();
@@ -870,7 +859,9 @@ public class HiveCatalog extends AbstractCatalog {
checkNotSystemDatabase(databaseName);
// create database if needed
- if (!databaseExists(databaseName)) {
+ try {
+ getDatabase(databaseName);
+ } catch (DatabaseNotExistException e) {
createDatabaseImpl(databaseName, Collections.emptyMap());
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
index b9928ce73..d1478830a 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/migrate/HiveMigrator.java
@@ -19,6 +19,7 @@
package org.apache.paimon.hive.migrate;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryWriter;
@@ -81,8 +82,8 @@ public class HiveMigrator implements Migrator {
private final String targetDatabase;
private final String targetTable;
private final CoreOptions coreOptions;
- private Boolean delete = true;
- private Integer parallelism;
+
+ private Boolean deleteOriginTable = true;
public HiveMigrator(
HiveCatalog hiveCatalog,
@@ -99,7 +100,6 @@ public class HiveMigrator implements Migrator {
this.sourceTable = sourceTable;
this.targetDatabase = targetDatabase;
this.targetTable = targetTable;
- this.parallelism = parallelism;
this.coreOptions = new CoreOptions(options);
this.executor = createCachedThreadPool(parallelism, "HIVE_MIGRATOR");
}
@@ -129,8 +129,8 @@ public class HiveMigrator implements Migrator {
}
@Override
- public void deleteOriginTable(boolean delete) {
- this.delete = delete;
+ public void deleteOriginTable(boolean deleteOriginTable) {
+ this.deleteOriginTable = deleteOriginTable;
}
@Override
@@ -145,14 +145,18 @@ public class HiveMigrator implements Migrator {
// create paimon table if not exists
Identifier identifier = Identifier.create(targetDatabase, targetTable);
- boolean alreadyExist = hiveCatalog.tableExists(identifier);
- if (!alreadyExist) {
+
+ boolean deleteIfFail = false;
+ try {
+ hiveCatalog.getTable(identifier);
+ } catch (Catalog.TableNotExistException e) {
Schema schema =
from(
client.getSchema(sourceDatabase, sourceTable),
sourceHiveTable.getPartitionKeys(),
properties);
hiveCatalog.createTable(identifier, schema, false);
+ deleteIfFail = true;
}
try {
@@ -211,14 +215,14 @@ public class HiveMigrator implements Migrator {
commit.commit(new ArrayList<>(commitMessages));
}
} catch (Exception e) {
- if (!alreadyExist) {
+ if (deleteIfFail) {
hiveCatalog.dropTable(identifier, true);
}
throw new RuntimeException("Migrating failed", e);
}
// if all success, drop the origin table according the delete field
- if (delete) {
+ if (deleteOriginTable) {
client.dropTable(sourceDatabase, sourceTable, true, true);
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 5123a1ee0..37601f4f8 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -19,15 +19,15 @@
package org.apache.paimon.hive;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogLock;
-import org.apache.paimon.catalog.CatalogLockFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.hive.annotation.Minio;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.operation.Lock;
import org.apache.paimon.privilege.NoPrivilegeException;
import org.apache.paimon.s3.MinioTestContainer;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
@@ -203,7 +203,7 @@ public abstract class HiveCatalogITCaseBase {
@Test
@LocationInProperties
public void testDbLocationWithMetastoreLocationInProperties()
- throws Catalog.DatabaseAlreadyExistException {
+ throws Catalog.DatabaseAlreadyExistException,
Catalog.DatabaseNotExistException {
String dbLocation = minioTestContainer.getS3UriForDefaultBucket() +
"/" + UUID.randomUUID();
Catalog catalog =
((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
@@ -211,7 +211,7 @@ public abstract class HiveCatalogITCaseBase {
properties.put("location", dbLocation);
catalog.createDatabase("location_test_db", false, properties);
- assertThat(catalog.databaseExists("location_test_db"));
+ catalog.getDatabase("location_test_db");
hiveShell.execute("USE location_test_db");
hiveShell.execute("CREATE TABLE location_test_db ( a INT, b INT )");
@@ -1128,11 +1128,12 @@ public abstract class HiveCatalogITCaseBase {
}
@Test
- public void testHiveLock() throws InterruptedException {
+ public void testHiveLock() throws InterruptedException,
Catalog.TableNotExistException {
tEnv.executeSql("CREATE TABLE t (a INT)");
Catalog catalog =
((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
- CatalogLockFactory lockFactory = catalog.lockFactory().get();
+ FileStoreTable table = (FileStoreTable) catalog.getTable(new
Identifier("test_db", "t"));
+ CatalogEnvironment catalogEnv = table.catalogEnvironment();
AtomicInteger count = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
@@ -1147,11 +1148,10 @@ public abstract class HiveCatalogITCaseBase {
Thread thread =
new Thread(
() -> {
- CatalogLock lock =
-
lockFactory.createLock(catalog.lockContext().get());
+ Lock lock = catalogEnv.lockFactory().create();
for (int j = 0; j < 10; j++) {
try {
- lock.runWithLock("test_db", "t",
unsafeIncrement);
+ lock.runWithLock(unsafeIncrement);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index ae5ab8b6e..2e4a2eaec 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -102,7 +102,9 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(),
DEFAULT_DATABASE.defaultValue());
- if (!catalog.databaseExists(defaultNamespace()[0])) {
+ try {
+ catalog.getDatabase(defaultNamespace()[0]);
+ } catch (Catalog.DatabaseNotExistException e) {
try {
createNamespace(defaultNamespace(), new HashMap<>());
} catch (NamespaceAlreadyExistsException ignored) {
@@ -152,10 +154,12 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
if (!isValidateNamespace(namespace)) {
throw new NoSuchNamespaceException(namespace);
}
- if (catalog.databaseExists(namespace[0])) {
+ try {
+ catalog.getDatabase(namespace[0]);
return new String[0][];
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new NoSuchNamespaceException(namespace);
}
- throw new NoSuchNamespaceException(namespace);
}
@Override
@@ -167,7 +171,7 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
Arrays.toString(namespace));
String dataBaseName = namespace[0];
try {
- return catalog.loadDatabaseProperties(dataBaseName);
+ return catalog.getDatabase(dataBaseName).options();
} catch (Catalog.DatabaseNotExistException e) {
throw new NoSuchNamespaceException(namespace);
}
@@ -284,15 +288,6 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
}
}
- @Override
- public boolean tableExists(Identifier ident) {
- try {
- return catalog.tableExists(toIdentifier(ident));
- } catch (NoSuchTableException e) {
- return false;
- }
- }
-
@Override
public org.apache.spark.sql.connector.catalog.Table alterTable(
Identifier ident, TableChange... changes) throws
NoSuchTableException {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
index 32f89d47b..95d55df01 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java
@@ -88,7 +88,9 @@ public class MigrateFileProcedure extends BaseProcedure {
Catalog paimonCatalog = ((WithPaimonCatalog)
tableCatalog()).paimonCatalog();
- if (!(paimonCatalog.tableExists(targetTableId))) {
+ try {
+ paimonCatalog.getTable(targetTableId);
+ } catch (Catalog.TableNotExistException e) {
throw new IllegalArgumentException(
"Target paimon table does not exist: " + targetTable);
}