This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 16a405882c [core] Improve object table for fileIO, Privileged and parent_path (#4575)
16a405882c is described below
commit 16a405882cd8f0ca9afbe8566b919aef930144ef
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Nov 22 19:14:47 2024 +0800
[core] Improve object table for fileIO, Privileged and parent_path (#4575)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 46 ++++++++---
.../apache/paimon/privilege/PrivilegedCatalog.java | 2 +-
.../paimon/privilege/PrivilegedFileStoreTable.java | 78 ++++++++++--------
.../paimon/privilege/PrivilegedObjectTable.java | 92 ++++++++++++++++++++++
.../apache/paimon/table/FileStoreTableFactory.java | 16 +---
.../apache/paimon/table/object/ObjectRefresh.java | 11 +--
.../apache/paimon/table/object/ObjectTable.java | 55 ++++++++++---
.../org/apache/paimon/flink/ObjectTableITCase.java | 27 +++++++
8 files changed, 252 insertions(+), 75 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index fff593aabb..d3a8d628a2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -19,6 +19,7 @@
package org.apache.paimon.catalog;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -66,6 +67,7 @@ import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
@@ -430,17 +432,39 @@ public abstract class AbstractCatalog implements Catalog {
protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableMeta tableMeta = getDataTableMeta(identifier);
- return FileStoreTableFactory.create(
- fileIO,
- getTableLocation(identifier),
- tableMeta.schema,
- new CatalogEnvironment(
- identifier,
- tableMeta.uuid,
- Lock.factory(
- lockFactory().orElse(null),
lockContext().orElse(null), identifier),
- metastoreClientFactory(identifier,
tableMeta.schema).orElse(null),
- lineageMetaFactory));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ fileIO,
+ getTableLocation(identifier),
+ tableMeta.schema,
+ new CatalogEnvironment(
+ identifier,
+ tableMeta.uuid,
+ Lock.factory(
+ lockFactory().orElse(null),
+ lockContext().orElse(null),
+ identifier),
+ metastoreClientFactory(identifier,
tableMeta.schema).orElse(null),
+ lineageMetaFactory));
+ CoreOptions options = table.coreOptions();
+ if (options.type() == TableType.OBJECT_TABLE) {
+ String objectLocation = options.objectLocation();
+ checkNotNull(objectLocation, "Object location should not be null
for object table.");
+ table =
+ ObjectTable.builder()
+ .underlyingTable(table)
+ .objectLocation(objectLocation)
+ .objectFileIO(objectFileIO(objectLocation))
+ .build();
+ }
+ return table;
+ }
+
+ /**
+ * Catalog implementation may override this method to provide {@link FileIO} to object table.
+ */
+ protected FileIO objectFileIO(String objectLocation) {
+ return fileIO;
}
/**
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index c9b9c21937..2e88213a24 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -127,7 +127,7 @@ public class PrivilegedCatalog extends DelegateCatalog {
public Table getTable(Identifier identifier) throws TableNotExistException
{
Table table = wrapped.getTable(identifier);
if (table instanceof FileStoreTable) {
- return new PrivilegedFileStoreTable(
+ return PrivilegedFileStoreTable.wrap(
(FileStoreTable) table,
privilegeManager.getPrivilegeChecker(), identifier);
} else {
return table;
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 37990ed5a1..52c806c7c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
@@ -48,10 +49,10 @@ import java.util.OptionalLong;
/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
- private final PrivilegeChecker privilegeChecker;
- private final Identifier identifier;
+ protected final PrivilegeChecker privilegeChecker;
+ protected final Identifier identifier;
- public PrivilegedFileStoreTable(
+ protected PrivilegedFileStoreTable(
FileStoreTable wrapped, PrivilegeChecker privilegeChecker,
Identifier identifier) {
super(wrapped);
this.privilegeChecker = privilegeChecker;
@@ -106,18 +107,6 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
return wrapped.statistics();
}
- @Override
- public FileStoreTable copy(Map<String, String> dynamicOptions) {
- return new PrivilegedFileStoreTable(
- wrapped.copy(dynamicOptions), privilegeChecker, identifier);
- }
-
- @Override
- public FileStoreTable copy(TableSchema newTableSchema) {
- return new PrivilegedFileStoreTable(
- wrapped.copy(newTableSchema), privilegeChecker, identifier);
- }
-
@Override
public void rollbackTo(long snapshotId) {
privilegeChecker.assertCanInsert(identifier);
@@ -202,18 +191,6 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
return wrapped.newExpireChangelog();
}
- @Override
- public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
- return new PrivilegedFileStoreTable(
- wrapped.copyWithoutTimeTravel(dynamicOptions),
privilegeChecker, identifier);
- }
-
- @Override
- public FileStoreTable copyWithLatestSchema() {
- return new PrivilegedFileStoreTable(
- wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
- }
-
@Override
public DataTableScan newScan() {
privilegeChecker.assertCanSelect(identifier);
@@ -262,11 +239,7 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
return wrapped.newLocalTableQuery();
}
- @Override
- public FileStoreTable switchToBranch(String branchName) {
- return new PrivilegedFileStoreTable(
- wrapped.switchToBranch(branchName), privilegeChecker,
identifier);
- }
+ // ======================= equals ============================
@Override
public boolean equals(Object o) {
@@ -281,4 +254,45 @@ public class PrivilegedFileStoreTable extends
DelegatedFileStoreTable {
&& Objects.equals(privilegeChecker, that.privilegeChecker)
&& Objects.equals(identifier, that.identifier);
}
+
+ // ======================= copy ============================
+
+ @Override
+ public PrivilegedFileStoreTable copy(Map<String, String> dynamicOptions) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copy(dynamicOptions), privilegeChecker, identifier);
+ }
+
+ @Override
+ public PrivilegedFileStoreTable copy(TableSchema newTableSchema) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copy(newTableSchema), privilegeChecker, identifier);
+ }
+
+ @Override
+ public PrivilegedFileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new PrivilegedFileStoreTable(
+ wrapped.copyWithoutTimeTravel(dynamicOptions),
privilegeChecker, identifier);
+ }
+
+ @Override
+ public PrivilegedFileStoreTable copyWithLatestSchema() {
+ return new PrivilegedFileStoreTable(
+ wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
+ }
+
+ @Override
+ public PrivilegedFileStoreTable switchToBranch(String branchName) {
+ return new PrivilegedFileStoreTable(
+ wrapped.switchToBranch(branchName), privilegeChecker,
identifier);
+ }
+
+ public static PrivilegedFileStoreTable wrap(
+ FileStoreTable table, PrivilegeChecker privilegeChecker,
Identifier identifier) {
+ if (table instanceof ObjectTable) {
+ return new PrivilegedObjectTable((ObjectTable) table,
privilegeChecker, identifier);
+ } else {
+ return new PrivilegedFileStoreTable(table, privilegeChecker,
identifier);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java
new file mode 100644
index 0000000000..c5a319c1fe
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.privilege;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.object.ObjectTable;
+
+import java.util.Map;
+
+/** A {@link PrivilegedFileStoreTable} for {@link ObjectTable}. */
+public class PrivilegedObjectTable extends PrivilegedFileStoreTable implements ObjectTable {
+
+ private final ObjectTable objectTable;
+
+ protected PrivilegedObjectTable(
+ ObjectTable wrapped, PrivilegeChecker privilegeChecker, Identifier
identifier) {
+ super(wrapped, privilegeChecker, identifier);
+ this.objectTable = wrapped;
+ }
+
+ @Override
+ public String objectLocation() {
+ return objectTable.objectLocation();
+ }
+
+ @Override
+ public FileStoreTable underlyingTable() {
+ return objectTable.underlyingTable();
+ }
+
+ @Override
+ public FileIO objectFileIO() {
+ return objectTable.objectFileIO();
+ }
+
+ @Override
+ public long refresh() {
+ privilegeChecker.assertCanInsert(identifier);
+ return objectTable.refresh();
+ }
+
+ // ======================= copy ============================
+
+ @Override
+ public PrivilegedObjectTable copy(Map<String, String> dynamicOptions) {
+ return new PrivilegedObjectTable(
+ objectTable.copy(dynamicOptions), privilegeChecker,
identifier);
+ }
+
+ @Override
+ public PrivilegedObjectTable copy(TableSchema newTableSchema) {
+ return new PrivilegedObjectTable(
+ objectTable.copy(newTableSchema), privilegeChecker,
identifier);
+ }
+
+ @Override
+ public PrivilegedObjectTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new PrivilegedObjectTable(
+ objectTable.copyWithoutTimeTravel(dynamicOptions),
privilegeChecker, identifier);
+ }
+
+ @Override
+ public PrivilegedObjectTable copyWithLatestSchema() {
+ return new PrivilegedObjectTable(
+ objectTable.copyWithLatestSchema(), privilegeChecker,
identifier);
+ }
+
+ @Override
+ public PrivilegedObjectTable switchToBranch(String branchName) {
+ return new PrivilegedObjectTable(
+ objectTable.switchToBranch(branchName), privilegeChecker,
identifier);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 47d8777241..423dc17263 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -19,14 +19,12 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.TableType;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.StringUtils;
import java.io.IOException;
@@ -35,7 +33,6 @@ import java.util.Optional;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
@@ -127,17 +124,6 @@ public class FileStoreTableFactory {
fileIO, tablePath, tableSchema,
catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema,
catalogEnvironment);
- table = table.copy(dynamicOptions.toMap());
- CoreOptions options = table.coreOptions();
- if (options.type() == TableType.OBJECT_TABLE) {
- String objectLocation = options.objectLocation();
- checkNotNull(objectLocation, "Object location should not be null for object table.");
- table =
- ObjectTable.builder()
- .underlyingTable(table)
- .objectLocation(objectLocation)
- .build();
- }
- return table;
+ return table.copy(dynamicOptions.toMap());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
index 326efbc0ea..b1be840c51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
@@ -26,7 +26,6 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -41,13 +40,14 @@ public class ObjectRefresh {
public static long refresh(ObjectTable table) throws Exception {
String location = table.objectLocation();
- FileStoreTable underlyingTable = table.underlyingTable();
- FileIO fileIO = underlyingTable.fileIO();
+ // 1. collect all files for object table
List<FileStatus> fileCollector = new ArrayList<>();
- listAllFiles(fileIO, new Path(location), fileCollector);
+ listAllFiles(table.objectFileIO(), new Path(location), fileCollector);
- BatchWriteBuilder writeBuilder =
underlyingTable.newBatchWriteBuilder().withOverwrite();
+ // 2. write to underlying table
+ BatchWriteBuilder writeBuilder =
+ table.underlyingTable().newBatchWriteBuilder().withOverwrite();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
for (FileStatus file : fileCollector) {
@@ -78,6 +78,7 @@ public class ObjectRefresh {
private static InternalRow toRow(FileStatus file) {
return toRow(
file.getPath().toString(),
+ file.getPath().getParent().toString(),
file.getPath().getName(),
file.getLen(),
Timestamp.fromEpochMillis(file.getModificationTime()),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
index 65689108ca..97acfe7299 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.object;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DelegatedFileStoreTable;
@@ -46,6 +47,7 @@ public interface ObjectTable extends FileStoreTable {
RowType SCHEMA =
RowType.builder()
.field("path", DataTypes.STRING().notNull())
+ .field("parent_path", DataTypes.STRING().notNull())
.field("name", DataTypes.STRING().notNull())
.field("length", DataTypes.BIGINT().notNull())
.field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS())
@@ -66,11 +68,26 @@ public interface ObjectTable extends FileStoreTable {
/** Underlying table to store metadata. */
FileStoreTable underlyingTable();
+ /** File io for object file system. */
+ FileIO objectFileIO();
+
long refresh();
@Override
ObjectTable copy(Map<String, String> dynamicOptions);
+ @Override
+ ObjectTable copy(TableSchema newTableSchema);
+
+ @Override
+ ObjectTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
+
+ @Override
+ ObjectTable copyWithLatestSchema();
+
+ @Override
+ ObjectTable switchToBranch(String branchName);
+
/** Create a new builder for {@link ObjectTable}. */
static ObjectTable.Builder builder() {
return new ObjectTable.Builder();
@@ -80,6 +97,7 @@ public interface ObjectTable extends FileStoreTable {
class Builder {
private FileStoreTable underlyingTable;
+ private FileIO objectFileIO;
private String objectLocation;
public ObjectTable.Builder underlyingTable(FileStoreTable
underlyingTable) {
@@ -93,23 +111,31 @@ public interface ObjectTable extends FileStoreTable {
return this;
}
+ public ObjectTable.Builder objectFileIO(FileIO objectFileIO) {
+ this.objectFileIO = objectFileIO;
+ return this;
+ }
+
public ObjectTable.Builder objectLocation(String objectLocation) {
this.objectLocation = objectLocation;
return this;
}
public ObjectTable build() {
- return new ObjectTableImpl(underlyingTable, objectLocation);
+ return new ObjectTableImpl(underlyingTable, objectFileIO,
objectLocation);
}
}
/** An implementation for {@link ObjectTable}. */
class ObjectTableImpl extends DelegatedFileStoreTable implements
ObjectTable {
+ private final FileIO objectFileIO;
private final String objectLocation;
- public ObjectTableImpl(FileStoreTable underlyingTable, String
objectLocation) {
+ public ObjectTableImpl(
+ FileStoreTable underlyingTable, FileIO objectFileIO, String
objectLocation) {
super(underlyingTable);
+ this.objectFileIO = objectFileIO;
this.objectLocation = objectLocation;
}
@@ -148,6 +174,11 @@ public interface ObjectTable extends FileStoreTable {
return wrapped;
}
+ @Override
+ public FileIO objectFileIO() {
+ return objectFileIO;
+ }
+
@Override
public long refresh() {
try {
@@ -159,28 +190,30 @@ public interface ObjectTable extends FileStoreTable {
@Override
public ObjectTable copy(Map<String, String> dynamicOptions) {
- return new ObjectTableImpl(wrapped.copy(dynamicOptions),
objectLocation);
+ return new ObjectTableImpl(wrapped.copy(dynamicOptions),
objectFileIO, objectLocation);
}
@Override
- public FileStoreTable copy(TableSchema newTableSchema) {
- return new ObjectTableImpl(wrapped.copy(newTableSchema),
objectLocation);
+ public ObjectTable copy(TableSchema newTableSchema) {
+ return new ObjectTableImpl(wrapped.copy(newTableSchema),
objectFileIO, objectLocation);
}
@Override
- public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ public ObjectTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
return new ObjectTableImpl(
- wrapped.copyWithoutTimeTravel(dynamicOptions),
objectLocation);
+ wrapped.copyWithoutTimeTravel(dynamicOptions),
objectFileIO, objectLocation);
}
@Override
- public FileStoreTable copyWithLatestSchema() {
- return new ObjectTableImpl(wrapped.copyWithLatestSchema(),
objectLocation);
+ public ObjectTable copyWithLatestSchema() {
+ return new ObjectTableImpl(
+ wrapped.copyWithLatestSchema(), objectFileIO,
objectLocation);
}
@Override
- public FileStoreTable switchToBranch(String branchName) {
- return new ObjectTableImpl(wrapped.switchToBranch(branchName),
objectLocation);
+ public ObjectTable switchToBranch(String branchName) {
+ return new ObjectTableImpl(
+ wrapped.switchToBranch(branchName), objectFileIO,
objectLocation);
}
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
index b9e30035b0..d3ad1d4a52 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
@@ -80,4 +80,31 @@ public class ObjectTableITCase extends CatalogITCaseBase {
.hasMessageContaining("Object table does not support Write.");
assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
}
+
+ @Test
+ public void testObjectTableRefreshInPrivileged() throws IOException {
+ sql("CALL sys.init_file_based_privilege('root-passwd')");
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG rootcat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '%s',\n"
+ + " 'user' = 'root',\n"
+ + " 'password' = 'root-passwd'\n"
+ + ")",
+ path));
+ tEnv.useCatalog("rootcat");
+
+ Path objectLocation = new Path(path + "/object-location");
+ FileIO fileIO = LocalFileIO.create();
+ sql(
+ "CREATE TABLE T WITH ('type' = 'object-table', 'object-location' = '%s')",
+ objectLocation);
+
+ // add new file
+ fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3");
+ sql("CALL sys.refresh_object_table('default.T')");
+ assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f0", 5L));
+ }
}