This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4a665e6f5 [core] Introduce CachingCatalog (#3829)
4a665e6f5 is described below
commit 4a665e6f5e50dce9c8bc75bf9cd15d419404fccd
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 30 21:24:59 2024 +0800
[core] Introduce CachingCatalog (#3829)
---
.../generated/catalog_configuration.html | 12 +
.../org/apache/paimon/options/CatalogOptions.java | 14 +
.../org/apache/paimon/catalog/AbstractCatalog.java | 46 +---
.../org/apache/paimon/catalog/CachingCatalog.java | 198 ++++++++++++++
.../java/org/apache/paimon/catalog/Catalog.java | 41 +++
.../org/apache/paimon/catalog/CatalogFactory.java | 26 ++
.../apache/paimon/catalog/FileSystemCatalog.java | 2 +-
.../paimon/catalog/FileSystemCatalogFactory.java | 17 +-
.../org/apache/paimon/schema/SchemaManager.java | 2 +-
.../apache/paimon/catalog/CachingCatalogTest.java | 288 +++++++++++++++++++++
.../org/apache/paimon/catalog/CatalogTestBase.java | 20 +-
.../paimon/catalog/FileSystemCatalogTest.java | 32 +++
.../paimon/catalog/TestableCachingCatalog.java | 57 ++++
.../org/apache/paimon/jdbc/JdbcCatalogTest.java | 9 -
.../table/system/AllTableOptionsTableTest.java | 33 +--
.../java/org/apache/paimon/utils/FakeTicker.java | 42 +++
.../flink/action/cdc/SyncDatabaseActionBase.java | 7 +-
.../flink/action/cdc/SyncTableActionBase.java | 6 +-
.../mysql/MySqlSyncDatabaseTableListITCase.java | 2 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 5 +-
.../org/apache/paimon/flink/action/ActionBase.java | 7 +
.../paimon/flink/clone/CopyFileOperator.java | 32 ++-
.../flink/procedure/MigrateDatabaseProcedure.java | 7 +-
.../flink/procedure/MigrateFileProcedure.java | 6 +-
.../flink/procedure/MigrateTableProcedure.java | 7 +-
.../paimon/flink/procedure/RepairProcedure.java | 5 -
.../paimon/flink/utils/TableMigrationUtils.java | 26 +-
.../apache/paimon/flink/BatchFileStoreITCase.java | 35 ---
.../paimon/flink/FileSystemCatalogITCase.java | 3 +-
.../org/apache/paimon/flink/FlinkCatalogTest.java | 16 +-
.../apache/paimon/flink/PartialUpdateITCase.java | 43 ---
.../paimon/flink/action/ActionITCaseBase.java | 6 +-
.../privilege/PrivilegeProcedureITCase.java | 2 -
.../java/org/apache/paimon/hive/HiveCatalog.java | 28 +-
.../java/org/apache/paimon/spark/SparkCatalog.java | 8 +
.../paimon/spark/utils/TableMigrationUtils.java | 12 +-
.../apache/paimon/spark/SparkFileIndexITCase.java | 2 +
.../apache/paimon/spark/PaimonSparkTestBase.scala | 9 +-
38 files changed, 844 insertions(+), 269 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index a583c7471..4508d898d 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -32,6 +32,18 @@ under the License.
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default
value depends on the implementation of the specific catalog.</td>
</tr>
+ <tr>
+ <td><h5>cache-enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Controls whether the catalog will cache databases, tables and
manifests.</td>
+ </tr>
+ <tr>
+ <td><h5>cache.expiration-interval</h5></td>
+ <td style="word-wrap: break-word;">1 min</td>
+ <td>Duration</td>
+ <td>Controls the duration for which databases and tables in the
catalog are cached.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index 9ef681809..980083e71 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -91,6 +91,20 @@ public class CatalogOptions {
.defaultValue(2)
.withDescription("Configure the size of the connection
pool.");
+ public static final ConfigOption<Boolean> CACHE_ENABLED =
+ key("cache-enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Controls whether the catalog will cache
databases, tables and manifests.");
+
+ public static final ConfigOption<Duration> CACHE_EXPIRATION_INTERVAL_MS =
+ key("cache.expiration-interval")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "Controls the duration for which databases and
tables in the catalog are cached.");
+
public static final ConfigOption<String> LINEAGE_META =
key("lineage-meta")
.stringType()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index afe64666a..062e93532 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -45,7 +45,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -58,17 +57,12 @@ import static
org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
- public static final String DB_SUFFIX = ".db";
- protected static final String TABLE_DEFAULT_OPTION_PREFIX =
"table-default.";
- protected static final String DB_LOCATION_PROP = "location";
-
protected final FileIO fileIO;
protected final Map<String, String> tableDefaultOptions;
protected final Options catalogOptions;
@@ -86,8 +80,7 @@ public abstract class AbstractCatalog implements Catalog {
this.fileIO = fileIO;
this.lineageMetaFactory =
findAndCreateLineageMeta(options,
AbstractCatalog.class.getClassLoader());
- this.tableDefaultOptions =
- convertToPropertiesPrefixKey(options.toMap(),
TABLE_DEFAULT_OPTION_PREFIX);
+ this.tableDefaultOptions =
Catalog.tableDefaultOptions(options.toMap());
this.catalogOptions = options;
}
@@ -445,12 +438,12 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- private static boolean isSpecifiedSystemTable(Identifier identifier) {
+ public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
&& !getOriginalIdentifierAndBranch(identifier).isPresent();
}
- protected boolean isSystemTable(Identifier identifier) {
+ protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) ||
isSpecifiedSystemTable(identifier);
}
@@ -463,11 +456,11 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- public void copyTableDefaultOptions(Map<String, String> options) {
+ private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}
- private String[] tableAndSystemName(Identifier identifier) {
+ public static String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(),
SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
@@ -493,7 +486,7 @@ public abstract class AbstractCatalog implements Catalog {
return new Path(warehouse, database + DB_SUFFIX);
}
- private boolean isSystemDatabase(String database) {
+ public static boolean isSystemDatabase(String database) {
return SYSTEM_DATABASE_NAME.equals(database);
}
@@ -504,30 +497,9 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- /** Validate database, table and field names must be lowercase when not
case-sensitive. */
- public static void validateCaseInsensitive(
- boolean caseSensitive, String type, String... names) {
- validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
- }
-
- /** Validate database, table and field names must be lowercase when not
case-sensitive. */
- public static void validateCaseInsensitive(
- boolean caseSensitive, String type, List<String> names) {
- if (caseSensitive) {
- return;
- }
- List<String> illegalNames =
- names.stream().filter(f ->
!f.equals(f.toLowerCase())).collect(Collectors.toList());
- checkArgument(
- illegalNames.isEmpty(),
- String.format(
- "%s name %s cannot contain upper case in the catalog.",
- type, illegalNames));
- }
-
protected void validateIdentifierNameCaseInsensitive(Identifier
identifier) {
- validateCaseInsensitive(allowUpperCase(), "Database",
identifier.getDatabaseName());
- validateCaseInsensitive(allowUpperCase(), "Table",
identifier.getObjectName());
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Database",
identifier.getDatabaseName());
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Table",
identifier.getObjectName());
}
private void
validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
@@ -545,7 +517,7 @@ public abstract class AbstractCatalog implements Catalog {
}
protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
- validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
+ Catalog.validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}
private void validateAutoCreateClose(Map<String, String> options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
new file mode 100644
index 000000000..b11765a9a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalCause;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.RemovalListener;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
+import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
+import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;
+
+/** A {@link Catalog} to cache databases and tables and manifests. */
+public class CachingCatalog extends DelegateCatalog {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CachingCatalog.class);
+
+ protected final Cache<String, Map<String, String>> databaseCache;
+ protected final Cache<Identifier, Table> tableCache;
+
+ public CachingCatalog(Catalog wrapped) {
+ this(wrapped, CACHE_EXPIRATION_INTERVAL_MS.defaultValue());
+ }
+
+ public CachingCatalog(Catalog wrapped, Duration expirationInterval) {
+ this(wrapped, expirationInterval, Ticker.systemTicker());
+ }
+
+ public CachingCatalog(Catalog wrapped, Duration expirationInterval, Ticker
ticker) {
+ super(wrapped);
+ if (expirationInterval.isZero() || expirationInterval.isNegative()) {
+ throw new IllegalArgumentException(
+ "When cache.expiration-interval is set to negative or 0,
the catalog cache should be disabled.");
+ }
+
+ this.databaseCache =
+ Caffeine.newBuilder()
+ .softValues()
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .ticker(ticker)
+ .build();
+ this.tableCache =
+ Caffeine.newBuilder()
+ .softValues()
+ .removalListener(new
TableInvalidatingRemovalListener())
+ .executor(Runnable::run)
+ .expireAfterAccess(expirationInterval)
+ .ticker(ticker)
+ .build();
+ }
+
+ @Override
+ public Map<String, String> loadDatabaseProperties(String databaseName)
+ throws DatabaseNotExistException {
+ Map<String, String> properties =
databaseCache.getIfPresent(databaseName);
+ if (properties != null) {
+ return properties;
+ }
+
+ properties = super.loadDatabaseProperties(databaseName);
+ databaseCache.put(databaseName, properties);
+ return properties;
+ }
+
+ @Override
+ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean
cascade)
+ throws DatabaseNotExistException, DatabaseNotEmptyException {
+ super.dropDatabase(name, ignoreIfNotExists, cascade);
+ databaseCache.invalidate(name);
+ if (cascade) {
+ List<Identifier> tables = new ArrayList<>();
+ for (Identifier identifier : tableCache.asMap().keySet()) {
+ if (identifier.getDatabaseName().equals(name)) {
+ tables.add(identifier);
+ }
+ }
+ tables.forEach(tableCache::invalidate);
+ }
+ }
+
+ @Override
+ public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+ throws TableNotExistException {
+ super.dropTable(identifier, ignoreIfNotExists);
+ invalidateTable(identifier);
+ }
+
+ @Override
+ public void renameTable(Identifier fromTable, Identifier toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ super.renameTable(fromTable, toTable, ignoreIfNotExists);
+ invalidateTable(fromTable);
+ }
+
+ @Override
+ public void alterTable(
+ Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
+ throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ super.alterTable(identifier, changes, ignoreIfNotExists);
+ invalidateTable(identifier);
+ }
+
+ @Override
+ public Table getTable(Identifier identifier) throws TableNotExistException
{
+ Table table = tableCache.getIfPresent(identifier);
+ if (table != null) {
+ return table;
+ }
+
+ if (isSpecifiedSystemTable(identifier)) {
+ String[] splits = tableAndSystemName(identifier);
+ String tableName = splits[0];
+ String type = splits[1];
+
+ Identifier originIdentifier =
+ Identifier.create(identifier.getDatabaseName(), tableName);
+ Table originTable = tableCache.getIfPresent(originIdentifier);
+ if (originTable == null) {
+ originTable = wrapped.getTable(originIdentifier);
+ tableCache.put(originIdentifier, originTable);
+ }
+ table = SystemTableLoader.load(type, (FileStoreTable) originTable);
+ if (table == null) {
+ throw new TableNotExistException(identifier);
+ }
+ tableCache.put(identifier, table);
+ return table;
+ }
+
+ table = wrapped.getTable(identifier);
+ tableCache.put(identifier, table);
+ return table;
+ }
+
+ private class TableInvalidatingRemovalListener implements
RemovalListener<Identifier, Table> {
+ @Override
+ public void onRemoval(Identifier identifier, Table table, @NonNull
RemovalCause cause) {
+ LOG.debug("Evicted {} from the table cache ({})", identifier,
cause);
+ if (RemovalCause.EXPIRED.equals(cause)) {
+ tryInvalidateSysTables(identifier);
+ }
+ }
+ }
+
+ @Override
+ public void invalidateTable(Identifier identifier) {
+ tableCache.invalidate(identifier);
+ tryInvalidateSysTables(identifier);
+ }
+
+ private void tryInvalidateSysTables(Identifier identifier) {
+ if (!isSpecifiedSystemTable(identifier)) {
+ tableCache.invalidateAll(allSystemTables(identifier));
+ }
+ }
+
+ private static Iterable<Identifier> allSystemTables(Identifier ident) {
+ List<Identifier> tables = new ArrayList<>();
+ for (String type : SYSTEM_TABLES) {
+ tables.add(Identifier.fromString(ident.getFullName() +
SYSTEM_TABLE_SPLITTER + type));
+ }
+ return tables;
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index fe8e0b68b..ac229464b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,10 +26,15 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* This interface is responsible for reading and writing metadata such as
database/table from a
@@ -47,6 +52,9 @@ public interface Catalog extends AutoCloseable {
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String COMMENT_PROP = "comment";
+ String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
+ String DB_LOCATION_PROP = "location";
+ String DB_SUFFIX = ".db";
/** Warehouse root path containing all database directories in this
catalog. */
String warehouse();
@@ -235,6 +243,16 @@ public interface Catalog extends AutoCloseable {
void alterTable(Identifier identifier, List<SchemaChange> changes, boolean
ignoreIfNotExists)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException;
+ /**
+ * Invalidate cached table metadata for an {@link Identifier identifier}.
+ *
+ * <p>If the table is already loaded or cached, drop cached data. If the
table does not exist or
+ * is not cached, do nothing. Calling this method should not query remote
services.
+ *
+ * @param identifier a table identifier
+ */
+ default void invalidateTable(Identifier identifier) {}
+
/**
* Drop the partition of the specify table.
*
@@ -277,6 +295,29 @@ public interface Catalog extends AutoCloseable {
throw new UnsupportedOperationException();
}
+ static Map<String, String> tableDefaultOptions(Map<String, String>
options) {
+ return convertToPropertiesPrefixKey(options,
TABLE_DEFAULT_OPTION_PREFIX);
+ }
+
+ /** Validate database, table and field names must be lowercase when not
case-sensitive. */
+ static void validateCaseInsensitive(boolean caseSensitive, String type,
String... names) {
+ validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
+ }
+
+ /** Validate database, table and field names must be lowercase when not
case-sensitive. */
+ static void validateCaseInsensitive(boolean caseSensitive, String type,
List<String> names) {
+ if (caseSensitive) {
+ return;
+ }
+ List<String> illegalNames =
+ names.stream().filter(f ->
!f.equals(f.toLowerCase())).collect(Collectors.toList());
+ checkArgument(
+ illegalNames.isEmpty(),
+ String.format(
+ "%s name %s cannot contain upper case in the catalog.",
+ type, illegalNames));
+ }
+
/** Exception for trying to drop on a database that is not empty. */
class DatabaseNotEmptyException extends Exception {
private static final String MSG = "Database %s is not empty.";
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index c153f0ee3..2eff8e902 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -24,11 +24,16 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
+import org.apache.paimon.privilege.FileBasedPrivilegeManager;
+import org.apache.paimon.privilege.PrivilegeManager;
+import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
import java.io.UncheckedIOException;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static
org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
@@ -67,6 +72,27 @@ public interface CatalogFactory extends Factory {
}
static Catalog createCatalog(CatalogContext context, ClassLoader
classLoader) {
+ Catalog catalog = createUnwrappedCatalog(context, classLoader);
+
+ Options options = context.options();
+ if (options.get(CACHE_ENABLED)) {
+ catalog = new CachingCatalog(catalog,
options.get(CACHE_EXPIRATION_INTERVAL_MS));
+ }
+
+ PrivilegeManager privilegeManager =
+ new FileBasedPrivilegeManager(
+ catalog.warehouse(),
+ catalog.fileIO(),
+ context.options().get(PrivilegedCatalog.USER),
+ context.options().get(PrivilegedCatalog.PASSWORD));
+ if (privilegeManager.privilegeEnabled()) {
+ catalog = new PrivilegedCatalog(catalog, privilegeManager);
+ }
+
+ return catalog;
+ }
+
+ static Catalog createUnwrappedCatalog(CatalogContext context, ClassLoader
classLoader) {
Options options = context.options();
String metastore = options.get(METASTORE);
CatalogFactory catalogFactory =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index c2ff37601..64f38a106 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -63,7 +63,7 @@ public class FileSystemCatalog extends AbstractCatalog {
@Override
protected void createDatabaseImpl(String name, Map<String, String>
properties) {
- if (properties.containsKey(AbstractCatalog.DB_LOCATION_PROP)) {
+ if (properties.containsKey(Catalog.DB_LOCATION_PROP)) {
throw new IllegalArgumentException(
"Cannot specify location for a database when using
fileSystem catalog.");
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
index 8a0b1643f..6ab3664e4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalogFactory.java
@@ -20,9 +20,6 @@ package org.apache.paimon.catalog;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.table.TableType;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
@@ -44,18 +41,6 @@ public class FileSystemCatalogFactory implements
CatalogFactory {
"Only managed table is supported in File system catalog.");
}
- Catalog catalog = new FileSystemCatalog(fileIO, warehouse,
context.options());
-
- PrivilegeManager privilegeManager =
- new FileBasedPrivilegeManager(
- warehouse.toString(),
- fileIO,
- context.options().get(PrivilegedCatalog.USER),
- context.options().get(PrivilegedCatalog.PASSWORD));
- if (privilegeManager.privilegeEnabled()) {
- catalog = new PrivilegedCatalog(catalog, privilegeManager);
- }
-
- return catalog;
+ return new FileSystemCatalog(fileIO, warehouse, context.options());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 684adfe9d..3c9db09aa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -66,7 +66,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.paimon.catalog.AbstractCatalog.DB_SUFFIX;
+import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
import static org.apache.paimon.catalog.Identifier.UNKNOWN_DATABASE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.FileUtils.listVersionedFiles;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
new file mode 100644
index 000000000..56fc238a8
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.FakeTicker;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+class CachingCatalogTest extends CatalogTestBase {
+
+ private static final Duration EXPIRATION_TTL = Duration.ofMinutes(5);
+ private static final Duration HALF_OF_EXPIRATION =
EXPIRATION_TTL.dividedBy(2);
+
+ private FakeTicker ticker;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+ ticker = new FakeTicker();
+ catalog.createDatabase("db", false);
+ }
+
+ @Override
+ @Test
+ public void testListDatabasesWhenNoDatabases() {
+ List<String> databases = catalog.listDatabases();
+ assertThat(databases).contains("db");
+ }
+
+ @Test
+ public void testInvalidateSystemTablesIfBaseTableIsModified() throws
Exception {
+ Catalog catalog = new CachingCatalog(this.catalog);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA,
false);
+ Identifier sysIdent = new Identifier("db", "tbl$files");
+ Table sysTable = catalog.getTable(sysIdent);
+ catalog.alterTable(tableIdent, SchemaChange.addColumn("col3",
DataTypes.INT()), false);
+ assertThat(catalog.getTable(sysIdent)).isNotSameAs(sysTable);
+ }
+
+ @Test
+ public void testInvalidateSysTablesIfBaseTableIsDropped() throws Exception
{
+ Catalog catalog = new CachingCatalog(this.catalog);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(new Identifier("db", "tbl"), DEFAULT_TABLE_SCHEMA,
false);
+ Identifier sysIdent = new Identifier("db", "tbl$files");
+ catalog.getTable(sysIdent);
+ catalog.dropTable(tableIdent, false);
+ assertThatThrownBy(() -> catalog.getTable(sysIdent))
+ .hasMessage("Table db.tbl does not exist.");
+ }
+
+ @Test
+ public void testTableExpiresAfterInterval() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(tableIdent);
+
+ // Ensure table is cached with full ttl remaining upon creation
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
assertThat(catalog.remainingAgeFor(tableIdent)).isPresent().get().isEqualTo(EXPIRATION_TTL);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+
+ ticker.advance(HALF_OF_EXPIRATION.plus(Duration.ofSeconds(10)));
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+ assertThat(catalog.getTable(tableIdent))
+ .as("CachingCatalog should return a new instance after
expiration")
+ .isNotSameAs(table);
+ }
+
+ @Test
+ public void testCatalogExpirationTtlRefreshesAfterAccessViaCatalog()
throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(Duration.ZERO);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
assertThat(catalog.ageOf(tableIdent)).isPresent().get().isEqualTo(HALF_OF_EXPIRATION);
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION);
+
+ Duration oneMinute = Duration.ofMinutes(1L);
+ ticker.advance(oneMinute);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent))
+ .isPresent()
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION.plus(oneMinute));
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .get()
+ .isEqualTo(HALF_OF_EXPIRATION.minus(oneMinute));
+
+ // Access the table via the catalog, which should refresh the TTL
+ Table table = catalog.getTable(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(EXPIRATION_TTL);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+
assertThat(catalog.remainingAgeFor(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+ }
+
+ @Test
+ public void testCacheExpirationEagerlyRemovesSysTables() throws Exception {
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ Table table = catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(Duration.ZERO);
+
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+
assertThat(catalog.ageOf(tableIdent)).get().isEqualTo(HALF_OF_EXPIRATION);
+
+ for (Identifier sysTable : sysTables(tableIdent)) {
+ catalog.getTable(sysTable);
+ }
+
assertThat(catalog.cache().asMap()).containsKeys(sysTables(tableIdent));
+ assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+ .isNotEmpty()
+ .allMatch(age -> age.isPresent() &&
age.get().equals(Duration.ZERO));
+
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .as("Loading a non-cached sys table should refresh the main
table's age")
+ .isEqualTo(Optional.of(EXPIRATION_TTL));
+
+ // Move time forward and access already cached sys tables.
+ ticker.advance(HALF_OF_EXPIRATION);
+ for (Identifier sysTable : sysTables(tableIdent)) {
+ catalog.getTable(sysTable);
+ }
+ assertThat(Arrays.stream(sysTables(tableIdent)).map(catalog::ageOf))
+ .isNotEmpty()
+ .allMatch(age -> age.isPresent() &&
age.get().equals(Duration.ZERO));
+
+ assertThat(catalog.remainingAgeFor(tableIdent))
+ .as("Accessing a cached sys table should not affect the main
table's age")
+ .isEqualTo(Optional.of(HALF_OF_EXPIRATION));
+
+ // Move time forward so the data table drops.
+ ticker.advance(HALF_OF_EXPIRATION);
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+
+ Arrays.stream(sysTables(tableIdent))
+ .forEach(
+ sysTable ->
+ assertThat(catalog.cache().asMap())
+ .as(
+ "When a data table expires,
its sys tables should expire regardless of age")
+ .doesNotContainKeys(sysTable));
+ }
+
+ @Test
+ public void testDeadlock() throws Exception {
+ Catalog underlyCatalog = this.catalog;
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(this.catalog,
Duration.ofSeconds(1), ticker);
+ int numThreads = 20;
+ List<Identifier> createdTables = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ Identifier tableIdent = new Identifier("db", "tbl" + i);
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ createdTables.add(tableIdent);
+ }
+
+ Cache<Identifier, Table> cache = catalog.cache();
+ AtomicInteger cacheGetCount = new AtomicInteger(0);
+ AtomicInteger cacheCleanupCount = new AtomicInteger(0);
+ ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+ for (int i = 0; i < numThreads; i++) {
+ if (i % 2 == 0) {
+ String table = "tbl" + i;
+ executor.submit(
+ () -> {
+ ticker.advance(Duration.ofSeconds(2));
+ cache.get(
+ new Identifier("db", table),
+ identifier -> {
+ try {
+ return
underlyCatalog.getTable(identifier);
+ } catch
(Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ cacheGetCount.incrementAndGet();
+ });
+ } else {
+ executor.submit(
+ () -> {
+ ticker.advance(Duration.ofSeconds(2));
+ cache.cleanUp();
+ cacheCleanupCount.incrementAndGet();
+ });
+ }
+ }
+ executor.awaitTermination(2, TimeUnit.SECONDS);
+ assertThat(cacheGetCount).hasValue(numThreads / 2);
+ assertThat(cacheCleanupCount).hasValue(numThreads / 2);
+
+ executor.shutdown();
+ }
+
+ @Test
+ public void testCachingCatalogRejectsExpirationIntervalOfZero() {
+ Assertions.assertThatThrownBy(
+ () -> new TestableCachingCatalog(this.catalog,
Duration.ZERO, ticker))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "When cache.expiration-interval is set to negative or
0, the catalog cache should be disabled.");
+ }
+
+ @Test
+ public void testInvalidateTableForChainedCachingCatalogs() throws
Exception {
+ TestableCachingCatalog wrappedCatalog =
+ new TestableCachingCatalog(this.catalog, EXPIRATION_TTL,
ticker);
+ TestableCachingCatalog catalog =
+ new TestableCachingCatalog(wrappedCatalog, EXPIRATION_TTL,
ticker);
+ Identifier tableIdent = new Identifier("db", "tbl");
+ catalog.createTable(tableIdent, DEFAULT_TABLE_SCHEMA, false);
+ catalog.getTable(tableIdent);
+ assertThat(catalog.cache().asMap()).containsKey(tableIdent);
+ catalog.dropTable(tableIdent, false);
+ assertThat(catalog.cache().asMap()).doesNotContainKey(tableIdent);
+
assertThat(wrappedCatalog.cache().asMap()).doesNotContainKey(tableIdent);
+ }
+
+ public static Identifier[] sysTables(Identifier tableIdent) {
+ return SystemTableLoader.SYSTEM_TABLES.stream()
+ .map(type -> Identifier.fromString(tableIdent.getFullName() +
"$" + type))
+ .toArray(Identifier[]::new);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 378ebf159..19c9e83c4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -55,6 +55,7 @@ public abstract class CatalogTestBase {
protected String warehouse;
protected FileIO fileIO;
protected Catalog catalog;
+
protected static final Schema DEFAULT_TABLE_SCHEMA =
new Schema(
Lists.newArrayList(
@@ -66,18 +67,6 @@ public abstract class CatalogTestBase {
Maps.newHashMap(),
"");
- protected static final Schema PARTITION_SCHEMA =
- new Schema(
- Lists.newArrayList(
- new DataField(0, "pk1", DataTypes.INT()),
- new DataField(1, "pk2", DataTypes.STRING()),
- new DataField(3, "pk3", DataTypes.STRING()),
- new DataField(4, "col", DataTypes.STRING())),
- Arrays.asList("pk1", "pk2"),
- Arrays.asList("pk1", "pk2", "pk3"),
- Maps.newHashMap(),
- "");
-
@BeforeEach
public void setUp() throws Exception {
warehouse = tempFile.toUri().toString();
@@ -95,7 +84,10 @@ public abstract class CatalogTestBase {
}
@Test
- public abstract void testListDatabasesWhenNoDatabases();
+ public void testListDatabasesWhenNoDatabases() {
+ List<String> databases = catalog.listDatabases();
+ assertThat(databases).isEqualTo(new ArrayList<>());
+ }
@Test
public void testListDatabases() throws Exception {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
new file mode 100644
index 000000000..67fbd2718
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.BeforeEach;
+
+class FileSystemCatalogTest extends CatalogTestBase {
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ catalog = new FileSystemCatalog(fileIO, new Path(warehouse));
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
new file mode 100644
index 000000000..c393d3aff
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/catalog/TestableCachingCatalog.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.catalog;
+
+import org.apache.paimon.table.Table;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.Optional;
+
+/**
+ * A wrapper around CachingCatalog that provides accessor methods to test the
underlying cache,
+ * without making those fields public in the CachingCatalog itself.
+ */
+public class TestableCachingCatalog extends CachingCatalog {
+
+ private final Duration cacheExpirationInterval;
+
+ public TestableCachingCatalog(Catalog catalog, Duration
expirationInterval, Ticker ticker) {
+ super(catalog, expirationInterval, ticker);
+ this.cacheExpirationInterval = expirationInterval;
+ }
+
+ public Cache<Identifier, Table> cache() {
+ // cleanUp must be called as tests apply assertions directly on the
underlying map, but
+ // metadata
+ // table map entries are cleaned up asynchronously.
+ tableCache.cleanUp();
+ return tableCache;
+ }
+
+ public Optional<Duration> ageOf(Identifier identifier) {
+ return tableCache.policy().expireAfterAccess().get().ageOf(identifier);
+ }
+
+ public Optional<Duration> remainingAgeFor(Identifier identifier) {
+ return ageOf(identifier).map(cacheExpirationInterval::minus);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index f0c84eb2c..f5befc724 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -32,8 +32,6 @@ import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -88,13 +86,6 @@ public class JdbcCatalogTest extends CatalogTestBase {
.isTrue();
}
- @Test
- @Override
- public void testListDatabasesWhenNoDatabases() {
- List<String> databases = catalog.listDatabases();
- assertThat(databases).isEqualTo(new ArrayList<>());
- }
-
@Test
public void testCheckIdentifierUpperCase() throws Exception {
catalog.createDatabase("test_db", false);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
index f2a1efce5..764c0f4e1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
@@ -18,10 +18,7 @@
package org.apache.paimon.table.system;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.types.DataTypes;
@@ -29,18 +26,12 @@ import org.apache.paimon.types.DataTypes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Spliterator;
-import java.util.Spliterators;
+import java.util.Objects;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
-import static org.apache.paimon.table.system.AllTableOptionsTable.options;
-import static org.apache.paimon.table.system.AllTableOptionsTable.toRow;
import static org.assertj.core.api.Assertions.assertThat;
/** Unit tests for {@link AllTableOptionsTable}. */
@@ -69,18 +60,14 @@ public class AllTableOptionsTableTest extends TableTestBase
{
@Test
public void testSchemasTable() throws Exception {
- List<InternalRow> expectRow = getExceptedResult();
- List<InternalRow> result = read(allTableOptionsTable);
- assertThat(result).containsExactlyElementsOf(expectRow);
- }
-
- private List<InternalRow> getExceptedResult() {
- AbstractCatalog abstractCatalog = (AbstractCatalog) catalog;
- Map<String, Map<String, Path>> stringMapMap =
abstractCatalog.allTablePaths();
- Iterator<InternalRow> rows =
- toRow(options(((AbstractCatalog) catalog).fileIO(),
stringMapMap));
- return StreamSupport.stream(
- Spliterators.spliteratorUnknownSize(rows,
Spliterator.ORDERED), false)
- .collect(Collectors.toList());
+ List<String> result =
+ read(allTableOptionsTable).stream()
+ .map(Objects::toString)
+ .collect(Collectors.toList());
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ "+I(default,T,fields.sales.aggregate-function,sum)",
+ "+I(default,T,merge-engine,aggregation)",
+ "+I(default,T,fields.price.aggregate-function,max)");
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
new file mode 100644
index 000000000..cbd536ad5
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FakeTicker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Ticker;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** A {@code Ticker} whose value can be advanced programmatically in tests. */
+public class FakeTicker implements Ticker {
+
+ private final AtomicLong nanos = new AtomicLong();
+
+ public FakeTicker() {}
+
+ public FakeTicker advance(Duration duration) {
+ nanos.addAndGet(duration.toNanos());
+ return this;
+ }
+
+ @Override
+ public long read() {
+ return nanos.get();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index d6c4c12ec..6c62bcc0c 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action.cdc;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
@@ -120,9 +119,9 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database",
database);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table
prefix", tablePrefix);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table
suffix", tableSuffix);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix",
tablePrefix);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix",
tableSuffix);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index be8cedf0b..4c7db6d28 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.Action;
@@ -114,8 +114,8 @@ public abstract class SyncTableActionBase extends
SynchronizationActionBase {
@Override
protected void validateCaseSensitivity() {
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database",
database);
- AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table",
table);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
+ Catalog.validateCaseInsensitive(allowUpperCase, "Table", table);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
index 816b7b903..c15cbf898 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseTableListITCase.java
@@ -45,7 +45,7 @@ public class MySqlSyncDatabaseTableListITCase extends
MySqlActionITCaseBase {
}
@Test
- @Timeout(120)
+ @Timeout(180)
public void testActionRunResult() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 98efd479f..2cb96e007 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -347,10 +347,7 @@ public class FlinkCatalog extends AbstractCatalog {
// Although catalog.createTable will copy the default options, but
we need this info
// here before create table, such as
table-default.kafka.bootstrap.servers defined in
// catalog options. Temporarily, we copy the default options here.
- if (catalog instanceof org.apache.paimon.catalog.AbstractCatalog) {
- ((org.apache.paimon.catalog.AbstractCatalog) catalog)
- .copyTableDefaultOptions(options);
- }
+
Catalog.tableDefaultOptions(catalog.options()).forEach(options::putIfAbsent);
options.put(REGISTER_TIMEOUT.key(),
logStoreAutoRegisterTimeout.toString());
registerLogSystem(catalog, identifier, options, classLoader);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index dd32c52c6..30e32d62e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -40,6 +40,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
/** Abstract base of {@link Action} for table. */
public abstract class ActionBase implements Action {
@@ -55,6 +57,11 @@ public abstract class ActionBase implements Action {
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
+ // disable cache to avoid concurrent modification exception
+ if (!catalogOptions.contains(CACHE_ENABLED)) {
+ catalogOptions.set(CACHE_ENABLED, false);
+ }
+
catalog = initPaimonCatalog();
flinkCatalog = initFlinkCatalog();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index 5b320f4e9..e5c833606 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink.clone;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+import static org.apache.paimon.CoreOptions.PATH;
+
/** A Operator to copy files. */
public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
@@ -43,8 +45,8 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
private final Map<String, String> sourceCatalogConfig;
private final Map<String, String> targetCatalogConfig;
- private AbstractCatalog sourceCatalog;
- private AbstractCatalog targetCatalog;
+ private Catalog sourceCatalog;
+ private Catalog targetCatalog;
public CopyFileOperator(
Map<String, String> sourceCatalogConfig, Map<String, String>
targetCatalogConfig) {
@@ -55,13 +57,9 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
@Override
public void open() throws Exception {
sourceCatalog =
- (AbstractCatalog)
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(sourceCatalogConfig));
+
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
- (AbstractCatalog)
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(targetCatalogConfig));
+
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
}
@Override
@@ -71,11 +69,19 @@ public class CopyFileOperator extends
AbstractStreamOperator<CloneFileInfo>
FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();
Path sourceTableRootPath =
- sourceCatalog.getDataTableLocation(
-
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
+ new Path(
+ sourceCatalog
+ .getTable(
+
Identifier.fromString(cloneFileInfo.getSourceIdentifier()))
+ .options()
+ .get(PATH.key()));
Path targetTableRootPath =
- targetCatalog.getDataTableLocation(
-
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
+ new Path(
+ targetCatalog
+ .getTable(
+
Identifier.fromString(cloneFileInfo.getTargetIdentifier()))
+ .options()
+ .get(PATH.key()));
String filePathExcludeTableRoot =
cloneFileInfo.getFilePathExcludeTableRoot();
Path sourcePath = new Path(sourceTableRootPath +
filePathExcludeTableRoot);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
index 166544eb7..128875a8b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateDatabaseProcedure.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
@@ -47,14 +46,10 @@ public class MigrateDatabaseProcedure extends ProcedureBase
{
String sourceDatabasePath,
String properties)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
- HiveCatalog hiveCatalog = (HiveCatalog) this.catalog;
List<Migrator> migrators =
TableMigrationUtils.getImporters(
connector,
- hiveCatalog,
+ catalog,
sourceDatabasePath,
ParameterUtils.parseCommaSeparatedKeyValues(properties));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index c9a273336..110b4e25f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.migrate.Migrator;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -62,9 +61,6 @@ public class MigrateFileProcedure extends ProcedureBase {
String targetPaimonTablePath,
boolean deleteOrigin)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId =
Identifier.fromString(targetPaimonTablePath);
@@ -76,7 +72,7 @@ public class MigrateFileProcedure extends ProcedureBase {
Migrator importer =
TableMigrationUtils.getImporter(
connector,
- (HiveCatalog) catalog,
+ catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index f721f8925..39e6092d8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -51,10 +50,6 @@ public class MigrateTableProcedure extends ProcedureBase {
String sourceTablePath,
String properties)
throws Exception {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
-
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
Identifier sourceTableId = Identifier.fromString(sourceTablePath);
@@ -62,7 +57,7 @@ public class MigrateTableProcedure extends ProcedureBase {
TableMigrationUtils.getImporter(
connector,
- (HiveCatalog) catalog,
+ catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
index 0c208116b..196adf475 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RepairProcedure.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
@@ -55,10 +54,6 @@ public class RepairProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String identifier)
throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
- if (!(catalog instanceof HiveCatalog)) {
- throw new IllegalArgumentException("Only support Hive Catalog");
- }
-
if (StringUtils.isBlank(identifier)) {
catalog.repairCatalog();
return new String[] {"Success"};
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index 32d017be6..47655d39a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.utils;
+import org.apache.paimon.catalog.CachingCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
import org.apache.paimon.migrate.Migrator;
@@ -30,7 +32,7 @@ public class TableMigrationUtils {
public static Migrator getImporter(
String connector,
- HiveCatalog paimonCatalog,
+ Catalog catalog,
String sourceDatabase,
String souceTableName,
String targetDatabase,
@@ -38,8 +40,14 @@ public class TableMigrationUtils {
Map<String, String> options) {
switch (connector) {
case "hive":
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive
Catalog");
+ }
return new HiveMigrator(
- paimonCatalog,
+ (HiveCatalog) catalog,
sourceDatabase,
souceTableName,
targetDatabase,
@@ -51,13 +59,17 @@ public class TableMigrationUtils {
}
public static List<Migrator> getImporters(
- String connector,
- HiveCatalog paimonCatalog,
- String sourceDatabase,
- Map<String, String> options) {
+ String connector, Catalog catalog, String sourceDatabase,
Map<String, String> options) {
switch (connector) {
case "hive":
- return HiveMigrator.databaseMigrators(paimonCatalog,
sourceDatabase, options);
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive
Catalog");
+ }
+ return HiveMigrator.databaseMigrators(
+ (HiveCatalog) catalog, sourceDatabase, options);
default:
throw new UnsupportedOperationException("Don't support
connector " + connector);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 0fe417364..bbc827b24 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -19,14 +19,7 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
@@ -38,7 +31,6 @@ import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -440,33 +432,6 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "B"));
}
- @Test
- public void testIgnoreDeleteCompatible() throws Exception {
- sql(
- "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
v STRING) "
- + "WITH ('merge-engine' = 'deduplicate', 'write-only'
= 'true')");
-
- sql("INSERT INTO ignore_delete VALUES (1, 'A')");
- // write delete records
- sql("DELETE FROM ignore_delete WHERE pk = 1");
- assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
-
- // set ignore-delete and read
- Map<String, String> newOptions = new HashMap<>();
- newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), new Path(path,
"default.db/ignore_delete")),
- new Schema(
- Arrays.asList(
- new DataField(0, "pk",
DataTypes.INT().notNull()),
- new DataField(1, "v", DataTypes.STRING())),
- Collections.emptyList(),
- Collections.singletonList("pk"),
- newOptions,
- null));
- assertThat(sql("SELECT * FROM
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
- }
-
@Test
public void testIgnoreDeleteWithRowKindField() {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index e358fb311..4e1ea424f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLock;
import org.apache.paimon.catalog.CatalogLockContext;
@@ -98,7 +97,7 @@ public class FileSystemCatalogITCase extends AbstractTestBase
{
Identifier identifier = new Identifier(DB_NAME, "t3");
Catalog catalog =
((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
- Path tablePath = ((AbstractCatalog)
catalog).getDataTableLocation(identifier);
+ Path tablePath = new
Path(catalog.getTable(identifier).options().get("path"));
assertThat(tablePath.toString())
.isEqualTo(new File(path, DB_NAME + ".db" + File.separator +
"t3").toString());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
index 587069ada..f95ec3ae4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.log.LogSinkProvider;
@@ -647,9 +646,18 @@ public class FlinkCatalogTest {
CatalogTable t2,
Map<String, String> optionsToAdd,
Set<String> optionsToRemove) {
- Path tablePath =
- ((AbstractCatalog) ((FlinkCatalog) catalog).catalog())
- .getDataTableLocation(FlinkCatalog.toIdentifier(path));
+ Path tablePath;
+ try {
+ tablePath =
+ new Path(
+ ((FlinkCatalog) catalog)
+ .catalog()
+ .getTable(FlinkCatalog.toIdentifier(path))
+ .options()
+ .get(CoreOptions.PATH.key()));
+ } catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
Map<String, String> options = new HashMap<>(t1.getOptions());
options.put("path", tablePath.toString());
options.putAll(optionsToAdd);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index f65a50081..399109137 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -18,14 +18,6 @@
package org.apache.paimon.flink;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.SchemaUtils;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.CommonTestUtils;
@@ -44,9 +36,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -627,37 +617,4 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apple"));
iterator.close();
}
-
- @Test
- public void testIgnoreDeleteCompatible() throws Exception {
- sql(
- "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a STRING, b STRING) WITH ("
- + " 'merge-engine' = 'deduplicate',"
- + " 'write-only' = 'true')");
- sql("INSERT INTO ignore_delete VALUES (1, CAST (NULL AS STRING),
'apple')");
- // write delete records
- sql("DELETE FROM ignore_delete WHERE pk = 1");
- sql("INSERT INTO ignore_delete VALUES (1, 'A', CAST (NULL AS
STRING))");
- assertThat(sql("SELECT * FROM ignore_delete"))
- .containsExactlyInAnyOrder(Row.of(1, "A", null));
-
- // force altering merge engine and read
- Map<String, String> newOptions = new HashMap<>();
- newOptions.put(
- CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
- newOptions.put(CoreOptions.IGNORE_DELETE.key(), "true");
- SchemaUtils.forceCommit(
- new SchemaManager(LocalFileIO.create(), new Path(path,
"default.db/ignore_delete")),
- new Schema(
- Arrays.asList(
- new DataField(0, "pk",
DataTypes.INT().notNull()),
- new DataField(1, "a", DataTypes.STRING()),
- new DataField(2, "b", DataTypes.STRING())),
- Collections.emptyList(),
- Collections.singletonList("pk"),
- newOptions,
- null));
- assertThat(sql("SELECT * FROM ignore_delete"))
- .containsExactlyInAnyOrder(Row.of(1, "A", "apple"));
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 5e7f0108c..1b17ee172 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -51,6 +51,8 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
/** {@link Action} test base. */
public abstract class ActionITCaseBase extends AbstractTestBase {
@@ -70,7 +72,9 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
tableName = "test_table_" + UUID.randomUUID();
commitUser = UUID.randomUUID().toString();
incrementalIdentifier = 0;
- catalog = CatalogFactory.createCatalog(CatalogContext.create(new
Path(warehouse)));
+ CatalogContext context = CatalogContext.create(new Path(warehouse));
+ context.options().set(CACHE_ENABLED, false);
+ catalog = CatalogFactory.createCatalog(context);
}
@AfterEach
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
index e0c5f5d66..db973b39c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/privilege/PrivilegeProcedureITCase.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure.privilege;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -85,7 +84,6 @@ public class PrivilegeProcedureITCase extends
AbstractTestBase {
Catalog paimonCatalog = ((FlinkCatalog) catalog).catalog();
assertThat(paimonCatalog).isInstanceOf(PrivilegedCatalog.class);
PrivilegedCatalog privilegedCatalog = (PrivilegedCatalog)
paimonCatalog;
-
assertThat(privilegedCatalog.wrapped()).isInstanceOf(FileSystemCatalog.class);
assertThat(privilegedCatalog.privilegeManager())
.isInstanceOf(FileBasedPrivilegeManager.class);
assertThat(privilegedCatalog.privilegeManager().privilegeEnabled()).isTrue();
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 1bbf4b387..e9fc9f32c 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -36,9 +36,6 @@ import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.OptionsUtils;
-import org.apache.paimon.privilege.FileBasedPrivilegeManager;
-import org.apache.paimon.privilege.PrivilegeManager;
-import org.apache.paimon.privilege.PrivilegedCatalog;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
@@ -875,25 +872,12 @@ public class HiveCatalog extends AbstractCatalog {
throw new UncheckedIOException(e);
}
- Catalog catalog =
- new HiveCatalog(
- fileIO,
- hiveConf,
- options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
- options,
- warehouse.toUri().toString());
-
- PrivilegeManager privilegeManager =
- new FileBasedPrivilegeManager(
- warehouse.toString(),
- fileIO,
- context.options().get(PrivilegedCatalog.USER),
- context.options().get(PrivilegedCatalog.PASSWORD));
- if (privilegeManager.privilegeEnabled()) {
- catalog = new PrivilegedCatalog(catalog, privilegeManager);
- }
-
- return catalog;
+ return new HiveCatalog(
+ fileIO,
+ hiveConf,
+ options.get(HiveCatalogFactory.METASTORE_CLIENT_CLASS),
+ options,
+ warehouse.toUri().toString());
}
public static HiveConf createHiveConf(CatalogContext context) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index b04dce2fa..dd3efc724 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -219,6 +219,14 @@ public class SparkCatalog extends SparkBaseCatalog {
}
}
+ @Override
+ public void invalidateTable(Identifier ident) {
+ try {
+ catalog.invalidateTable(toIdentifier(ident));
+ } catch (NoSuchTableException ignored) {
+ }
+ }
+
@Override
public SparkTable loadTable(Identifier ident) throws NoSuchTableException {
return loadSparkTable(ident, Collections.emptyMap());
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
index 4a45b365a..992f17253 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/TableMigrationUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.utils;
+import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
@@ -30,7 +31,7 @@ public class TableMigrationUtils {
public static Migrator getImporter(
String connector,
- Catalog paimonCatalog,
+ Catalog catalog,
String sourceDatabase,
String sourceTableName,
String targetDatabase,
@@ -38,9 +39,14 @@ public class TableMigrationUtils {
Map<String, String> options) {
switch (connector) {
case "hive":
- assert paimonCatalog instanceof HiveCatalog;
+ if (catalog instanceof CachingCatalog) {
+ catalog = ((CachingCatalog) catalog).wrapped();
+ }
+ if (!(catalog instanceof HiveCatalog)) {
+ throw new IllegalArgumentException("Only support Hive
Catalog");
+ }
return new HiveMigrator(
- (HiveCatalog) paimonCatalog,
+ (HiveCatalog) catalog,
sourceDatabase,
sourceTableName,
targetDatabase,
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index 55ce7c9aa..f12a3d8fa 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -59,6 +59,7 @@ import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -88,6 +89,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
Options options = new Options();
options.set(WAREHOUSE,
spark.conf().get("spark.sql.catalog.paimon.warehouse"));
+ options.set(CACHE_ENABLED, false);
fileSystemCatalog =
(FileSystemCatalog)
CatalogFactory.createCatalog(CatalogContext.create(options));
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index b8115132d..983dd037f 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.catalog.{Catalog, CatalogContext, CatalogFactory,
Identifier}
-import org.apache.paimon.options.Options
+import org.apache.paimon.options.{CatalogOptions, Options}
import org.apache.paimon.spark.catalog.Catalogs
import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
import org.apache.paimon.spark.sql.{SparkVersionSupport, WithTableOptions}
@@ -36,6 +36,8 @@ import org.scalactic.source.Position
import org.scalatest.Tag
import java.io.File
+import java.util
+import java.util.{HashMap => JHashMap}
import java.util.TimeZone
import scala.util.Random
@@ -64,6 +66,7 @@ class PaimonSparkTestBase
super.sparkConf
.set("spark.sql.catalog.paimon", classOf[SparkCatalog].getName)
.set("spark.sql.catalog.paimon.warehouse", tempDBDir.getCanonicalPath)
+ .set("spark.sql.catalog.paimon.cache-enabled", "false")
.set("spark.sql.extensions",
classOf[PaimonSparkSessionExtensions].getName)
.set("spark.serializer", serializer)
}
@@ -122,7 +125,9 @@ class PaimonSparkTestBase
private def initCatalog(): Catalog = {
val currentCatalog =
spark.sessionState.catalogManager.currentCatalog.name()
- val options = Catalogs.catalogOptions(currentCatalog,
spark.sessionState.conf)
+ val options =
+ new JHashMap[String, String](Catalogs.catalogOptions(currentCatalog,
spark.sessionState.conf))
+ options.put(CatalogOptions.CACHE_ENABLED.key(), "false")
val catalogContext =
CatalogContext.create(Options.fromMap(options),
spark.sessionState.newHadoopConf())
CatalogFactory.createCatalog(catalogContext)