This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 775c0867d [core] Introduce DelegateCatalog (#3824)
775c0867d is described below
commit 775c0867d60b914d7133e57e85a5fdf77ef0368a
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Jul 29 19:20:12 2024 +0800
[core] Introduce DelegateCatalog (#3824)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 17 +--
.../java/org/apache/paimon/catalog/Catalog.java | 9 +-
.../org/apache/paimon/catalog/DelegateCatalog.java | 162 +++++++++++++++++++++
.../apache/paimon/catalog/FileSystemCatalog.java | 11 +-
.../java/org/apache/paimon/jdbc/JdbcCatalog.java | 40 ++---
.../apache/paimon/privilege/PrivilegedCatalog.java | 95 +-----------
.../java/org/apache/paimon/hive/HiveCatalog.java | 23 +--
7 files changed, 194 insertions(+), 163 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 8b0245735..bdcafe8eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -129,17 +129,6 @@ public abstract class AbstractCatalog implements Catalog {
return catalogOptions.get(LOCK_ENABLED);
}
- @Override
- public boolean databaseExists(String databaseName) {
- if (isSystemDatabase(databaseName)) {
- return true;
- }
-
- return databaseExistsImpl(databaseName);
- }
-
- protected abstract boolean databaseExistsImpl(String databaseName);
-
@Override
public void createDatabase(String name, boolean ignoreIfExists,
Map<String, String> properties)
throws DatabaseAlreadyExistException {
@@ -159,13 +148,11 @@ public abstract class AbstractCatalog implements Catalog {
if (isSystemDatabase(name)) {
return Collections.emptyMap();
}
- if (!databaseExists(name)) {
- throw new DatabaseNotExistException(name);
- }
return loadDatabasePropertiesImpl(name);
}
- protected abstract Map<String, String> loadDatabasePropertiesImpl(String
name);
+ protected abstract Map<String, String> loadDatabasePropertiesImpl(String
name)
+ throws DatabaseNotExistException;
@Override
public void dropPartition(Identifier identifier, Map<String, String>
partitionSpec)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index b6221dac7..2d3f52901 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -85,7 +85,14 @@ public interface Catalog extends AutoCloseable {
* @param databaseName Name of the database
* @return true if the given database exists in the catalog false otherwise
*/
- boolean databaseExists(String databaseName);
+ default boolean databaseExists(String databaseName) {
+ try {
+ loadDatabaseProperties(databaseName);
+ return true;
+ } catch (DatabaseNotExistException e) {
+ return false;
+ }
+ }
/**
* Create a database, see {@link Catalog#createDatabase(String name,
boolean ignoreIfExists, Map
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
new file mode 100644
index 000000000..5f24d5cf8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.metastore.MetastoreClient;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** A {@link Catalog} to delegate all operations to another {@link Catalog}. */
+public class DelegateCatalog implements Catalog {
+
+ protected final Catalog wrapped;
+
+ public DelegateCatalog(Catalog wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ public Catalog wrapped() {
+ return wrapped;
+ }
+
+ @Override
+ public boolean caseSensitive() {
+ return wrapped.caseSensitive();
+ }
+
+ @Override
+ public String warehouse() {
+ return wrapped.warehouse();
+ }
+
+ @Override
+ public Map<String, String> options() {
+ return wrapped.options();
+ }
+
+ @Override
+ public FileIO fileIO() {
+ return wrapped.fileIO();
+ }
+
+ @Override
+ public Optional<CatalogLockFactory> lockFactory() {
+ return wrapped.lockFactory();
+ }
+
+ @Override
+ public Optional<CatalogLockContext> lockContext() {
+ return wrapped.lockContext();
+ }
+
+ @Override
+ public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier
identifier) {
+ return wrapped.metastoreClientFactory(identifier);
+ }
+
+ @Override
+ public List<String> listDatabases() {
+ return wrapped.listDatabases();
+ }
+
+ @Override
+ public void createDatabase(String name, boolean ignoreIfExists,
Map<String, String> properties)
+ throws DatabaseAlreadyExistException {
+ wrapped.createDatabase(name, ignoreIfExists, properties);
+ }
+
+ @Override
+ public Map<String, String> loadDatabaseProperties(String name)
+ throws DatabaseNotExistException {
+ return wrapped.loadDatabaseProperties(name);
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ wrapped.dropDatabase(name, ignoreIfNotExists, cascade);
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
+ return wrapped.listTables(databaseName);
+ }
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ wrapped.dropTable(identifier, ignoreIfNotExists);
+ }
+
+ @Override
+ public void createTable(Identifier identifier, Schema schema, boolean
ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ wrapped.createTable(identifier, schema, ignoreIfExists);
+ }
+
+ @Override
+ public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ wrapped.renameTable(fromTable, toTable, ignoreIfNotExists);
+ }
+
+ @Override
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ wrapped.alterTable(identifier, changes, ignoreIfNotExists);
+ }
+
+ @Override
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ return wrapped.getTable(identifier);
+ }
+
+ @Override
+ public void dropPartition(Identifier identifier, Map<String, String>
partitions)
+ throws TableNotExistException, PartitionNotExistException {
+ wrapped.dropPartition(identifier, partitions);
+ }
+
+ @Override
+ public void repairCatalog() {
+ wrapped.repairCatalog();
+ }
+
+ @Override
+ public void repairDatabase(String databaseName) {
+ wrapped.repairDatabase(databaseName);
+ }
+
+ @Override
+ public void repairTable(Identifier identifier) throws
TableNotExistException {
+ wrapped.repairTable(identifier);
+ }
+
+ @Override
+ public void close() throws Exception {
+ wrapped.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index 577e06836..20a7f8b2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -61,11 +61,6 @@ public class FileSystemCatalog extends AbstractCatalog {
return uncheck(() -> listDatabasesInFileSystem(warehouse));
}
- @Override
- protected boolean databaseExistsImpl(String databaseName) {
- return uncheck(() -> fileIO.exists(newDatabasePath(databaseName)));
- }
-
@Override
protected void createDatabaseImpl(String name, Map<String, String>
properties) {
if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
@@ -81,7 +76,11 @@ public class FileSystemCatalog extends AbstractCatalog {
}
@Override
- public Map<String, String> loadDatabasePropertiesImpl(String name) {
+ public Map<String, String> loadDatabasePropertiesImpl(String name)
+ throws DatabaseNotExistException {
+ if (!uncheck(() -> fileIO.exists(newDatabasePath(name)))) {
+ throw new DatabaseNotExistException(name);
+ }
return Collections.emptyMap();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
index 385b56fa5..54fd6a0ce 100644
--- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
@@ -159,14 +159,10 @@ public class JdbcCatalog extends AbstractCatalog {
}
@Override
- protected boolean databaseExistsImpl(String databaseName) {
- return JdbcUtils.databaseExists(connections, catalogKey, databaseName);
- }
-
- @Override
- protected Map<String, String> loadDatabasePropertiesImpl(String
databaseName) {
- if (!databaseExists(databaseName)) {
- throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
+ protected Map<String, String> loadDatabasePropertiesImpl(String
databaseName)
+ throws DatabaseNotExistException {
+ if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) {
+ throw new DatabaseNotExistException(databaseName);
}
Map<String, String> properties = Maps.newHashMap();
properties.putAll(fetchProperties(databaseName));
@@ -179,10 +175,6 @@ public class JdbcCatalog extends AbstractCatalog {
@Override
protected void createDatabaseImpl(String name, Map<String, String>
properties) {
- if (databaseExists(name)) {
- throw new RuntimeException(String.format("Database already exists:
%s", name));
- }
-
Map<String, String> createProps = new HashMap<>();
createProps.put(DATABASE_EXISTS_PROPERTY, "true");
if (properties != null && !properties.isEmpty()) {
@@ -206,9 +198,6 @@ public class JdbcCatalog extends AbstractCatalog {
@Override
protected List<String> listTablesImpl(String databaseName) {
- if (!databaseExists(databaseName)) {
- throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
- }
return fetch(
row -> row.getString(JdbcUtils.TABLE_NAME),
JdbcUtils.LIST_TABLES_SQL,
@@ -312,9 +301,6 @@ public class JdbcCatalog extends AbstractCatalog {
Identifier identifier, String branchName, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
assertMainBranch(branchName);
- if (!tableExists(identifier)) {
- throw new RuntimeException("Table is not exists " +
identifier.getFullName());
- }
SchemaManager schemaManager = getSchemaManager(identifier);
schemaManager.commitChanges(changes);
}
@@ -323,7 +309,11 @@ public class JdbcCatalog extends AbstractCatalog {
protected TableSchema getDataTableSchema(Identifier identifier, String
branchName)
throws TableNotExistException {
assertMainBranch(branchName);
- if (!tableExists(identifier)) {
+ if (!JdbcUtils.tableExists(
+ connections,
+ catalogKey,
+ identifier.getDatabaseName(),
+ identifier.getObjectName())) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
@@ -333,15 +323,6 @@ public class JdbcCatalog extends AbstractCatalog {
() -> new RuntimeException("There is no paimon table
in " + tableLocation));
}
- @Override
- public boolean tableExists(Identifier identifier) {
- if (isSystemTable(identifier)) {
- return super.tableExists(identifier);
- }
- return JdbcUtils.tableExists(
- connections, catalogKey, identifier.getDatabaseName(),
identifier.getObjectName());
- }
-
@Override
public boolean caseSensitive() {
return false;
@@ -383,9 +364,6 @@ public class JdbcCatalog extends AbstractCatalog {
}
private Map<String, String> fetchProperties(String databaseName) {
- if (!databaseExists(databaseName)) {
- throw new RuntimeException(String.format("Database does not exist:
%s", databaseName));
- }
List<Map.Entry<String, String>> entries =
fetch(
row ->
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index 851cc302a..3e4781086 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -19,11 +19,8 @@
package org.apache.paimon.privilege;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogLockContext;
-import org.apache.paimon.catalog.CatalogLockFactory;
+import org.apache.paimon.catalog.DelegateCatalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.schema.Schema;
@@ -34,10 +31,9 @@ import org.apache.paimon.utils.Preconditions;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
/** {@link Catalog} which supports privilege system. */
-public class PrivilegedCatalog implements Catalog {
+public class PrivilegedCatalog extends DelegateCatalog {
public static final ConfigOption<String> USER =
ConfigOptions.key("user").stringType().defaultValue(PrivilegeManager.USER_ANONYMOUS);
@@ -46,74 +42,17 @@ public class PrivilegedCatalog implements Catalog {
.stringType()
.defaultValue(PrivilegeManager.PASSWORD_ANONYMOUS);
- private final Catalog wrapped;
private final PrivilegeManager privilegeManager;
public PrivilegedCatalog(Catalog wrapped, PrivilegeManager
privilegeManager) {
- this.wrapped = wrapped;
+ super(wrapped);
this.privilegeManager = privilegeManager;
}
- public Catalog wrapped() {
- return wrapped;
- }
-
public PrivilegeManager privilegeManager() {
return privilegeManager;
}
- @Override
- public boolean caseSensitive() {
- return wrapped.caseSensitive();
- }
-
- @Override
- public String warehouse() {
- return wrapped.warehouse();
- }
-
- @Override
- public Map<String, String> options() {
- return wrapped.options();
- }
-
- @Override
- public FileIO fileIO() {
- return wrapped.fileIO();
- }
-
- @Override
- public Optional<CatalogLockFactory> lockFactory() {
- return wrapped.lockFactory();
- }
-
- @Override
- public Optional<CatalogLockContext> lockContext() {
- return wrapped.lockContext();
- }
-
- @Override
- public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier
identifier) {
- return wrapped.metastoreClientFactory(identifier);
- }
-
- @Override
- public List<String> listDatabases() {
- return wrapped.listDatabases();
- }
-
- @Override
- public boolean databaseExists(String databaseName) {
- return wrapped.databaseExists(databaseName);
- }
-
- @Override
- public void createDatabase(String name, boolean ignoreIfExists)
- throws DatabaseAlreadyExistException {
- privilegeManager.getPrivilegeChecker().assertCanCreateDatabase();
- wrapped.createDatabase(name, ignoreIfExists);
- }
-
@Override
public void createDatabase(String name, boolean ignoreIfExists,
Map<String, String> properties)
throws DatabaseAlreadyExistException {
@@ -121,12 +60,6 @@ public class PrivilegedCatalog implements Catalog {
wrapped.createDatabase(name, ignoreIfExists, properties);
}
- @Override
- public Map<String, String> loadDatabaseProperties(String name)
- throws DatabaseNotExistException {
- return wrapped.loadDatabaseProperties(name);
- }
-
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
throws DatabaseNotExistException, DatabaseNotEmptyException {
@@ -135,16 +68,6 @@ public class PrivilegedCatalog implements Catalog {
privilegeManager.objectDropped(name);
}
- @Override
- public List<String> listTables(String databaseName) throws
DatabaseNotExistException {
- return wrapped.listTables(databaseName);
- }
-
- @Override
- public boolean tableExists(Identifier identifier) {
- return wrapped.tableExists(identifier);
- }
-
@Override
public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
throws TableNotExistException {
@@ -174,13 +97,6 @@ public class PrivilegedCatalog implements Catalog {
privilegeManager.objectRenamed(fromTable.getFullName(),
toTable.getFullName());
}
- @Override
- public void alterTable(Identifier identifier, SchemaChange change, boolean
ignoreIfNotExists)
- throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
- privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier);
- wrapped.alterTable(identifier, change, ignoreIfNotExists);
- }
-
@Override
public void alterTable(
Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
@@ -207,11 +123,6 @@ public class PrivilegedCatalog implements Catalog {
wrapped.dropPartition(identifier, partitions);
}
- @Override
- public void close() throws Exception {
- wrapped.close();
- }
-
public void createPrivilegedUser(String user, String password) {
privilegeManager.createUser(user, password);
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 94a52fd3a..7edb49f58 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -232,22 +232,6 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- @Override
- protected boolean databaseExistsImpl(String databaseName) {
- try {
- clients.run(client -> client.getDatabase(databaseName));
- return true;
- } catch (NoSuchObjectException e) {
- return false;
- } catch (TException e) {
- throw new RuntimeException(
- "Failed to determine if database " + databaseName + "
exists", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException("Interrupted in call to databaseExists
" + databaseName, e);
- }
- }
-
@Override
protected void createDatabaseImpl(String name, Map<String, String>
properties) {
try {
@@ -286,9 +270,12 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- public Map<String, String> loadDatabasePropertiesImpl(String name) {
+ public Map<String, String> loadDatabasePropertiesImpl(String name)
+ throws DatabaseNotExistException {
try {
return convertToProperties(clients.run(client ->
client.getDatabase(name)));
+ } catch (NoSuchObjectException e) {
+ throw new DatabaseNotExistException(name);
} catch (TException e) {
throw new RuntimeException(
String.format("Failed to get database %s properties",
name), e);
@@ -601,7 +588,7 @@ public class HiveCatalog extends AbstractCatalog {
checkNotSystemDatabase(databaseName);
// create database if needed
- if (!databaseExistsImpl(databaseName)) {
+ if (!databaseExists(databaseName)) {
createDatabaseImpl(databaseName, Collections.emptyMap());
}