This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4ccda6fd9 [core] Introduce Object Table to manage unstructured files
(#4459)
4ccda6fd9 is described below
commit 4ccda6fd9e187cf440076b22646e288310ab83cf
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 12 10:43:30 2024 +0800
[core] Introduce Object Table to manage unstructured files (#4459)
---
docs/content/concepts/table-types.md | 68 +++++++-
.../shortcodes/generated/core_configuration.html | 8 +-
.../main/java/org/apache/paimon/CoreOptions.java | 15 ++
.../src/main/java/org/apache/paimon/TableType.java | 7 +-
.../main/java/org/apache/paimon/fs/FileStatus.java | 22 +++
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 10 ++
.../java/org/apache/paimon/types/DataTypes.java | 4 +
.../main/java/org/apache/paimon/types/RowType.java | 7 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 34 +++-
.../main/java/org/apache/paimon/schema/Schema.java | 4 +
.../apache/paimon/table/FileStoreTableFactory.java | 16 +-
.../apache/paimon/table/object/ObjectRefresh.java | 107 ++++++++++++
.../apache/paimon/table/object/ObjectTable.java | 186 +++++++++++++++++++++
.../apache/paimon/oss/HadoopCompliantFileIO.java | 10 ++
.../apache/paimon/s3/HadoopCompliantFileIO.java | 10 ++
.../java/org/apache/paimon/flink/FlinkFileIO.java | 5 +
.../procedure/RefreshObjectTableProcedure.java | 54 ++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../org/apache/paimon/flink/ObjectTableITCase.java | 83 +++++++++
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../procedure/RefreshObjectTableProcedure.java | 85 ++++++++++
.../apache/paimon/spark/sql/ObjectTableTest.scala | 62 +++++++
22 files changed, 790 insertions(+), 10 deletions(-)
diff --git a/docs/content/concepts/table-types.md
b/docs/content/concepts/table-types.md
index 49e447962..0a1ef6481 100644
--- a/docs/content/concepts/table-types.md
+++ b/docs/content/concepts/table-types.md
@@ -33,7 +33,8 @@ Paimon supports table types:
3. view: metastore required, views in SQL are a kind of virtual table
4. format-table: file format table refers to a directory that contains
multiple files of the same format, where
operations on this table allow for reading or writing to these files,
compatible with Hive tables
-5. materialized-table: aimed at simplifying both batch and stream data
pipelines, providing a consistent development
+5. object table: provides metadata indexes for unstructured data objects in
the specified Object Storage directory.
+6. materialized-table: aimed at simplifying both batch and stream data
pipelines, providing a consistent development
experience, see [Flink Materialized
Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)
## Table with PK
@@ -169,6 +170,71 @@ CREATE TABLE my_parquet_table (
{{< /tabs >}}
+## Object Table
+
+Object Table provides metadata indexes for unstructured data objects in the
specified Object Storage directory.
+Object tables allow users to analyze unstructured data in Object Storage:
+
+1. Use Python API to manipulate these unstructured data, such as converting
images to PDF format.
+2. Model functions can also be used to perform inference, and then the results
of these operations can be concatenated
+ with other structured data in the Catalog.
+
+The object table is managed by Catalog and can also have access permissions
and the ability to manage data lineage.
+
+{{< tabs "object-table" >}}
+
+{{< tab "Flink-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` WITH (
+ 'type' = 'object-table',
+ 'object-location' = 'oss://my_bucket/my_location'
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */;
+```
+
+{{< /tab >}}
+
+{{< tab "Spark-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` TBLPROPERTIES (
+ 'type' = 'object-table',
+ 'object-location' = 'oss://my_bucket/my_location'
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` VERSION AS OF 1;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Materialized Table
Materialized Table aimed at simplifying both batch and stream data pipelines,
providing a consistent development
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 78392c3ef..1305dfe92 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -533,6 +533,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Integer</td>
<td>The number of sorted runs that trigger the stopping of writes,
the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
</tr>
+ <tr>
+ <td><h5>object-location</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The object location for object table.</td>
+ </tr>
<tr>
<td><h5>page-size</h5></td>
<td style="word-wrap: break-word;">64 kb</td>
@@ -910,7 +916,7 @@ If the data size allocated for the sorting task is
uneven,which may lead to perf
<td><h5>type</h5></td>
<td style="word-wrap: break-word;">table</td>
<td><p>Enum</p></td>
- <td>Type of the table.<br /><br />Possible values:<ul><li>"table":
Normal Paimon table.</li><li>"format-table": A file format table refers to a
directory that contains multiple files of the same
format.</li><li>"materialized-table": A materialized table.</li></ul></td>
+ <td>Type of the table.<br /><br />Possible values:<ul><li>"table":
Normal Paimon table.</li><li>"format-table": A file format table refers to a
directory that contains multiple files of the same
format.</li><li>"materialized-table": A materialized table combines normal
Paimon table and materialized SQL.</li><li>"object-table": A object table
combines normal Paimon table and object location.</li></ul></td>
</tr>
<tr>
<td><h5>write-buffer-for-append</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index bde2f1cc3..bb1661d6f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1412,6 +1412,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Whether to enable asynchronous IO writing when
writing files.");
+ public static final ConfigOption<String> OBJECT_LOCATION =
+ key("object-location")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The object location for object table.");
+
@ExcludeFromDocumentation("Only used internally to support materialized
table")
public static final ConfigOption<String>
MATERIALIZED_TABLE_DEFINITION_QUERY =
key("materialized-table.definition-query")
@@ -1523,6 +1529,10 @@ public class CoreOptions implements Serializable {
return new Path(options.get(PATH));
}
+ public TableType type() {
+ return options.get(TYPE);
+ }
+
public String formatType() {
return normalizeFileFormat(options.get(FILE_FORMAT));
}
@@ -1572,6 +1582,11 @@ public class CoreOptions implements Serializable {
return FileFormat.fromIdentifier(formatIdentifier, options);
}
+ public String objectLocation() {
+ checkArgument(type() == TableType.OBJECT_TABLE, "Only object table has
object location!");
+ return options.get(OBJECT_LOCATION);
+ }
+
public Map<Integer, String> fileCompressionPerLevel() {
Map<String, String> levelCompressions =
options.get(FILE_COMPRESSION_PER_LEVEL);
return levelCompressions.entrySet().stream()
diff --git a/paimon-common/src/main/java/org/apache/paimon/TableType.java
b/paimon-common/src/main/java/org/apache/paimon/TableType.java
index d690d5db3..d9ac020f7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/TableType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/TableType.java
@@ -29,7 +29,12 @@ public enum TableType implements DescribedEnum {
FORMAT_TABLE(
"format-table",
"A file format table refers to a directory that contains multiple
files of the same format."),
- MATERIALIZED_TABLE("materialized-table", "A materialized table.");
+ MATERIALIZED_TABLE(
+ "materialized-table",
+ "A materialized table combines normal Paimon table and
materialized SQL."),
+ OBJECT_TABLE(
+ "object-table", "A object table combines normal Paimon table and
object location.");
+
private final String value;
private final String description;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
index 8308f5205..c3e6cde9c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
@@ -20,6 +20,8 @@ package org.apache.paimon.fs;
import org.apache.paimon.annotation.Public;
+import javax.annotation.Nullable;
+
/**
* Interface that represents the client side information for a file
independent of the file system.
*
@@ -56,4 +58,24 @@ public interface FileStatus {
* milliseconds since the epoch (UTC January 1, 1970).
*/
long getModificationTime();
+
+ /**
+ * Get the last access time of the file.
+ *
+ * @return A long value representing the time the file was last accessed,
measured in
+ * milliseconds since the epoch (UTC January 1, 1970).
+ */
+ default long getAccessTime() {
+ return 0;
+ }
+
+ /**
+ * Returns the owner of this file.
+ *
+ * @return the owner of this file
+ */
+ @Nullable
+ default String getOwner() {
+ return null;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 70325ee69..0a8d64a73 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -329,6 +329,16 @@ public class HadoopFileIO implements FileIO {
public long getModificationTime() {
return status.getModificationTime();
}
+
+ @Override
+ public long getAccessTime() {
+ return status.getAccessTime();
+ }
+
+ @Override
+ public String getOwner() {
+ return status.getOwner();
+ }
}
// ============================== extra methods
===================================
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
index 659212b06..b025b6a83 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
@@ -103,6 +103,10 @@ public class DataTypes {
return new LocalZonedTimestampType(precision);
}
+ public static LocalZonedTimestampType TIMESTAMP_LTZ_MILLIS() {
+ return new LocalZonedTimestampType(3);
+ }
+
public static DecimalType DECIMAL(int precision, int scale) {
return new DecimalType(precision, scale);
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 80e41be71..f3fce0db6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -169,11 +169,16 @@ public final class RowType extends DataType {
}
@Override
- public DataType copy(boolean isNullable) {
+ public RowType copy(boolean isNullable) {
return new RowType(
isNullable,
fields.stream().map(DataField::copy).collect(Collectors.toList()));
}
+ @Override
+ public RowType notNull() {
+ return copy(false);
+ }
+
@Override
public String asSQLString() {
return withNullability(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4c36ad4db..c2e4afe5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,8 +38,10 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -48,6 +50,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -56,7 +59,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
@@ -284,13 +286,35 @@ public abstract class AbstractCatalog implements Catalog {
copyTableDefaultOptions(schema.options());
- if (Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE) {
- createFormatTable(identifier, schema);
- } else {
- createTableImpl(identifier, schema);
+ switch (Options.fromMap(schema.options()).get(TYPE)) {
+ case TABLE:
+ case MATERIALIZED_TABLE:
+ createTableImpl(identifier, schema);
+ break;
+ case OBJECT_TABLE:
+ createObjectTable(identifier, schema);
+ break;
+ case FORMAT_TABLE:
+ createFormatTable(identifier, schema);
+ break;
}
}
+ private void createObjectTable(Identifier identifier, Schema schema) {
+ RowType rowType = schema.rowType();
+ checkArgument(
+ rowType.getFields().isEmpty()
+ || new HashSet<>(ObjectTable.SCHEMA.getFields())
+ .containsAll(rowType.getFields()),
+ "Schema of Object Table can be empty or %s, but is %s.",
+ ObjectTable.SCHEMA,
+ rowType);
+ checkArgument(
+
schema.options().containsKey(CoreOptions.OBJECT_LOCATION.key()),
+ "Object table should have object-location option.");
+ createTableImpl(identifier, schema.copy(ObjectTable.SCHEMA));
+ }
+
protected abstract void createTableImpl(Identifier identifier, Schema
schema);
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index c6c79f4d4..9984e3fee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -96,6 +96,10 @@ public class Schema {
return comment;
}
+ public Schema copy(RowType rowType) {
+ return new Schema(rowType.getFields(), partitionKeys, primaryKeys,
options, comment);
+ }
+
private static List<DataField> normalizeFields(
List<DataField> fields, List<String> primaryKeys, List<String>
partitionKeys) {
List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 423dc1726..47d877724 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -19,12 +19,14 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TableType;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.StringUtils;
import java.io.IOException;
@@ -33,6 +35,7 @@ import java.util.Optional;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
@@ -124,6 +127,17 @@ public class FileStoreTableFactory {
fileIO, tablePath, tableSchema,
catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema,
catalogEnvironment);
- return table.copy(dynamicOptions.toMap());
+ table = table.copy(dynamicOptions.toMap());
+ CoreOptions options = table.coreOptions();
+ if (options.type() == TableType.OBJECT_TABLE) {
+ String objectLocation = options.objectLocation();
+ checkNotNull(objectLocation, "Object location should not be null
for object table.");
+ table =
+ ObjectTable.builder()
+ .underlyingTable(table)
+ .objectLocation(objectLocation)
+ .build();
+ }
+ return table;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
new file mode 100644
index 000000000..326efbc0e
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.object;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Util class for refreshing object table. */
+public class ObjectRefresh {
+
+ public static long refresh(ObjectTable table) throws Exception {
+ String location = table.objectLocation();
+ FileStoreTable underlyingTable = table.underlyingTable();
+ FileIO fileIO = underlyingTable.fileIO();
+
+ List<FileStatus> fileCollector = new ArrayList<>();
+ listAllFiles(fileIO, new Path(location), fileCollector);
+
+ BatchWriteBuilder writeBuilder =
underlyingTable.newBatchWriteBuilder().withOverwrite();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ for (FileStatus file : fileCollector) {
+ write.write(toRow(file));
+ }
+ commit.commit(write.prepareCommit());
+ }
+
+ return fileCollector.size();
+ }
+
+ private static void listAllFiles(FileIO fileIO, Path directory,
List<FileStatus> fileCollector)
+ throws IOException {
+ FileStatus[] files = fileIO.listStatus(directory);
+ if (files == null) {
+ return;
+ }
+
+ for (FileStatus file : files) {
+ if (file.isDir()) {
+ listAllFiles(fileIO, file.getPath(), fileCollector);
+ } else {
+ fileCollector.add(file);
+ }
+ }
+ }
+
+ private static InternalRow toRow(FileStatus file) {
+ return toRow(
+ file.getPath().toString(),
+ file.getPath().getName(),
+ file.getLen(),
+ Timestamp.fromEpochMillis(file.getModificationTime()),
+ Timestamp.fromEpochMillis(file.getAccessTime()),
+ file.getOwner(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ new GenericMap(Collections.emptyMap()));
+ }
+
+ public static GenericRow toRow(Object... values) {
+ GenericRow row = new GenericRow(values.length);
+
+ for (int i = 0; i < values.length; ++i) {
+ Object value = values[i];
+ if (value instanceof String) {
+ value = BinaryString.fromString((String) value);
+ }
+ row.setField(i, value);
+ }
+
+ return row;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
new file mode 100644
index 000000000..65689108c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.object;
+
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DelegatedFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * An object table refers to a directory that contains multiple objects
(files). The object table
+ * provides metadata indexes for unstructured data objects in this directory,
allowing users to
+ * analyze unstructured data in Object Storage.
+ *
+ * <p>Object Table stores the metadata of objects in the underlying table.
+ */
+public interface ObjectTable extends FileStoreTable {
+
+ RowType SCHEMA =
+ RowType.builder()
+ .field("path", DataTypes.STRING().notNull())
+ .field("name", DataTypes.STRING().notNull())
+ .field("length", DataTypes.BIGINT().notNull())
+ .field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS())
+ .field("atime", DataTypes.TIMESTAMP_LTZ_MILLIS())
+ .field("owner", DataTypes.STRING().nullable())
+ .field("generation", DataTypes.INT().nullable())
+ .field("content_type", DataTypes.STRING().nullable())
+ .field("storage_class", DataTypes.STRING().nullable())
+ .field("md5_hash", DataTypes.STRING().nullable())
+ .field("metadata_mtime",
DataTypes.TIMESTAMP_LTZ_MILLIS().nullable())
+ .field("metadata", DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()))
+ .build()
+ .notNull();
+
+ /** Object location in file system. */
+ String objectLocation();
+
+ /** Underlying table to store metadata. */
+ FileStoreTable underlyingTable();
+
+ long refresh();
+
+ @Override
+ ObjectTable copy(Map<String, String> dynamicOptions);
+
+ /** Create a new builder for {@link ObjectTable}. */
+ static ObjectTable.Builder builder() {
+ return new ObjectTable.Builder();
+ }
+
+ /** Builder for {@link ObjectTable}. */
+ class Builder {
+
+ private FileStoreTable underlyingTable;
+ private String objectLocation;
+
+ public ObjectTable.Builder underlyingTable(FileStoreTable
underlyingTable) {
+ this.underlyingTable = underlyingTable;
+ checkArgument(
+ new HashSet<>(SCHEMA.getFields())
+
.containsAll(underlyingTable.rowType().getFields()),
+ "Schema of Object Table should be %s, but is %s.",
+ SCHEMA,
+ underlyingTable.rowType());
+ return this;
+ }
+
+ public ObjectTable.Builder objectLocation(String objectLocation) {
+ this.objectLocation = objectLocation;
+ return this;
+ }
+
+ public ObjectTable build() {
+ return new ObjectTableImpl(underlyingTable, objectLocation);
+ }
+ }
+
+ /** An implementation for {@link ObjectTable}. */
+ class ObjectTableImpl extends DelegatedFileStoreTable implements
ObjectTable {
+
+ private final String objectLocation;
+
+ public ObjectTableImpl(FileStoreTable underlyingTable, String
objectLocation) {
+ super(underlyingTable);
+ this.objectLocation = objectLocation;
+ }
+
+ @Override
+ public BatchWriteBuilder newBatchWriteBuilder() {
+ throw new UnsupportedOperationException("Object table does not
support Write.");
+ }
+
+ @Override
+ public StreamWriteBuilder newStreamWriteBuilder() {
+ throw new UnsupportedOperationException("Object table does not
support Write.");
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser) {
+ throw new UnsupportedOperationException("Object table does not
support Write.");
+ }
+
+ @Override
+ public TableWriteImpl<?> newWrite(String commitUser,
ManifestCacheFilter manifestFilter) {
+ throw new UnsupportedOperationException("Object table does not
support Write.");
+ }
+
+ @Override
+ public TableCommitImpl newCommit(String commitUser) {
+ throw new UnsupportedOperationException("Object table does not
support Commit.");
+ }
+
+ @Override
+ public String objectLocation() {
+ return objectLocation;
+ }
+
+ @Override
+ public FileStoreTable underlyingTable() {
+ return wrapped;
+ }
+
+ @Override
+ public long refresh() {
+ try {
+ return ObjectRefresh.refresh(this);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public ObjectTable copy(Map<String, String> dynamicOptions) {
+ return new ObjectTableImpl(wrapped.copy(dynamicOptions),
objectLocation);
+ }
+
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return new ObjectTableImpl(wrapped.copy(newTableSchema),
objectLocation);
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new ObjectTableImpl(
+ wrapped.copyWithoutTimeTravel(dynamicOptions),
objectLocation);
+ }
+
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ return new ObjectTableImpl(wrapped.copyWithLatestSchema(),
objectLocation);
+ }
+
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ return new ObjectTableImpl(wrapped.switchToBranch(branchName),
objectLocation);
+ }
+ }
+}
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
index 4d86c12a6..67027eaba 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
@@ -286,5 +286,15 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
public long getModificationTime() {
return status.getModificationTime();
}
+
+ @Override
+ public long getAccessTime() {
+ return status.getAccessTime();
+ }
+
+ @Override
+ public String getOwner() {
+ return status.getOwner();
+ }
}
}
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index abfe0fabb..80f3df582 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -286,5 +286,15 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
public long getModificationTime() {
return status.getModificationTime();
}
+
+ @Override
+ public long getAccessTime() {
+ return status.getAccessTime();
+ }
+
+ @Override
+ public String getOwner() {
+ return status.getOwner();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index db66379f3..74512409b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -227,5 +227,10 @@ public class FlinkFileIO implements FileIO {
public long getModificationTime() {
return status.getModificationTime();
}
+
+ @Override
+ public long getAccessTime() {
+ return status.getAccessTime();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
new file mode 100644
index 000000000..97eb3095f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.table.object.ObjectTable;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+/**
+ * Refresh Object Table procedure. Usage:
+ *
+ * <pre><code>
+ * CALL sys.refresh_object_table('tableId')
+ * </code></pre>
+ */
+public class RefreshObjectTableProcedure extends ProcedureBase {
+
+ private static final String IDENTIFIER = "refresh_object_table";
+
+ @ProcedureHint(argument = {@ArgumentHint(name = "table", type =
@DataTypeHint("STRING"))})
+ public @DataTypeHint("ROW<file_number BIGINT>") Row[] call(
+ ProcedureContext procedureContext, String tableId)
+ throws Catalog.TableNotExistException {
+ ObjectTable table = (ObjectTable) table(tableId);
+ long fileNumber = table.refresh();
+ return new Row[] {Row.of(fileNumber)};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6fe5e74eb..0ff3ac1f1 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -79,3 +79,4 @@ org.apache.paimon.flink.procedure.FastForwardProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CloneProcedure
org.apache.paimon.flink.procedure.CompactManifestProcedure
+org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
new file mode 100644
index 000000000..b9e30035b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for object table. */
+public class ObjectTableITCase extends CatalogITCaseBase {
+
+    // Invalid DDL must be rejected: an object table may not declare its own
+    // columns, and it must carry the 'object-location' option.
+    @Test
+    public void testIllegalObjectTable() {
+        assertThatThrownBy(
+                () ->
+                        sql(
+                                "CREATE TABLE T (a INT, b INT, c INT)
WITH ('type' = 'object-table')"))
+                .rootCause()
+                .hasMessageContaining("Schema of Object Table can be empty
or");
+        assertThatThrownBy(() -> sql("CREATE TABLE T WITH ('type' =
'object-table')"))
+                .rootCause()
+                .hasMessageContaining("Object table should have
object-location option.");
+    }
+
+    // End-to-end refresh cycle: each sys.refresh_object_table call makes the
+    // table reflect the files currently under object-location. The expected
+    // 'length' of 5 is the byte length of the 5-character contents "1,2,3"
+    // and "4,5,6".
+    @Test
+    public void testObjectTableRefresh() throws IOException {
+        Path objectLocation = new Path(path + "/object-location");
+        FileIO fileIO = LocalFileIO.create();
+        sql(
+                "CREATE TABLE T WITH ('type' = 'object-table',
'object-location' = '%s')",
+                objectLocation);
+
+        // add new file
+        fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3");
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM
T")).containsExactlyInAnyOrder(Row.of("f0", 5L));
+
+        // add new file
+        fileIO.overwriteFileUtf8(new Path(objectLocation, "f1"), "4,5,6");
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM T"))
+                .containsExactlyInAnyOrder(Row.of("f0", 5L), Row.of("f1", 5L));
+
+        // delete file
+        fileIO.deleteQuietly(new Path(objectLocation, "f0"));
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM
T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
+
+        // time travel: snapshot 1 (from the first refresh) still lists only f0
+        assertThat(sql("SELECT name, length FROM T /*+
OPTIONS('scan.snapshot-id' = '1') */"))
+                .containsExactlyInAnyOrder(Row.of("f0", 5L));
+
+        // insert into: writes are rejected and the table state is unchanged
+        assertThatThrownBy(() -> sql("INSERT INTO T SELECT * FROM T"))
+                .rootCause()
+                .hasMessageContaining("Object table does not support Write.");
+        assertThat(sql("SELECT name, length FROM
T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index c93aad41a..35b65a7b5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateFileProcedure;
import org.apache.paimon.spark.procedure.MigrateTableProcedure;
import org.apache.paimon.spark.procedure.Procedure;
import org.apache.paimon.spark.procedure.ProcedureBuilder;
+import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
@@ -87,6 +88,7 @@ public class SparkProcedures {
procedureBuilders.put("reset_consumer",
ResetConsumerProcedure::builder);
procedureBuilders.put("mark_partition_done",
MarkPartitionDoneProcedure::builder);
procedureBuilders.put("compact_manifest",
CompactManifestProcedure::builder);
+ procedureBuilders.put("refresh_object_table",
RefreshObjectTableProcedure::builder);
return procedureBuilders.build();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
new file mode 100644
index 000000000..c6b6fdab4
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.object.ObjectTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Spark procedure to refresh Object Table. */
+public class RefreshObjectTableProcedure extends BaseProcedure {
+
+    // Single required argument: the table identifier string.
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table",
StringType)};
+
+    // Output schema: one non-null BIGINT column 'file_number', mirroring the
+    // Flink procedure's ROW<file_number BIGINT> result.
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("file_number", DataTypes.LongType,
false, Metadata.empty())
+                    });
+
+    protected RefreshObjectTableProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    // Casts the resolved Paimon table to ObjectTable, refreshes it via
+    // modifyPaimonTable, and returns the refresh result as a one-row output.
+    // NOTE(review): the cast assumes 'type' = 'object-table'; other table
+    // types would fail the cast here.
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    ObjectTable objectTable = (ObjectTable) table;
+                    long fileNumber = objectTable.refresh();
+                    InternalRow outputRow = newInternalRow(fileNumber);
+                    return new InternalRow[] {outputRow};
+                });
+    }
+
+    // Builder hook registered in SparkProcedures under 'refresh_object_table'.
+    public static ProcedureBuilder builder() {
+        return new Builder<RefreshObjectTableProcedure>() {
+            @Override
+            public RefreshObjectTableProcedure doBuild() {
+                return new RefreshObjectTableProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "RefreshObjectTableProcedure";
+    }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
new file mode 100644
index 000000000..3a446e33d
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.fs.local.LocalFileIO
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class ObjectTableTest extends PaimonSparkTestBase {
+
+  // Spark counterpart of the Flink ITCase: create an object table over a
+  // local directory, refresh after each file change, and verify the listed
+  // names/lengths. Expected length 5 is the byte length of "1,2,3"/"4,5,6".
+  test(s"Paimon object table") {
+    val objectLocation = new Path(tempDBDir + "/object-location")
+    val fileIO = LocalFileIO.create
+
+    spark.sql(s"""
+                 |CREATE TABLE T TBLPROPERTIES (
+                 |  'type' = 'object-table',
+                 |  'object-location' = '$objectLocation'
+                 |)
+                 |""".stripMargin)
+
+    // add new file
+    fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3")
+    spark.sql("CALL sys.refresh_object_table('test.T')")
+    checkAnswer(
+      spark.sql("SELECT name, length FROM T"),
+      Row("f0", 5L) :: Nil
+    )
+
+    // add new file
+    fileIO.overwriteFileUtf8(new Path(objectLocation, "f1"), "4,5,6")
+    spark.sql("CALL sys.refresh_object_table('test.T')")
+    checkAnswer(
+      spark.sql("SELECT name, length FROM T"),
+      Row("f0", 5L) :: Row("f1", 5L) :: Nil
+    )
+
+    // time travel: snapshot 1 (after the first refresh) lists only f0
+    checkAnswer(
+      spark.sql("SELECT name, length FROM T VERSION AS OF 1"),
+      Row("f0", 5L) :: Nil
+    )
+  }
+}