This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f701668ec [core][rest] Support schema validation and infer for 
external paimon table (#6501)
0f701668ec is described below

commit 0f701668ecfea45c5764dc4fd008a192d06442b3
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Nov 3 18:44:45 2025 +0800

    [core][rest] Support schema validation and infer for external paimon table 
(#6501)
---
 .../org/apache/paimon/catalog/CatalogUtils.java    |   3 +
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  34 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  88 ++--
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 127 ++++--
 .../apache/paimon/rest/RESTFileSystemCatalog.java  |  13 +
 .../spark/table/PaimonExternalTableTest.scala      | 490 +++++++++++++++++++++
 6 files changed, 682 insertions(+), 73 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index e9510e8d55..d82016f931 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.exceptions.NotImplementedException;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -323,6 +324,8 @@ public class CatalogUtils {
                         snapshot = optional.get();
                     }
                 } catch (Catalog.TableNotExistException ignored) {
+                } catch (NotImplementedException ignored) {
+                    // external paimon tables do not support loading snapshots
                 }
             }
             tableAndSnapshots.add(Pair.of(table, snapshot));
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 0d34e17bc8..1bf8ff2f35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.rest;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
@@ -33,6 +34,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
@@ -48,6 +50,7 @@ import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
@@ -71,12 +74,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BRANCH;
 import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -443,7 +448,8 @@ public class RESTCatalog implements Catalog {
             checkNotSystemTable(identifier, "createTable");
             validateCreateTable(schema);
             createExternalTablePathIfNotExist(schema);
-            api.createTable(identifier, schema);
+            Schema newSchema = inferSchemaIfExternalPaimonTable(schema);
+            api.createTable(identifier, newSchema);
         } catch (AlreadyExistsException e) {
             if (!ignoreIfExists) {
                 throw new TableAlreadyExistException(identifier);
@@ -998,4 +1004,30 @@ public class RESTCatalog implements Catalog {
             }
         }
     }
