This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ccda6fd9 [core] Introduce Object Table to manage unstructured files 
(#4459)
4ccda6fd9 is described below

commit 4ccda6fd9e187cf440076b22646e288310ab83cf
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 12 10:43:30 2024 +0800

    [core] Introduce Object Table to manage unstructured files (#4459)
---
 docs/content/concepts/table-types.md               |  68 +++++++-
 .../shortcodes/generated/core_configuration.html   |   8 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  15 ++
 .../src/main/java/org/apache/paimon/TableType.java |   7 +-
 .../main/java/org/apache/paimon/fs/FileStatus.java |  22 +++
 .../org/apache/paimon/fs/hadoop/HadoopFileIO.java  |  10 ++
 .../java/org/apache/paimon/types/DataTypes.java    |   4 +
 .../main/java/org/apache/paimon/types/RowType.java |   7 +-
 .../org/apache/paimon/catalog/AbstractCatalog.java |  34 +++-
 .../main/java/org/apache/paimon/schema/Schema.java |   4 +
 .../apache/paimon/table/FileStoreTableFactory.java |  16 +-
 .../apache/paimon/table/object/ObjectRefresh.java  | 107 ++++++++++++
 .../apache/paimon/table/object/ObjectTable.java    | 186 +++++++++++++++++++++
 .../apache/paimon/oss/HadoopCompliantFileIO.java   |  10 ++
 .../apache/paimon/s3/HadoopCompliantFileIO.java    |  10 ++
 .../java/org/apache/paimon/flink/FlinkFileIO.java  |   5 +
 .../procedure/RefreshObjectTableProcedure.java     |  54 ++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../org/apache/paimon/flink/ObjectTableITCase.java |  83 +++++++++
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../procedure/RefreshObjectTableProcedure.java     |  85 ++++++++++
 .../apache/paimon/spark/sql/ObjectTableTest.scala  |  62 +++++++
 22 files changed, 790 insertions(+), 10 deletions(-)

diff --git a/docs/content/concepts/table-types.md 
b/docs/content/concepts/table-types.md
index 49e447962..0a1ef6481 100644
--- a/docs/content/concepts/table-types.md
+++ b/docs/content/concepts/table-types.md
@@ -33,7 +33,8 @@ Paimon supports table types:
 3. view: metastore required, views in SQL are a kind of virtual table
 4. format-table: file format table refers to a directory that contains 
multiple files of the same format, where
    operations on this table allow for reading or writing to these files, 
compatible with Hive tables
-5. materialized-table: aimed at simplifying both batch and stream data 
pipelines, providing a consistent development
+5. object table: provides metadata indexes for unstructured data objects in 
the specified object storage directory.
+6. materialized-table: aimed at simplifying both batch and stream data 
pipelines, providing a consistent development
    experience, see [Flink Materialized 
Table](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/materialized-table/overview/)
 
 ## Table with PK
@@ -169,6 +170,71 @@ CREATE TABLE my_parquet_table (
 
 {{< /tabs >}}
 
+## Object Table
+
+Object Table provides metadata indexes for unstructured data objects in the 
specified object storage directory.
+Object tables allow users to analyze unstructured data in Object Storage:
+
+1. Use the Python API to manipulate this unstructured data, such as 
converting images to PDF format.
+2. Model functions can also be used to perform inference, and then the results 
of these operations can be concatenated
+   with other structured data in the Catalog.
+
+The object table is managed by the Catalog and can also have access 
permissions and the ability to manage data lineage.
+
+{{< tabs "object-table" >}}
+
+{{< tab "Flink-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` WITH (
+  'type' = 'object-table',
+  'object-location' = 'oss://my_bucket/my_location' 
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` /*+ OPTIONS('scan.snapshot-id' = '1') */;
+```
+
+{{< /tab >}}
+
+{{< tab "Spark-SQL" >}}
+
+```sql
+-- Create Object Table
+
+CREATE TABLE `my_object_table` TBLPROPERTIES (
+  'type' = 'object-table',
+  'object-location' = 'oss://my_bucket/my_location' 
+);
+
+-- Refresh Object Table
+
+CALL sys.refresh_object_table('mydb.my_object_table');
+
+-- Query Object Table
+
+SELECT * FROM `my_object_table`;
+
+-- Query Object Table with Time Travel
+
+SELECT * FROM `my_object_table` VERSION AS OF 1;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## Materialized Table
 
 Materialized Table aimed at simplifying both batch and stream data pipelines, 
providing a consistent development
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 78392c3ef..1305dfe92 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -533,6 +533,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Integer</td>
             <td>The number of sorted runs that trigger the stopping of writes, 
the default value is 'num-sorted-run.compaction-trigger' + 3.</td>
         </tr>
+        <tr>
+            <td><h5>object-location</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The object location for object table.</td>
+        </tr>
         <tr>
             <td><h5>page-size</h5></td>
             <td style="word-wrap: break-word;">64 kb</td>
@@ -910,7 +916,7 @@ If the data size allocated for the sorting task is 
uneven,which may lead to perf
             <td><h5>type</h5></td>
             <td style="word-wrap: break-word;">table</td>
             <td><p>Enum</p></td>
-            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": 
Normal Paimon table.</li><li>"format-table": A file format table refers to a 
directory that contains multiple files of the same 
format.</li><li>"materialized-table": A materialized table.</li></ul></td>
+            <td>Type of the table.<br /><br />Possible values:<ul><li>"table": 
Normal Paimon table.</li><li>"format-table": A file format table refers to a 
directory that contains multiple files of the same 
format.</li><li>"materialized-table": A materialized table combines normal 
Paimon table and materialized SQL.</li><li>"object-table": A object table 
combines normal Paimon table and object location.</li></ul></td>
         </tr>
         <tr>
             <td><h5>write-buffer-for-append</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index bde2f1cc3..bb1661d6f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1412,6 +1412,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether to enable asynchronous IO writing when 
writing files.");
 
+    public static final ConfigOption<String> OBJECT_LOCATION =
+            key("object-location")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The object location for object table.");
+
     @ExcludeFromDocumentation("Only used internally to support materialized 
table")
     public static final ConfigOption<String> 
MATERIALIZED_TABLE_DEFINITION_QUERY =
             key("materialized-table.definition-query")
@@ -1523,6 +1529,10 @@ public class CoreOptions implements Serializable {
         return new Path(options.get(PATH));
     }
 
+    public TableType type() {
+        return options.get(TYPE);
+    }
+
     public String formatType() {
         return normalizeFileFormat(options.get(FILE_FORMAT));
     }
@@ -1572,6 +1582,11 @@ public class CoreOptions implements Serializable {
         return FileFormat.fromIdentifier(formatIdentifier, options);
     }
 
+    public String objectLocation() {
+        checkArgument(type() == TableType.OBJECT_TABLE, "Only object table has 
object location!");
+        return options.get(OBJECT_LOCATION);
+    }
+
     public Map<Integer, String> fileCompressionPerLevel() {
         Map<String, String> levelCompressions = 
options.get(FILE_COMPRESSION_PER_LEVEL);
         return levelCompressions.entrySet().stream()
diff --git a/paimon-common/src/main/java/org/apache/paimon/TableType.java 
b/paimon-common/src/main/java/org/apache/paimon/TableType.java
index d690d5db3..d9ac020f7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/TableType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/TableType.java
@@ -29,7 +29,12 @@ public enum TableType implements DescribedEnum {
     FORMAT_TABLE(
             "format-table",
             "A file format table refers to a directory that contains multiple 
files of the same format."),
-    MATERIALIZED_TABLE("materialized-table", "A materialized table.");
+    MATERIALIZED_TABLE(
+            "materialized-table",
+            "A materialized table combines normal Paimon table and 
materialized SQL."),
+    OBJECT_TABLE(
+            "object-table", "A object table combines normal Paimon table and 
object location.");
+
     private final String value;
     private final String description;
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
index 8308f5205..c3e6cde9c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
@@ -20,6 +20,8 @@ package org.apache.paimon.fs;
 
 import org.apache.paimon.annotation.Public;
 
+import javax.annotation.Nullable;
+
 /**
  * Interface that represents the client side information for a file 
independent of the file system.
  *
@@ -56,4 +58,24 @@ public interface FileStatus {
      *     milliseconds since the epoch (UTC January 1, 1970).
      */
     long getModificationTime();
+
+    /**
+     * Get the last access time of the file.
+     *
+     * @return A long value representing the time the file was last accessed, 
measured in
+     *     milliseconds since the epoch (UTC January 1, 1970).
+     */
+    default long getAccessTime() {
+        return 0;
+    }
+
+    /**
+     * Returns the owner of this file.
+     *
+     * @return the owner of this file
+     */
+    @Nullable
+    default String getOwner() {
+        return null;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 70325ee69..0a8d64a73 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -329,6 +329,16 @@ public class HadoopFileIO implements FileIO {
         public long getModificationTime() {
             return status.getModificationTime();
         }
+
+        @Override
+        public long getAccessTime() {
+            return status.getAccessTime();
+        }
+
+        @Override
+        public String getOwner() {
+            return status.getOwner();
+        }
     }
 
     // ============================== extra methods 
===================================
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java 
b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
index 659212b06..b025b6a83 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataTypes.java
@@ -103,6 +103,10 @@ public class DataTypes {
         return new LocalZonedTimestampType(precision);
     }
 
+    public static LocalZonedTimestampType TIMESTAMP_LTZ_MILLIS() {
+        return new LocalZonedTimestampType(3);
+    }
+
     public static DecimalType DECIMAL(int precision, int scale) {
         return new DecimalType(precision, scale);
     }
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java 
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index 80e41be71..f3fce0db6 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -169,11 +169,16 @@ public final class RowType extends DataType {
     }
 
     @Override
-    public DataType copy(boolean isNullable) {
+    public RowType copy(boolean isNullable) {
         return new RowType(
                 isNullable, 
fields.stream().map(DataField::copy).collect(Collectors.toList()));
     }
 
+    @Override
+    public RowType notNull() {
+        return copy(false);
+    }
+
     @Override
     public String asSQLString() {
         return withNullability(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 4c36ad4db..c2e4afe5d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,8 +38,10 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
@@ -48,6 +50,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -56,7 +59,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
 import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
@@ -284,13 +286,35 @@ public abstract class AbstractCatalog implements Catalog {
 
         copyTableDefaultOptions(schema.options());
 
-        if (Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE) {
-            createFormatTable(identifier, schema);
-        } else {
-            createTableImpl(identifier, schema);
+        switch (Options.fromMap(schema.options()).get(TYPE)) {
+            case TABLE:
+            case MATERIALIZED_TABLE:
+                createTableImpl(identifier, schema);
+                break;
+            case OBJECT_TABLE:
+                createObjectTable(identifier, schema);
+                break;
+            case FORMAT_TABLE:
+                createFormatTable(identifier, schema);
+                break;
         }
     }
 
+    private void createObjectTable(Identifier identifier, Schema schema) {
+        RowType rowType = schema.rowType();
+        checkArgument(
+                rowType.getFields().isEmpty()
+                        || new HashSet<>(ObjectTable.SCHEMA.getFields())
+                                .containsAll(rowType.getFields()),
+                "Schema of Object Table can be empty or %s, but is %s.",
+                ObjectTable.SCHEMA,
+                rowType);
+        checkArgument(
+                
schema.options().containsKey(CoreOptions.OBJECT_LOCATION.key()),
+                "Object table should have object-location option.");
+        createTableImpl(identifier, schema.copy(ObjectTable.SCHEMA));
+    }
+
     protected abstract void createTableImpl(Identifier identifier, Schema 
schema);
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index c6c79f4d4..9984e3fee 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -96,6 +96,10 @@ public class Schema {
         return comment;
     }
 
+    public Schema copy(RowType rowType) {
+        return new Schema(rowType.getFields(), partitionKeys, primaryKeys, 
options, comment);
+    }
+
     private static List<DataField> normalizeFields(
             List<DataField> fields, List<String> primaryKeys, List<String> 
partitionKeys) {
         List<String> fieldNames = 
fields.stream().map(DataField::name).collect(Collectors.toList());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 423dc1726..47d877724 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -19,12 +19,14 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.IOException;
@@ -33,6 +35,7 @@ import java.util.Optional;
 
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Factory to create {@link FileStoreTable}. */
 public class FileStoreTableFactory {
@@ -124,6 +127,17 @@ public class FileStoreTableFactory {
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment)
                         : new PrimaryKeyFileStoreTable(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment);
-        return table.copy(dynamicOptions.toMap());
+        table = table.copy(dynamicOptions.toMap());
+        CoreOptions options = table.coreOptions();
+        if (options.type() == TableType.OBJECT_TABLE) {
+            String objectLocation = options.objectLocation();
+            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
+            table =
+                    ObjectTable.builder()
+                            .underlyingTable(table)
+                            .objectLocation(objectLocation)
+                            .build();
+        }
+        return table;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
new file mode 100644
index 000000000..326efbc0e
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.object;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Util class for refreshing object table. */
+public class ObjectRefresh {
+
+    public static long refresh(ObjectTable table) throws Exception {
+        String location = table.objectLocation();
+        FileStoreTable underlyingTable = table.underlyingTable();
+        FileIO fileIO = underlyingTable.fileIO();
+
+        List<FileStatus> fileCollector = new ArrayList<>();
+        listAllFiles(fileIO, new Path(location), fileCollector);
+
+        BatchWriteBuilder writeBuilder = 
underlyingTable.newBatchWriteBuilder().withOverwrite();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            for (FileStatus file : fileCollector) {
+                write.write(toRow(file));
+            }
+            commit.commit(write.prepareCommit());
+        }
+
+        return fileCollector.size();
+    }
+
+    private static void listAllFiles(FileIO fileIO, Path directory, 
List<FileStatus> fileCollector)
+            throws IOException {
+        FileStatus[] files = fileIO.listStatus(directory);
+        if (files == null) {
+            return;
+        }
+
+        for (FileStatus file : files) {
+            if (file.isDir()) {
+                listAllFiles(fileIO, file.getPath(), fileCollector);
+            } else {
+                fileCollector.add(file);
+            }
+        }
+    }
+
+    private static InternalRow toRow(FileStatus file) {
+        return toRow(
+                file.getPath().toString(),
+                file.getPath().getName(),
+                file.getLen(),
+                Timestamp.fromEpochMillis(file.getModificationTime()),
+                Timestamp.fromEpochMillis(file.getAccessTime()),
+                file.getOwner(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                new GenericMap(Collections.emptyMap()));
+    }
+
+    public static GenericRow toRow(Object... values) {
+        GenericRow row = new GenericRow(values.length);
+
+        for (int i = 0; i < values.length; ++i) {
+            Object value = values[i];
+            if (value instanceof String) {
+                value = BinaryString.fromString((String) value);
+            }
+            row.setField(i, value);
+        }
+
+        return row;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
new file mode 100644
index 000000000..65689108c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.object;
+
+import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.DelegatedFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.HashSet;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A object table refers to a directory that contains multiple objects 
(files), Object table
+ * provides metadata indexes for unstructured data objects in this directory. 
Allowing users to
+ * analyze unstructured data in Object Storage.
+ *
+ * <p>Object Table stores the metadata of objects in the underlying table.
+ */
+public interface ObjectTable extends FileStoreTable {
+
+    RowType SCHEMA =
+            RowType.builder()
+                    .field("path", DataTypes.STRING().notNull())
+                    .field("name", DataTypes.STRING().notNull())
+                    .field("length", DataTypes.BIGINT().notNull())
+                    .field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS())
+                    .field("atime", DataTypes.TIMESTAMP_LTZ_MILLIS())
+                    .field("owner", DataTypes.STRING().nullable())
+                    .field("generation", DataTypes.INT().nullable())
+                    .field("content_type", DataTypes.STRING().nullable())
+                    .field("storage_class", DataTypes.STRING().nullable())
+                    .field("md5_hash", DataTypes.STRING().nullable())
+                    .field("metadata_mtime", 
DataTypes.TIMESTAMP_LTZ_MILLIS().nullable())
+                    .field("metadata", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))
+                    .build()
+                    .notNull();
+
+    /** Object location in file system. */
+    String objectLocation();
+
+    /** Underlying table to store metadata. */
+    FileStoreTable underlyingTable();
+
+    long refresh();
+
+    @Override
+    ObjectTable copy(Map<String, String> dynamicOptions);
+
+    /** Create a new builder for {@link ObjectTable}. */
+    static ObjectTable.Builder builder() {
+        return new ObjectTable.Builder();
+    }
+
+    /** Builder for {@link ObjectTable}. */
+    class Builder {
+
+        private FileStoreTable underlyingTable;
+        private String objectLocation;
+
+        public ObjectTable.Builder underlyingTable(FileStoreTable 
underlyingTable) {
+            this.underlyingTable = underlyingTable;
+            checkArgument(
+                    new HashSet<>(SCHEMA.getFields())
+                            
.containsAll(underlyingTable.rowType().getFields()),
+                    "Schema of Object Table should be %s, but is %s.",
+                    SCHEMA,
+                    underlyingTable.rowType());
+            return this;
+        }
+
+        public ObjectTable.Builder objectLocation(String objectLocation) {
+            this.objectLocation = objectLocation;
+            return this;
+        }
+
+        public ObjectTable build() {
+            return new ObjectTableImpl(underlyingTable, objectLocation);
+        }
+    }
+
+    /** An implementation for {@link ObjectTable}. */
+    class ObjectTableImpl extends DelegatedFileStoreTable implements 
ObjectTable {
+
+        private final String objectLocation;
+
+        public ObjectTableImpl(FileStoreTable underlyingTable, String 
objectLocation) {
+            super(underlyingTable);
+            this.objectLocation = objectLocation;
+        }
+
+        @Override
+        public BatchWriteBuilder newBatchWriteBuilder() {
+            throw new UnsupportedOperationException("Object table does not 
support Write.");
+        }
+
+        @Override
+        public StreamWriteBuilder newStreamWriteBuilder() {
+            throw new UnsupportedOperationException("Object table does not 
support Write.");
+        }
+
+        @Override
+        public TableWriteImpl<?> newWrite(String commitUser) {
+            throw new UnsupportedOperationException("Object table does not 
support Write.");
+        }
+
+        @Override
+        public TableWriteImpl<?> newWrite(String commitUser, 
ManifestCacheFilter manifestFilter) {
+            throw new UnsupportedOperationException("Object table does not 
support Write.");
+        }
+
+        @Override
+        public TableCommitImpl newCommit(String commitUser) {
+            throw new UnsupportedOperationException("Object table does not 
support Commit.");
+        }
+
+        @Override
+        public String objectLocation() {
+            return objectLocation;
+        }
+
+        @Override
+        public FileStoreTable underlyingTable() {
+            return wrapped;
+        }
+
+        @Override
+        public long refresh() {
+            try {
+                return ObjectRefresh.refresh(this);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public ObjectTable copy(Map<String, String> dynamicOptions) {
+            return new ObjectTableImpl(wrapped.copy(dynamicOptions), 
objectLocation);
+        }
+
+        @Override
+        public FileStoreTable copy(TableSchema newTableSchema) {
+            return new ObjectTableImpl(wrapped.copy(newTableSchema), 
objectLocation);
+        }
+
+        @Override
+        public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+            return new ObjectTableImpl(
+                    wrapped.copyWithoutTimeTravel(dynamicOptions), 
objectLocation);
+        }
+
+        @Override
+        public FileStoreTable copyWithLatestSchema() {
+            return new ObjectTableImpl(wrapped.copyWithLatestSchema(), 
objectLocation);
+        }
+
+        @Override
+        public FileStoreTable switchToBranch(String branchName) {
+            return new ObjectTableImpl(wrapped.switchToBranch(branchName), 
objectLocation);
+        }
+    }
+}
diff --git 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
index 4d86c12a6..67027eaba 100644
--- 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
+++ 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
@@ -286,5 +286,15 @@ public abstract class HadoopCompliantFileIO implements 
FileIO {
         public long getModificationTime() {
             return status.getModificationTime();
         }
+
+        @Override
+        public long getAccessTime() {
+            return status.getAccessTime();
+        }
+
+        @Override
+        public String getOwner() {
+            return status.getOwner();
+        }
     }
 }
diff --git 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index abfe0fabb..80f3df582 100644
--- 
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++ 
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -286,5 +286,15 @@ public abstract class HadoopCompliantFileIO implements 
FileIO {
         public long getModificationTime() {
             return status.getModificationTime();
         }
+
+        @Override
+        public long getAccessTime() {
+            return status.getAccessTime();
+        }
+
+        @Override
+        public String getOwner() {
+            return status.getOwner();
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index db66379f3..74512409b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -227,5 +227,10 @@ public class FlinkFileIO implements FileIO {
         public long getModificationTime() {
             return status.getModificationTime();
         }
+
+        @Override
+        public long getAccessTime() {
+            return status.getAccessTime();
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
new file mode 100644
index 000000000..97eb3095f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.table.object.ObjectTable;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
+
+/**
+ * Refresh Object Table procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.refresh_object_table('tableId')
+ * </code></pre>
+ */
+public class RefreshObjectTableProcedure extends ProcedureBase {
+
+    private static final String IDENTIFIER = "refresh_object_table";
+
+    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = 
@DataTypeHint("STRING"))})
+    public @DataTypeHint("ROW<file_number BIGINT>") Row[] call(
+            ProcedureContext procedureContext, String tableId)
+            throws Catalog.TableNotExistException {
+        ObjectTable table = (ObjectTable) table(tableId);
+        long fileNumber = table.refresh();
+        return new Row[] {Row.of(fileNumber)};
+    }
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6fe5e74eb..0ff3ac1f1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -79,3 +79,4 @@ org.apache.paimon.flink.procedure.FastForwardProcedure
 org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
 org.apache.paimon.flink.procedure.CloneProcedure
 org.apache.paimon.flink.procedure.CompactManifestProcedure
+org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
new file mode 100644
index 000000000..b9e30035b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for object table. */
+public class ObjectTableITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testIllegalObjectTable() {
+        assertThatThrownBy(
+                        () ->
+                                sql(
+                                        "CREATE TABLE T (a INT, b INT, c INT) 
WITH ('type' = 'object-table')"))
+                .rootCause()
+                .hasMessageContaining("Schema of Object Table can be empty 
or");
+        assertThatThrownBy(() -> sql("CREATE TABLE T WITH ('type' = 
'object-table')"))
+                .rootCause()
+                .hasMessageContaining("Object table should have 
object-location option.");
+    }
+
+    @Test
+    public void testObjectTableRefresh() throws IOException {
+        Path objectLocation = new Path(path + "/object-location");
+        FileIO fileIO = LocalFileIO.create();
+        sql(
+                "CREATE TABLE T WITH ('type' = 'object-table', 
'object-location' = '%s')",
+                objectLocation);
+
+        // add new file
+        fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3");
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM 
T")).containsExactlyInAnyOrder(Row.of("f0", 5L));
+
+        // add new file
+        fileIO.overwriteFileUtf8(new Path(objectLocation, "f1"), "4,5,6");
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM T"))
+                .containsExactlyInAnyOrder(Row.of("f0", 5L), Row.of("f1", 5L));
+
+        // delete file
+        fileIO.deleteQuietly(new Path(objectLocation, "f0"));
+        sql("CALL sys.refresh_object_table('default.T')");
+        assertThat(sql("SELECT name, length FROM 
T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
+
+        // time travel
+        assertThat(sql("SELECT name, length FROM T /*+ 
OPTIONS('scan.snapshot-id' = '1') */"))
+                .containsExactlyInAnyOrder(Row.of("f0", 5L));
+
+        // insert into
+        assertThatThrownBy(() -> sql("INSERT INTO T SELECT * FROM T"))
+                .rootCause()
+                .hasMessageContaining("Object table does not support Write.");
+        assertThat(sql("SELECT name, length FROM 
T")).containsExactlyInAnyOrder(Row.of("f1", 5L));
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index c93aad41a..35b65a7b5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -35,6 +35,7 @@ import org.apache.paimon.spark.procedure.MigrateFileProcedure;
 import org.apache.paimon.spark.procedure.MigrateTableProcedure;
 import org.apache.paimon.spark.procedure.Procedure;
 import org.apache.paimon.spark.procedure.ProcedureBuilder;
+import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
 import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
 import org.apache.paimon.spark.procedure.RenameTagProcedure;
 import org.apache.paimon.spark.procedure.RepairProcedure;
@@ -87,6 +88,7 @@ public class SparkProcedures {
         procedureBuilders.put("reset_consumer", 
ResetConsumerProcedure::builder);
         procedureBuilders.put("mark_partition_done", 
MarkPartitionDoneProcedure::builder);
         procedureBuilders.put("compact_manifest", 
CompactManifestProcedure::builder);
+        procedureBuilders.put("refresh_object_table", 
RefreshObjectTableProcedure::builder);
         return procedureBuilders.build();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
new file mode 100644
index 000000000..c6b6fdab4
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RefreshObjectTableProcedure.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.table.object.ObjectTable;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Spark procedure to refresh Object Table. */
+public class RefreshObjectTableProcedure extends BaseProcedure {
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {ProcedureParameter.required("table", 
StringType)};
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("file_number", DataTypes.LongType, 
false, Metadata.empty())
+                    });
+
+    protected RefreshObjectTableProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        return modifyPaimonTable(
+                tableIdent,
+                table -> {
+                    ObjectTable objectTable = (ObjectTable) table;
+                    long fileNumber = objectTable.refresh();
+                    InternalRow outputRow = newInternalRow(fileNumber);
+                    return new InternalRow[] {outputRow};
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<RefreshObjectTableProcedure>() {
+            @Override
+            public RefreshObjectTableProcedure doBuild() {
+                return new RefreshObjectTableProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "RefreshObjectTableProcedure";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
new file mode 100644
index 000000000..3a446e33d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/ObjectTableTest.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.fs.local.LocalFileIO
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
class ObjectTableTest extends PaimonSparkTestBase {

  test(s"Paimon object table") {
    val objectDir = new Path(tempDBDir + "/object-location")
    val localFileIO = LocalFileIO.create

    spark.sql(s"""
                 |CREATE TABLE T TBLPROPERTIES (
                 |    'type' = 'object-table',
                 |    'object-location' = '$objectDir'
                 |)
                 |""".stripMargin)

    // A freshly written file shows up after a refresh.
    localFileIO.overwriteFileUtf8(new Path(objectDir, "f0"), "1,2,3")
    spark.sql("CALL sys.refresh_object_table('test.T')")
    checkAnswer(spark.sql("SELECT name, length FROM T"), Row("f0", 5L) :: Nil)

    // A second file is picked up alongside the first.
    localFileIO.overwriteFileUtf8(new Path(objectDir, "f1"), "4,5,6")
    spark.sql("CALL sys.refresh_object_table('test.T')")
    checkAnswer(
      spark.sql("SELECT name, length FROM T"),
      Row("f0", 5L) :: Row("f1", 5L) :: Nil
    )

    // Earlier snapshots remain reachable through time travel.
    checkAnswer(
      spark.sql("SELECT name, length FROM T VERSION AS OF 1"),
      Row("f0", 5L) :: Nil
    )
  }
}


Reply via email to