This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 406c4e58d7 [core] Add supportsVersionManagement to Catalog (#5326)
406c4e58d7 is described below
commit 406c4e58d7dac8a8dc50e1e04a8e13c1f2cfb5b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 21 23:15:34 2025 +0800
[core] Add supportsVersionManagement to Catalog (#5326)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 25 +++++
.../java/org/apache/paimon/catalog/Catalog.java | 117 ++++++++++++---------
.../paimon/catalog/CatalogSnapshotCommit.java | 31 ++----
.../org/apache/paimon/catalog/CatalogUtils.java | 3 +-
.../org/apache/paimon/catalog/DelegateCatalog.java | 5 +
.../java/org/apache/paimon/rest/RESTCatalog.java | 31 ++++++
.../paimon/table/AbstractFileStoreTable.java | 12 +--
.../apache/paimon/table/CatalogEnvironment.java | 17 ++-
.../apache/paimon/utils/CatalogBranchManager.java | 37 ++-----
.../paimon/operation/PartitionExpireTest.java | 2 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 3 +-
11 files changed, 163 insertions(+), 120 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 498e13e38b..18e2f8141e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
@@ -479,6 +480,30 @@ public abstract class AbstractCatalog implements Catalog {
throw new UnsupportedOperationException();
}
+ @Override
+ public boolean supportsVersionManagement() {
+ return false;
+ }
+
+ @Override
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {}
+
+ @Override
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ Table table = getTable(identifier);
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(partitions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
+ throws TableNotExistException {}
+
/**
* Create a {@link FormatTable} identified by the given {@link Identifier}.
*
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index fa2bc0ea80..3698f7a670 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -28,7 +28,6 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
-import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.view.View;
import javax.annotation.Nullable;
@@ -464,7 +463,60 @@ public interface Catalog extends AutoCloseable {
throw new UnsupportedOperationException();
}
- // ==================== Branch methods ==========================
+ // ==================== Version management methods
==========================
+
+ /**
+ * Whether this catalog supports version management for tables. If not,
the corresponding methods
+ * will throw an {@link UnsupportedOperationException}. This affects the
following methods:
+ *
+ * <ul>
+ * <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
+ * <li>{@link #loadSnapshot(Identifier)}.
+ * <li>{@link #rollbackTo(Identifier, Instant)}.
+ * <li>{@link #createBranch(Identifier, String, String)}.
+ * <li>{@link #dropBranch(Identifier, String)}.
+ * <li>{@link #listBranches(Identifier)}.
+ * </ul>
+ */
+ boolean supportsVersionManagement();
+
+ /**
+ * Commit the {@link Snapshot} for table identified by the given {@link
Identifier}.
+ *
+ * @param identifier Path of the table
+ * @param snapshot Snapshot to be committed
+ * @param statistics statistics information of this change
+ * @return Success or not
+ * @throws Catalog.TableNotExistException if the target does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ boolean commitSnapshot(
+ Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
+ throws Catalog.TableNotExistException;
+
+ /**
+ * Return the snapshot of table identified by the given {@link Identifier}.
+ *
+ * @param identifier Path of the table
+ * @return The requested snapshot of the table
+ * @throws Catalog.TableNotExistException if the target does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ Optional<TableSnapshot> loadSnapshot(Identifier identifier)
+ throws Catalog.TableNotExistException;
+
+ /**
+ * Roll back the table identified by the given {@link Identifier} to the given instant.
+ *
+ * @param identifier path of the table
+ * @param instant like snapshotId or tagName
+ * @throws Catalog.TableNotExistException if the table does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ void rollbackTo(Identifier identifier, Instant instant) throws
Catalog.TableNotExistException;
/**
* Create a new branch for this table. By default, an empty branch will be
created using the
@@ -477,6 +529,8 @@ public interface Catalog extends AutoCloseable {
* @throws TableNotExistException if the table in identifier doesn't exist
* @throws BranchAlreadyExistException if the branch already exists
* @throws TagNotExistException if the tag doesn't exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
*/
void createBranch(Identifier identifier, String branch, @Nullable String
fromTag)
throws TableNotExistException, BranchAlreadyExistException,
TagNotExistException;
@@ -487,6 +541,8 @@ public interface Catalog extends AutoCloseable {
* @param identifier path of the table, cannot be system or branch name.
* @param branch the branch name
* @throws BranchNotExistException if the branch doesn't exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
*/
void dropBranch(Identifier identifier, String branch) throws
BranchNotExistException;
@@ -496,6 +552,8 @@ public interface Catalog extends AutoCloseable {
* @param identifier path of the table, cannot be system or branch name.
* @param branch the branch name
* @throws BranchNotExistException if the branch doesn't exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
*/
void fastForward(Identifier identifier, String branch) throws
BranchNotExistException;
@@ -504,43 +562,11 @@ public interface Catalog extends AutoCloseable {
*
* @param identifier path of the table, cannot be system or branch name.
* @throws TableNotExistException if the table in identifier doesn't exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
*/
List<String> listBranches(Identifier identifier) throws
TableNotExistException;
- // ==================== Snapshot Operations ==========================
-
- /**
- * Commit the {@link Snapshot} for table identified by the given {@link
Identifier}.
- *
- * @param identifier Path of the table
- * @param snapshot Snapshot to be committed
- * @param statistics statistics information of this change
- * @return Success or not
- * @throws Catalog.TableNotExistException if the target does not exist
- */
- boolean commitSnapshot(
- Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
- throws Catalog.TableNotExistException;
-
- /**
- * Return the snapshot of table identified by the given {@link Identifier}.
- *
- * @param identifier Path of the table
- * @return The requested snapshot of the table
- * @throws Catalog.TableNotExistException if the target does not exist
- */
- Optional<TableSnapshot> loadSnapshot(Identifier identifier)
- throws Catalog.TableNotExistException;
-
- /**
- * rollback table by the given {@link Identifier} and instant.
- *
- * @param identifier path of the table
- * @param instant like snapshotId or tagName
- * @throws Catalog.TableNotExistException if the table does not exist
- */
- void rollbackTo(Identifier identifier, Instant instant) throws
Catalog.TableNotExistException;
-
// ==================== Partition Modifications ==========================
/**
@@ -550,8 +576,8 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be created
* @throws TableNotExistException if the table does not exist
*/
- default void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {}
+ void createPartitions(Identifier identifier, List<Map<String, String>>
partitions)
+ throws TableNotExistException;
/**
* Drop partitions of the specified table. Ignore non-existent partitions.
@@ -560,15 +586,8 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be deleted
* @throws TableNotExistException if the table does not exist
*/
- default void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
- throws TableNotExistException {
- Table table = getTable(identifier);
- try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
- commit.truncatePartitions(partitions);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ void dropPartitions(Identifier identifier, List<Map<String, String>>
partitions)
+ throws TableNotExistException;
/**
* Alter partitions of the specified table. For non-existent partitions,
partitions will be
@@ -578,8 +597,8 @@ public interface Catalog extends AutoCloseable {
* @param partitions partitions to be altered
* @throws TableNotExistException if the table does not exist
*/
- default void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
- throws TableNotExistException {}
+ void alterPartitions(Identifier identifier, List<PartitionStatistics>
partitions)
+ throws TableNotExistException;
// ==================== Catalog Information ==========================
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
index 491cd22d88..62e5b58ac5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
@@ -22,8 +22,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.util.List;
/** A {@link SnapshotCommit} using {@link Catalog} to commit. */
@@ -31,25 +29,18 @@ public class CatalogSnapshotCommit implements
SnapshotCommit {
private final Catalog catalog;
private final Identifier identifier;
- private final RenamingSnapshotCommit renamingCommit;
- public CatalogSnapshotCommit(
- Catalog catalog, Identifier identifier, RenamingSnapshotCommit
renamingCommit) {
+ public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
this.catalog = catalog;
this.identifier = identifier;
- this.renamingCommit = renamingCommit;
}
@Override
public boolean commit(Snapshot snapshot, String branch,
List<PartitionStatistics> statistics)
throws Exception {
- try {
- Identifier newIdentifier =
- new Identifier(identifier.getDatabaseName(),
identifier.getTableName(), branch);
- return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
- } catch (UnsupportedOperationException e) {
- return renamingCommit.commit(snapshot, branch, statistics);
- }
+ Identifier newIdentifier =
+ new Identifier(identifier.getDatabaseName(),
identifier.getTableName(), branch);
+ return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
}
@Override
@@ -63,24 +54,14 @@ public class CatalogSnapshotCommit implements
SnapshotCommit {
private static final long serialVersionUID = 1L;
private final CatalogLoader catalogLoader;
- @Nullable private final CatalogLockFactory lockFactory;
- @Nullable private final CatalogLockContext lockContext;
- public Factory(
- CatalogLoader catalogLoader,
- @Nullable CatalogLockFactory lockFactory,
- @Nullable CatalogLockContext lockContext) {
+ public Factory(CatalogLoader catalogLoader) {
this.catalogLoader = catalogLoader;
- this.lockFactory = lockFactory;
- this.lockContext = lockContext;
}
@Override
public SnapshotCommit create(Identifier identifier, SnapshotManager
snapshotManager) {
- RenamingSnapshotCommit renamingCommit =
- new RenamingSnapshotCommit.Factory(lockFactory,
lockContext)
- .create(identifier, snapshotManager);
- return new CatalogSnapshotCommit(catalogLoader.load(), identifier,
renamingCommit);
+ return new CatalogSnapshotCommit(catalogLoader.load(), identifier);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 416941e6a6..bdbfe41587 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -194,7 +194,8 @@ public class CatalogUtils {
metadata.uuid(),
catalog.catalogLoader(),
lockFactory,
- lockContext);
+ lockContext,
+ catalog.supportsVersionManagement());
Path path = new Path(schema.options().get(PATH.key()));
FileStoreTable table =
FileStoreTableFactory.create(dataFileIO.apply(path), path,
schema, catalogEnv);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 39c8914192..756881af17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -135,6 +135,11 @@ public abstract class DelegateCatalog implements Catalog {
wrapped.alterTable(identifier, changes, ignoreIfNotExists);
}
+ @Override
+ public boolean supportsVersionManagement() {
+ return wrapped.supportsVersionManagement();
+ }
+
@Override
public Optional<TableSnapshot> loadSnapshot(Identifier identifier)
throws TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 48b665cf3a..7ea1f84a2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -76,6 +76,7 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
@@ -383,6 +384,11 @@ public class RESTCatalog implements Catalog {
return Optional.of(response.getSnapshot());
}
+ @Override
+ public boolean supportsVersionManagement() {
+ return true;
+ }
+
@Override
public boolean commitSnapshot(
Identifier identifier, Snapshot snapshot,
List<PartitionStatistics> statistics)
@@ -727,6 +733,31 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public void createPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ // Partitions of the REST Catalog server are automatically calculated
and do not require
+ // explicit creation.
+ }
+
+ @Override
+ public void dropPartitions(Identifier identifier, List<Map<String,
String>> partitions)
+ throws TableNotExistException {
+ Table table = getTable(identifier);
+ try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(partitions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void alterPartitions(Identifier identifier,
List<PartitionStatistics> partitions)
+ throws TableNotExistException {
+ // The partition statistics of the REST Catalog server are
automatically calculated and do
+ // not require special reporting.
+ }
+
@Override
public View getView(Identifier identifier) throws ViewNotExistException {
try {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4a4aa88cd9..2e9647f9d5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -711,14 +711,12 @@ abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public BranchManager branchManager() {
- FileSystemBranchManager branchManager =
- new FileSystemBranchManager(
- fileIO, path, snapshotManager(), tagManager(),
schemaManager());
- if (catalogEnvironment.catalogLoader() != null) {
- return new CatalogBranchManager(
- catalogEnvironment.catalogLoader(), identifier(),
branchManager);
+ if (catalogEnvironment.catalogLoader() != null
+ && catalogEnvironment.supportsVersionManagement()) {
+ return new
CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
}
- return branchManager;
+ return new FileSystemBranchManager(
+ fileIO, path, snapshotManager(), tagManager(),
schemaManager());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index ef7e18be2c..3d7cb3c248 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -44,22 +44,25 @@ public class CatalogEnvironment implements Serializable {
@Nullable private final CatalogLoader catalogLoader;
@Nullable private final CatalogLockFactory lockFactory;
@Nullable private final CatalogLockContext lockContext;
+ private final boolean supportsVersionManagement;
public CatalogEnvironment(
@Nullable Identifier identifier,
@Nullable String uuid,
@Nullable CatalogLoader catalogLoader,
@Nullable CatalogLockFactory lockFactory,
- @Nullable CatalogLockContext lockContext) {
+ @Nullable CatalogLockContext lockContext,
+ boolean supportsVersionManagement) {
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
this.lockFactory = lockFactory;
this.lockContext = lockContext;
+ this.supportsVersionManagement = supportsVersionManagement;
}
public static CatalogEnvironment empty() {
- return new CatalogEnvironment(null, null, null, null, null);
+ return new CatalogEnvironment(null, null, null, null, null, false);
}
@Nullable
@@ -81,13 +84,17 @@ public class CatalogEnvironment implements Serializable {
return PartitionHandler.create(catalog, identifier);
}
+ public boolean supportsVersionManagement() {
+ return supportsVersionManagement;
+ }
+
@Nullable
public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
SnapshotCommit.Factory factory;
- if (catalogLoader == null) {
- factory = new RenamingSnapshotCommit.Factory(lockFactory,
lockContext);
+ if (catalogLoader != null && supportsVersionManagement) {
+ factory = new CatalogSnapshotCommit.Factory(catalogLoader);
} else {
- factory = new CatalogSnapshotCommit.Factory(catalogLoader,
lockFactory, lockContext);
+ factory = new RenamingSnapshotCommit.Factory(lockFactory,
lockContext);
}
return factory.create(identifier, snapshotManager);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index 5ba283c0d5..d87f97badd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -31,15 +31,10 @@ public class CatalogBranchManager implements BranchManager {
private final CatalogLoader catalogLoader;
private final Identifier identifier;
- private final FileSystemBranchManager branchManager;
- public CatalogBranchManager(
- CatalogLoader catalogLoader,
- Identifier identifier,
- FileSystemBranchManager branchManager) {
+ public CatalogBranchManager(CatalogLoader catalogLoader, Identifier
identifier) {
this.catalogLoader = catalogLoader;
this.identifier = identifier;
- this.branchManager = branchManager;
}
private void executePost(ThrowingConsumer<Catalog, Exception> func) {
@@ -62,46 +57,26 @@ public class CatalogBranchManager implements BranchManager {
@Override
public void createBranch(String branchName) {
- try {
- executePost(catalog -> catalog.createBranch(identifier,
branchName, null));
- } catch (UnsupportedOperationException e) {
- branchManager.createBranch(branchName);
- }
+ executePost(catalog -> catalog.createBranch(identifier, branchName,
null));
}
@Override
public void createBranch(String branchName, @Nullable String tagName) {
- try {
- executePost(catalog -> catalog.createBranch(identifier,
branchName, tagName));
- } catch (UnsupportedOperationException e) {
- branchManager.createBranch(branchName, tagName);
- }
+ executePost(catalog -> catalog.createBranch(identifier, branchName,
tagName));
}
@Override
public void dropBranch(String branchName) {
- try {
- executePost(catalog -> catalog.dropBranch(identifier, branchName));
- } catch (UnsupportedOperationException e) {
- branchManager.dropBranch(branchName);
- }
+ executePost(catalog -> catalog.dropBranch(identifier, branchName));
}
@Override
public void fastForward(String branchName) {
- try {
- executePost(catalog -> catalog.fastForward(identifier,
branchName));
- } catch (UnsupportedOperationException e) {
- branchManager.fastForward(branchName);
- }
+ executePost(catalog -> catalog.fastForward(identifier, branchName));
}
@Override
public List<String> branches() {
- try {
- return executeGet(catalog -> catalog.listBranches(identifier));
- } catch (UnsupportedOperationException e) {
- return branchManager.branches();
- }
+ return executeGet(catalog -> catalog.listBranches(identifier));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 2f835440aa..aba37d4cce 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -134,7 +134,7 @@ public class PartitionExpireTest {
};
CatalogEnvironment env =
- new CatalogEnvironment(null, null, null, null, null) {
+ new CatalogEnvironment(null, null, null, null, null, false) {
@Override
public PartitionHandler partitionHandler() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 62382e10b1..98e11830a2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -1546,7 +1546,8 @@ public class RESTCatalogServer {
tableMetadata.uuid(),
catalog.catalogLoader(),
catalog.lockFactory().orElse(null),
- catalog.lockContext().orElse(null));
+ catalog.lockContext().orElse(null),
+ false);
Path path = new Path(schema.options().get(PATH.key()));
FileIO dataFileIO = catalog.fileIO();
FileStoreTable table =