+
+    /** Returns the schema already stored at an external table's path, else {@code schema}. */
+    private Schema inferSchemaIfExternalPaimonTable(Schema schema) throws Exception {
+        Map<String, String> userOptions = schema.options();
+        TableType tableType = Options.fromMap(userOptions).get(TYPE);
+        String location = userOptions.get(PATH.key());
+        if (!TableType.TABLE.equals(tableType) || Objects.isNull(location)) {
+            return schema;
+        }
+        Path tablePath = new Path(location);
+        SchemaManager manager = new SchemaManager(fileIOFromOptions(tablePath), tablePath);
+        Optional<TableSchema> existing = manager.latest();
+        if (!existing.isPresent()) {
+            return schema;
+        }
+        // A schema already exists at this path, so this only validates the provided schema.
+        manager.createTable(schema, true);
+        Schema inferred = existing.get().toSchema();
+        // Keep the caller-provided owner and path rather than the values stored on disk.
+        String owner = userOptions.get(Catalog.OWNER_PROP);
+        if (Objects.nonNull(owner)) {
+            inferred.options().put(Catalog.OWNER_PROP, owner);
+        }
+        inferred.options().put(PATH.key(), userOptions.get(PATH.key()));
+        return inferred;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 51d1218b2e..08fc1c9778 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -683,6 +683,19 @@ public class RESTCatalogServer {
     }
 
     private MockResponse snapshotHandle(Identifier identifier) throws 
Exception {
+        if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+        TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+        if (tableMetadata.isExternal()) {
+            ErrorResponse response =
+                    new ErrorResponse(
+                            ErrorResponse.RESOURCE_TYPE_TABLE,
+                            identifier.getFullName(),
+                            "external paimon table does not support get table 
snapshot in rest server",
+                            501);
+            return mockResponse(response, 404);
+        }
         RESTResponse response;
         Optional<TableSnapshot> snapshotOptional =
                 
Optional.ofNullable(tableLatestSnapshotStore.get(identifier.getFullName()));
@@ -714,6 +727,7 @@ public class RESTCatalogServer {
     }
 
     private MockResponse loadSnapshot(Identifier identifier, String version) 
throws Exception {
+
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
         SnapshotManager snapshotManager = table.snapshotManager();
         Snapshot snapshot = null;
@@ -1279,13 +1293,16 @@ public class RESTCatalogServer {
                         tableMetadata = createObjectTable(identifier, schema);
                     } else {
                         catalog.createTable(identifier, schema, false);
+                        boolean isExternal =
+                                schema.options() != null
+                                        && 
schema.options().containsKey(PATH.key());
                         tableMetadata =
                                 createTableMetadata(
                                         requestBody.getIdentifier(),
                                         0L,
                                         requestBody.getSchema(),
                                         UUID.randomUUID().toString(),
-                                        false);
+                                        isExternal);
                     }
                     tableMetadataStore.put(
                             requestBody.getIdentifier().getFullName(), 
tableMetadata);
@@ -1510,10 +1527,16 @@ public class RESTCatalogServer {
                 alterTableImpl(identifier, requestBody.getChanges());
                 return new MockResponse().setResponseCode(200);
             case "DELETE":
-                try {
-                    catalog.dropTable(identifier, false);
-                } catch (Exception e) {
-                    System.out.println(e.getMessage());
+                if (!tableMetadataStore.containsKey(identifier.getFullName())) 
{
+                    return new MockResponse().setResponseCode(404);
+                }
+                tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+                if (!tableMetadata.isExternal()) {
+                    try {
+                        catalog.dropTable(identifier, false);
+                    } catch (Exception e) {
+                        System.out.println(e.getMessage());
+                    }
                 }
                 tableMetadataStore.remove(identifier.getFullName());
                 tableLatestSnapshotStore.remove(identifier.getFullName());
@@ -1532,7 +1555,7 @@ public class RESTCatalogServer {
             throw new Catalog.TableNoPermissionException(fromTable);
         } else if (tableMetadataStore.containsKey(fromTable.getFullName())) {
             TableMetadata tableMetadata = 
tableMetadataStore.get(fromTable.getFullName());
-            if (!isFormatTable(tableMetadata.schema().toSchema())) {
+            if (!isFormatTable(tableMetadata.schema().toSchema()) && 
!tableMetadata.isExternal()) {
                 catalog.renameTable(requestBody.getSource(), 
requestBody.getDestination(), false);
             }
             if (tableMetadataStore.containsKey(toTable.getFullName())) {
@@ -2066,6 +2089,17 @@ public class RESTCatalogServer {
             Snapshot snapshot,
             List<PartitionStatistics> statistics)
             throws Catalog.TableNotExistException {
+        if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+        boolean isExternal = 
tableMetadataStore.get(identifier.getFullName()).isExternal();
+        if (isExternal) {
+            new ErrorResponse(
+                    ErrorResponse.RESOURCE_TYPE_TABLE,
+                    identifier.getFullName(),
+                    "external paimon table does not support commit in rest 
server",
+                    501);
+        }
         FileStoreTable table = getFileTable(identifier);
         if (!tableId.equals(table.catalogEnvironment().uuid())) {
             throw new Catalog.TableNotExistException(identifier);
@@ -2223,7 +2257,10 @@ public class RESTCatalogServer {
     private TableMetadata createTableMetadata(
             Identifier identifier, long schemaId, Schema schema, String uuid, 
boolean isExternal) {
         Map<String, String> options = new HashMap<>(schema.options());
-        Path path = catalog.getTableLocation(identifier);
+        Path path =
+                isExternal && Objects.nonNull(schema.options().get(PATH.key()))
+                        ? new Path(schema.options().get(PATH.key()))
+                        : catalog.getTableLocation(identifier);
         String restPath = path.toString();
         if (this.configResponse
                 .getDefaults()
@@ -2261,27 +2298,22 @@ public class RESTCatalogServer {
         return createTableMetadata(identifier, 1L, newSchema, 
UUID.randomUUID().toString(), false);
     }
 
-    private FileStoreTable getFileTable(Identifier identifier)
-            throws Catalog.TableNotExistException {
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
-            TableSchema schema = tableMetadata.schema();
-            CatalogEnvironment catalogEnv =
-                    new CatalogEnvironment(
-                            identifier,
-                            tableMetadata.uuid(),
-                            catalog.catalogLoader(),
-                            catalog.lockFactory().orElse(null),
-                            catalog.lockContext().orElse(null),
-                            catalogContext,
-                            false);
-            Path path = new Path(schema.options().get(PATH.key()));
-            FileIO dataFileIO = catalog.fileIO();
-            FileStoreTable table =
-                    FileStoreTableFactory.create(dataFileIO, path, schema, 
catalogEnv);
-            return table;
-        }
-        throw new Catalog.TableNotExistException(identifier);
+    /** Builds the table from its stored metadata; callers must first check that it exists. */
+    private FileStoreTable getFileTable(Identifier identifier) {
+        TableMetadata metadata = tableMetadataStore.get(identifier.getFullName());
+        TableSchema tableSchema = metadata.schema();
+        CatalogEnvironment environment =
+                new CatalogEnvironment(
+                        identifier,
+                        metadata.uuid(),
+                        catalog.catalogLoader(),
+                        catalog.lockFactory().orElse(null),
+                        catalog.lockContext().orElse(null),
+                        catalogContext,
+                        false);
+        Path tablePath = new Path(tableSchema.options().get(PATH.key()));
+        FileIO dataFileIO = catalog.fileIO();
+        return FileStoreTableFactory.create(dataFileIO, tablePath, tableSchema, environment);
+    }
 
     private static int getMaxResults(Map<String, String> parameters) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 5d76946023..2a7c1945bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TableType;
@@ -2686,66 +2687,102 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         Table tableAgain = catalog.getTable(identifier);
         assertThat(tableAgain).isNotNull();
         assertThat(tableAgain.comment()).isEqualTo(Optional.of("External table 
for testing"));
+    }
+
+    @Test
+    public void testCreateExternalTableWithSchemaInference(@TempDir java.nio.file.Path path)
+            throws Exception {
+        Path externalTablePath = new Path(path.toString(), "external_table_inference_location");
+        // DEFAULT_TABLE_SCHEMA is shared by other tests, so the option is removed in finally.
+        DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(), externalTablePath.toString());
+        try {
+            restCatalog.createDatabase("test_schema_inference_db", true);
+            Identifier identifier =
+                    Identifier.create("test_schema_inference_db", "external_inference_table");
+            catalog.dropTable(identifier, true);
+            createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA);
+            // An empty field list asks the catalog to infer the schema from the external path.
+            Schema emptySchema =
+                    new Schema(
+                            Lists.newArrayList(),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            DEFAULT_TABLE_SCHEMA.options(),
+                            "");
+            catalog.createTable(identifier, emptySchema, false);

-        testReadSystemTables();
+            Table table = catalog.getTable(identifier);
+            assertThat(table).isNotNull();
+            assertThat(table.rowType().getFieldCount()).isEqualTo(3);
+            assertThat(table.rowType().getFieldNames()).containsExactly("pk", "col1", "col2");

-        // Verify external table path still exists after operations
-        assertTrue(
-                fileIO.exists(externalTablePath),
-                "External table path should still exist after operations");
+            // A declared schema that disagrees with the files on disk must be rejected.
+            Schema clientProvidedSchema =
+                    new Schema(
+                            Lists.newArrayList(
+                                    new DataField(0, "pk", DataTypes.INT()),
+                                    new DataField(1, "col1", DataTypes.STRING())),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            DEFAULT_TABLE_SCHEMA.options(),
+                            "");
+            Assertions.assertThrows(
+                    RuntimeException.class,
+                    () -> catalog.createTable(identifier, clientProvidedSchema, false));
+        } finally {
+            DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
+        }
+    }
 
-        // Test dropping external table - data should remain
-        catalog.dropTable(identifier, false);
+    @Test
+    public void testReadSystemTablesWithExternalTable(@TempDir 
java.nio.file.Path path)
+            throws Exception {
+        // Create an external table
+        Path externalTablePath = new Path(path.toString(), 
"external_sys_table_location");
+        DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(), 
externalTablePath.toString());
 
-        // Verify external table path still exists after drop (external table 
behavior)
-        assertTrue(
-                fileIO.exists(externalTablePath),
-                "External table path should still exist after drop");
+        restCatalog.createDatabase("test_sys_table_db", true);
+        Identifier identifier = Identifier.create("test_sys_table_db", 
"external_sys_table");
 
-        // Clean up
         try {
-            fileIO.deleteQuietly(externalTablePath);
+            catalog.dropTable(identifier, true);
         } catch (Exception e) {
-            // Ignore cleanup errors
+            // Ignore drop errors
         }
-    }
 
-    private void testReadSystemTables() throws IOException, 
Catalog.TableNotExistException {
+        createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA);
+        catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
+
+        // Test reading system table with external table
         Identifier allTablesIdentifier = Identifier.create("sys", "tables");
         Table allTablesTable = catalog.getTable(allTablesIdentifier);
+        assertThat(allTablesTable).isNotNull();
 
-        if (allTablesTable != null) {
-            ReadBuilder allTablesReadBuilder = allTablesTable.newReadBuilder();
-            TableRead allTablesRead = allTablesReadBuilder.newRead();
-            List<Split> allTablesSplits = 
allTablesReadBuilder.newScan().plan().splits();
+        ReadBuilder readBuilder = allTablesTable.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        List<Split> splits = readBuilder.newScan().plan().splits();
 
-            List<InternalRow> allTablesResults = new ArrayList<>();
-            for (Split split : allTablesSplits) {
-                try (RecordReader<InternalRow> reader = 
allTablesRead.createReader(split)) {
-                    reader.forEachRemaining(allTablesResults::add);
-                }
+        List<InternalRow> results = new ArrayList<>();
+        for (Split split : splits) {
+            try (RecordReader<InternalRow> reader = read.createReader(split)) {
+                reader.forEachRemaining(results::add);
             }
+        }
 
-            // Verify that our external table appears in ALL_TABLES
-            assertThat(allTablesResults).isNotEmpty();
-
-            // Find our external table in the results
-            boolean foundExternalTable = false;
-            for (InternalRow row : allTablesResults) {
-                String tableName = row.getString(1).toString(); // table_name 
column
-                String databaseName = row.getString(0).toString(); // 
database_name column
-                if ("external_test_table".equals(tableName)
-                        && "test_external_table_db".equals(databaseName)) {
-                    foundExternalTable = true;
-                    // Verify table properties
-                    String tableType = row.getString(2).toString(); // 
table_type column
-                    assertThat(tableType)
-                            .isEqualTo("table"); // External tables are still 
MANAGED type
-                    break;
-                }
+        // Verify external table appears in system table
+        assertThat(results).isNotEmpty();
+        boolean foundExternalTable = false;
+        for (InternalRow row : results) {
+            String databaseName = row.getString(0).toString();
+            String tableName = row.getString(1).toString();
+            if ("test_sys_table_db".equals(databaseName)
+                    && "external_sys_table".equals(tableName)) {
+                foundExternalTable = true;
+                break;
             }
-            assertThat(foundExternalTable).isTrue();
         }
+        assertThat(foundExternalTable).isTrue();
+        DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
     }
 
     protected void createTable(
@@ -2828,7 +2865,9 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
     private void createExternalTableDirectory(Path externalTablePath, Schema 
schema)
             throws Exception {
         // Create external table directory structure
-        FileIO fileIO = FileIO.get(externalTablePath, 
CatalogContext.create(new Options()));
+        FileIO fileIO =
+                FileIO.get(
+                        externalTablePath, CatalogContext.create(new 
Options(catalog.options())));
 
         // Create the external table directory
         if (!fileIO.exists(externalTablePath)) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
index 26a6315347..5fd3aaf748 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
@@ -20,8 +20,12 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+
+import static org.apache.paimon.CoreOptions.PATH;
 
 /**
  * A FileSystemCatalog that supports custom table paths for REST catalog 
server. This allows REST
@@ -37,4 +41,13 @@ public class RESTFileSystemCatalog extends FileSystemCatalog 
{
     protected boolean allowCustomTablePath() {
         return true;
     }
+
+    @Override
+    public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        // Schemas carrying an explicit 'path' option are external; this catalog skips them.
+        if (schema.options() == null || !schema.options().containsKey(PATH.key())) {
+            super.createTable(identifier, schema, ignoreIfExists);
+        }
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
new file mode 100644
index 0000000000..1ea9b2ee81
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.table
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.fs.local.LocalFileIO
+import org.apache.paimon.schema.{Schema, SchemaManager}
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataTypes
+import org.apache.paimon.utils.StringUtils
+
+import org.apache.spark.sql.Row
+
+class PaimonExternalTableTest extends PaimonSparkTestWithRestCatalogBase {
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    sql("USE paimon")
+    sql("CREATE DATABASE IF NOT EXISTS test_db")
+    sql("USE test_db")
+    // Drop tables that earlier tests in this suite may have left behind
+    sql("DROP TABLE IF EXISTS external_tbl")
+    sql("DROP TABLE IF EXISTS managed_tbl")
+    sql("DROP TABLE IF EXISTS external_tbl_renamed")
+    sql("DROP TABLE IF EXISTS t1")
+    sql("DROP TABLE IF EXISTS t2")
+  }
+
+  test("PaimonExternalTable: create and drop external table") {
+    withTempDir {
+      tbLocation =>
+        withTable("external_tbl", "managed_tbl") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+          // Ensure table doesn't exist before starting
+          sql("DROP TABLE IF EXISTS external_tbl")
+
+          // For REST catalog external tables, schema must be created in filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("id", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true)
+
+          // create external table
+          sql(
+            s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$externalTbLocation'")
+          sql("INSERT INTO external_tbl VALUES (1, 'Alice'), (2, 'Bob')")
+          checkAnswer(
+            sql("SELECT * FROM external_tbl ORDER BY id"),
+            Seq(Row(1, "Alice"), Row(2, "Bob"))
+          )
+
+          val table = paimonCatalog
+            .getTable(Identifier.create("test_db", "external_tbl"))
+            .asInstanceOf[FileStoreTable]
+          val fileIO = table.fileIO()
+          val actualTbLocation = table.location()
+
+          // For REST catalog, the path might be managed internally, but the table should still function as external
+          // Verify that the table has a location and is accessible
+          assert(actualTbLocation != null, "External table should have a location")
+
+          // Verify data is accessible
+          assert(fileIO.exists(actualTbLocation), "External table location should exist")
+
+          // drop external table - data should still exist (this is the key characteristic of external tables)
+          sql("DROP TABLE external_tbl")
+          assert(fileIO.exists(actualTbLocation), "External table data should exist after drop")
+
+          // Invalidate catalog cache to ensure table is fully removed
+          try {
+            paimonCatalog.invalidateTable(Identifier.create("test_db", "external_tbl"))
+          } catch {
+            case _: Exception => // Ignore if table doesn't exist in cache
+          }
+
+          // DROP TABLE is synchronous, so check the table is gone instead of sleeping
+          assert(!paimonCatalog.listTables("test_db").contains("external_tbl"))
+          sql("DROP TABLE IF EXISTS external_tbl")
+
+          // Schema already exists in filesystem from initial creation, no need to recreate
+          // create external table again using the same location - should be able to read existing data
+          sql(
+            s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon LOCATION '$externalTbLocation'")
+          checkAnswer(
+            sql("SELECT * FROM external_tbl ORDER BY id"),
+            Seq(Row(1, "Alice"), Row(2, "Bob"))
+          )
+
+          // create managed table for comparison
+          sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon")
+          sql("INSERT INTO managed_tbl VALUES (3, 'Charlie')")
+          val managedTable = paimonCatalog
+            .getTable(Identifier.create("test_db", "managed_tbl"))
+            .asInstanceOf[FileStoreTable]
+          val managedTbLocation = managedTable.location()
+
+          // drop managed table - data should be deleted
+          sql("DROP TABLE managed_tbl")
+          assert(
+            !fileIO.exists(managedTbLocation),
+            "Managed table data should not exist after drop")
+        }
+    }
+  }
+
+  test("PaimonExternalTable: partitioned external table") {
+    withTempDir {
+      tbLocation =>
+        withTable("external_tbl") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // Pre-create the Paimon schema at the external location before registering the table
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("id", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .column("value", DataTypes.DOUBLE())
+            .column("dept", DataTypes.STRING())
+            .partitionKeys("dept") // must match the PARTITIONED BY clause below
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, true)
+
+          sql(s"""
+                 |CREATE TABLE external_tbl (id INT, name STRING, value DOUBLE) USING paimon
+                 |PARTITIONED BY (dept STRING)
+                 |LOCATION '$externalTbLocation'
+                 |""".stripMargin)
+
+          sql(
+            "INSERT INTO external_tbl VALUES " +
+              "(1, 'Alice', 10.5, 'Engineering')," +
+              "(2, 'Bob', 20.7, 'Engineering')," +
+              "(3, 'Charlie', 30.9, 'Sales')," +
+              "(4, 'David', 25.3, 'Sales')")
+
+          // Read back every row across both partitions
+          checkAnswer(
+            sql("SELECT * FROM external_tbl ORDER BY id"),
+            Seq(
+              Row(1, "Alice", 10.5, "Engineering"),
+              Row(2, "Bob", 20.7, "Engineering"),
+              Row(3, "Charlie", 30.9, "Sales"),
+              Row(4, "David", 25.3, "Sales")
+            )
+          )
+
+          // Filtering on the partition column returns only that partition's rows
+          checkAnswer(
+            sql("SELECT * FROM external_tbl WHERE dept = 'Engineering' ORDER BY id"),
+            Seq(
+              Row(1, "Alice", 10.5, "Engineering"),
+              Row(2, "Bob", 20.7, "Engineering")
+            )
+          )
+
+          // Column projection combined with a partition filter
+          checkAnswer(
+            sql("SELECT name, value FROM external_tbl WHERE dept = 'Sales' ORDER BY id"),
+            Seq(
+              Row("Charlie", 30.9),
+              Row("David", 25.3)
+            )
+          )
+
+          // Dropping an external table must leave its files at the external location
+          val table = paimonCatalog
+            .getTable(Identifier.create("test_db", "external_tbl"))
+            .asInstanceOf[FileStoreTable]
+          val fileIO = table.fileIO()
+          val actualTbLocation = table.location()
+
+          sql("DROP TABLE external_tbl")
+          assert(fileIO.exists(actualTbLocation), "External table data should exist after drop")
+        }
+    }
+  }
+
+  test("PaimonExternalTable: rename external table") { // Checks that renaming an external table keeps its data readable and its location unchanged.
+    withTempDir { // the temp directory serves as the external table's LOCATION
+      tbLocation =>
+        withTable("external_tbl", "external_tbl_renamed") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // For REST catalog external tables, schema must be created in 
filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("id", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, 
true)
+
+          // create external table
+          sql(
+            s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon 
LOCATION '$externalTbLocation'")
+          sql("INSERT INTO external_tbl VALUES (1, 'Alice')")
+          val originalLocation = paimonCatalog
+            .getTable(Identifier.create("test_db", "external_tbl"))
+            .asInstanceOf[FileStoreTable]
+            .location() // remembered for comparison after the rename
+
+          // rename external table, location should not change
+          sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
+          checkAnswer(
+            sql("SELECT * FROM external_tbl_renamed"),
+            Seq(Row(1, "Alice"))
+          )
+
+          val renamedTable = paimonCatalog
+            .getTable(Identifier.create("test_db", "external_tbl_renamed"))
+            .asInstanceOf[FileStoreTable]
+          val renamedLocation = renamedTable.location()
+          assert( // locations are compared by their string form
+            renamedLocation.toString.equals(originalLocation.toString),
+            "External table location should not change after rename"
+          )
+        }
+    }
+  }
+
+  test("PaimonExternalTable: create external table without schema") { // Checks that CREATE TABLE ... LOCATION with no column list picks up the schema stored at that location.
+    withTempDir { // the temp directory serves as the shared LOCATION of t1 and t2
+      tbLocation =>
+        withTable("t1", "t2") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // For REST catalog external tables, schema must be created in 
filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("id", DataTypes.INT())
+            .column("pt", DataTypes.INT())
+            .partitionKeys("pt")
+            .primaryKey("id")
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, 
true)
+
+          // First create a table with schema and data
+          sql(s"""
+                 |CREATE TABLE t1 (id INT, pt INT) USING paimon
+                 |PARTITIONED BY (pt)
+                 |TBLPROPERTIES('primary-key' = 'id')
+                 |LOCATION '$externalTbLocation'
+                 |""".stripMargin)
+          sql("INSERT INTO t1 VALUES (1, 1), (2, 2)") // written through t1, read back through t2 below
+
+          // create external table without schema - should infer from existing 
table
+          sql(s"CREATE TABLE t2 USING paimon LOCATION '$externalTbLocation'") // no column list: schema comes from the files at the location
+          checkAnswer(
+            sql("SELECT * FROM t2 ORDER BY id"),
+            Seq(Row(1, 1), Row(2, 2))
+          )
+
+          val table2 =
+            paimonCatalog.getTable(Identifier.create("test_db", 
"t2")).asInstanceOf[FileStoreTable]
+          val table2Location = table2.location()
+          // Verify table2 can access the data from table1's location
+          assert(table2Location != null, "Table t2 should have a location") // NOTE(review): weak check; comparing with t1's location would pin that both tables share it
+          // The key point is that t2 can read data from the same location as 
t1
+          assert(table2.fileIO().exists(table2Location), "Table t2 location 
should exist")
+        }
+    }
+  }
+
+  test("PaimonExternalTable: create external table on managed table location") 
{
+    withTable("external_tbl", "managed_tbl") { // Points an external table at a managed table's directory; dropping the managed table removes the shared data.
+      // Create managed table first
+      sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon") // no LOCATION clause, so the catalog chooses the directory
+      sql("INSERT INTO managed_tbl VALUES (1, 'Alice'), (2, 'Bob')")
+      checkAnswer(
+        sql("SELECT * FROM managed_tbl ORDER BY id"),
+        Seq(Row(1, "Alice"), Row(2, "Bob"))
+      )
+
+      val managedTable = paimonCatalog
+        .getTable(Identifier.create("test_db", "managed_tbl"))
+        .asInstanceOf[FileStoreTable]
+      val managedLocation = managedTable.location()
+      // Extract actual file system path, removing any scheme prefix (e.g., 
"file:", "rest:")
+      val tablePath = if (managedLocation.toString.contains(":")) { // NOTE(review): a Windows drive letter (e.g. "C:") also contains ':' and its prefix would be stripped below; confirm tests only see POSIX paths
+        // Path has scheme, extract the path part after the first ":"
+        val parts = managedLocation.toString.split(":", 2)
+        if (parts.length == 2 && parts(0).equals("file")) {
+          parts(1) // For file: scheme, use the path directly
+        } else {
+          // For other schemes or if parsing fails, try to get canonical path
+          try {
+            new java.io.File(managedLocation.toString.replaceFirst("^[^:]+:", 
"")).getCanonicalPath
+          } catch { // fall back to the scheme-stripped string if canonicalization throws
+            case _: Exception => 
managedLocation.toString.replaceFirst("^[^:]+:", "")
+          }
+        }
+      } else {
+        managedLocation.toString
+      }
+
+      // For REST catalog, managed table already has schema, no need to create 
schema again
+      // Create external table pointing to managed table location
+      sql(s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon 
LOCATION '$tablePath'")
+      checkAnswer(
+        sql("SELECT * FROM external_tbl ORDER BY id"),
+        Seq(Row(1, "Alice"), Row(2, "Bob"))
+      )
+
+      val externalTable = paimonCatalog
+        .getTable(Identifier.create("test_db", "external_tbl"))
+        .asInstanceOf[FileStoreTable]
+      assert( // the external location with any "file:" prefix removed must equal tablePath
+        StringUtils.replace(externalTable.location().toString, "file:", 
"").equals(tablePath),
+        "External table should point to managed table location"
+      )
+
+      // Dropping the managed table deletes its data files; because the external
+      // table points to the same location, its data is deleted as well
+      sql("DROP TABLE managed_tbl")
+      val fileIO = externalTable.fileIO()
+      assert(
+        !fileIO.exists(externalTable.location()),
+        "Data should be deleted after dropping managed table since external 
table points to managed table location"
+      )
+
+      // External table cannot read data anymore since data was deleted with 
managed table
+      // This demonstrates that external table pointing to managed table 
location shares the same data
+      checkAnswer( // with the files gone, the external table is expected to return no rows
+        sql("SELECT * FROM external_tbl ORDER BY id"),
+        Seq.empty
+      )
+    }
+  }
+
+  test("PaimonExternalTable: insert overwrite on external table") { // Checks that INSERT OVERWRITE replaces every row of an unpartitioned external table.
+    withTempDir { // the temp directory serves as the external table's LOCATION
+      tbLocation =>
+        withTable("external_tbl") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // For REST catalog external tables, schema must be created in 
filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("age", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, 
true)
+
+          sql(
+            s"CREATE TABLE external_tbl (age INT, name STRING) USING paimon 
LOCATION '$externalTbLocation'")
+
+          sql("INSERT INTO external_tbl VALUES (5, 'Ben'), (7, 'Larry')") // initial rows, replaced by the overwrite below
+          checkAnswer(
+            sql("SELECT age, name FROM external_tbl ORDER BY age"),
+            Seq(Row(5, "Ben"), Row(7, "Larry"))
+          )
+
+          sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry'), (7, 'Tom')") // no partitions, so the whole table is replaced
+          checkAnswer(
+            sql("SELECT age, name FROM external_tbl ORDER BY age"),
+            Seq(Row(5, "Jerry"), Row(7, "Tom"))
+          )
+        }
+    }
+  }
+
+  test("PaimonExternalTable: insert overwrite on partitioned external table") { // Checks static and dynamic INSERT OVERWRITE on an external table partitioned by id.
+    withTempDir { // the temp directory serves as the external table's LOCATION
+      tbLocation =>
+        withTable("external_tbl") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // For REST catalog external tables, schema must be created in 
filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("age", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .column("id", DataTypes.INT())
+            .partitionKeys("id")
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, 
true)
+
+          sql(s"""
+                 |CREATE TABLE external_tbl (age INT, name STRING) USING paimon
+                 |PARTITIONED BY (id INT)
+                 |LOCATION '$externalTbLocation'
+                 |""".stripMargin) // id appears only in PARTITIONED BY, mirroring the schema's last column and partition key
+
+          sql("INSERT INTO external_tbl PARTITION (id = 1) VALUES (5, 'Ben'), 
(7, 'Larry')") // static partition insert into id = 1
+          sql("INSERT OVERWRITE external_tbl PARTITION (id = 1) VALUES (5, 
'Jerry'), (7, 'Tom')") // static overwrite of partition id = 1
+          checkAnswer(
+            sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"),
+            Seq(Row(1, 5, "Jerry"), Row(1, 7, "Tom"))
+          )
+
+          sql("INSERT INTO external_tbl PARTITION (id = 3) VALUES (5, 
'Alice')") // adds a second partition, id = 3
+          // Use dynamic partition overwrite mode to only overwrite partitions 
present in data
+          withSparkSQLConf("spark.sql.sources.partitionOverwriteMode" -> 
"dynamic") {
+            sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry', 1), (7, 
'Tom', 2)") // rewrites partitions 1 and 2; partition 3 is expected to survive
+          }
+          checkAnswer(
+            sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"),
+            Seq(Row(1, 5, "Jerry"), Row(2, 7, "Tom"), Row(3, 5, "Alice"))
+          )
+        }
+    }
+  }
+
+  test("PaimonExternalTable: show partitions on external table") { // Checks SHOW PARTITIONS, with no spec, a partial spec and a full spec, on a two-level partitioned external table.
+    withTempDir { // the temp directory serves as the external table's LOCATION
+      tbLocation =>
+        withTable("external_tbl") {
+          val externalTbLocation = tbLocation.getCanonicalPath
+
+          // For REST catalog external tables, schema must be created in 
filesystem first
+          val schemaTablePath = new Path(externalTbLocation)
+          val schemaFileIO = LocalFileIO.create()
+          val schema = Schema
+            .newBuilder()
+            .column("id", DataTypes.INT())
+            .column("p1", DataTypes.INT())
+            .column("p2", DataTypes.STRING())
+            .partitionKeys("p1", "p2")
+            .option("path", externalTbLocation)
+            .option("type", "table")
+            .build()
+          new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema, 
true)
+
+          sql(s"""
+                 |CREATE TABLE external_tbl (id INT, p1 INT, p2 STRING) USING 
paimon
+                 |PARTITIONED BY (p1, p2)
+                 |LOCATION '$externalTbLocation'
+                 |""".stripMargin)
+
+          sql("INSERT INTO external_tbl VALUES (1, 1, '1')")
+          sql("INSERT INTO external_tbl VALUES (2, 1, '1')")
+          sql("INSERT INTO external_tbl VALUES (3, 2, '1')")
+          sql("INSERT INTO external_tbl VALUES (3, 2, '2')") // four rows across three distinct (p1, p2) partitions
+
+          checkAnswer(
+            sql("SHOW PARTITIONS external_tbl"),
+            Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+          checkAnswer( // partial spec: only partitions with p1=2
+            sql("SHOW PARTITIONS external_tbl PARTITION (p1=2)"),
+            Seq(Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+          checkAnswer( // full spec: exactly one partition
+            sql("SHOW PARTITIONS external_tbl PARTITION (p1=2, p2='2')"),
+            Seq(Row("p1=2/p2=2")))
+        }
+    }
+  }
+}


Reply via email to