This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cdaf66df [hive] Enable Format Table by default (#4461)
4cdaf66df is described below

commit 4cdaf66dfbe492d7b6d3c18fbd251b19d39c1f7a
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 7 15:24:25 2024 +0800

    [hive] Enable Format Table by default (#4461)
---
 .../generated/catalog_configuration.html           |  6 +++
 .../generated/hive_catalog_configuration.html      |  6 ---
 .../org/apache/paimon/options/CatalogOptions.java  |  9 ++++
 .../apache/paimon/table/FormatTableOptions.java    |  5 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java | 47 ++++++++++++++++++
 .../java/org/apache/paimon/flink/FlinkCatalog.java |  4 +-
 .../paimon/flink/FlinkGenericCatalogFactory.java   |  1 +
 .../apache/paimon/flink/FormatCatalogTable.java    | 25 +++++-----
 .../apache/paimon/flink/CatalogTableITCase.java    |  3 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   | 57 +++++++++++++++-------
 .../org/apache/paimon/hive/HiveCatalogOptions.java |  9 ----
 .../org/apache/paimon/hive/HiveTableUtils.java     | 41 ++++++----------
 .../org/apache/paimon/hive/HiveCatalogTest.java    |  5 ++
 .../apache/paimon/hive/Hive23CatalogITCase.java    |  3 +-
 .../apache/paimon/hive/Hive31CatalogITCase.java    |  1 -
 .../hive/HiveCatalogFormatTableITCaseBase.java     |  4 +-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 24 +++------
 .../java/org/apache/paimon/spark/SparkCatalog.java |  6 ++-
 .../apache/paimon/spark/SparkGenericCatalog.java   |  1 +
 .../paimon/spark/SparkCatalogWithHiveTest.java     | 44 ++++++++---------
 20 files changed, 179 insertions(+), 122 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 8954e898f..3686fa20c 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -74,6 +74,12 @@ under the License.
             <td>Integer</td>
             <td>Configure the size of the connection pool.</td>
         </tr>
+        <tr>
+            <td><h5>format-table.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to support format tables, format table corresponds to 
a regular csv, parquet or orc table, allowing read and write operations. 
However, during these processes, it does not connect to the metastore; hence, 
newly added partitions will not be reflected in the metastore and need to be 
manually added as separate partition operations.</td>
+        </tr>
         <tr>
             <td><h5>lineage-meta</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
index 076a46232..e0257d301 100644
--- a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
@@ -39,12 +39,6 @@ under the License.
             <td>String</td>
             <td>Specify client cache key, multiple elements separated by 
commas.<br /><ul><li>"ugi":  the Hadoop UserGroupInformation instance that 
represents the current user using the cache.</li></ul><ul><li>"user_name" 
similar to UGI but only includes the user's name determined by 
UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary 
configuration. The value of the configuration will be extracted from catalog 
properties and added to the cache key. A conf elem [...]
         </tr>
-        <tr>
-            <td><h5>format-table.enabled</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to support format tables, format table corresponds to 
a regular Hive table, allowing read and write operations. However, during these 
processes, it does not connect to the metastore; hence, newly added partitions 
will not be reflected in the metastore and need to be manually added as 
separate partition operations.</td>
-        </tr>
         <tr>
             <td><h5>hadoop-conf-dir</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index ace4daf5e..6ad9f3350 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -156,4 +156,13 @@ public class CatalogOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Sync all table properties to hive 
metastore");
+
+    public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
+            ConfigOptions.key("format-table.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to support format tables, format table 
corresponds to a regular csv, parquet or orc table, allowing read and write 
operations. "
+                                    + "However, during these processes, it 
does not connect to the metastore; hence, newly added partitions will not be 
reflected in"
+                                    + " the metastore and need to be manually 
added as separate partition operations.");
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
index 64f134d07..b4010209c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
@@ -25,8 +25,9 @@ import org.apache.paimon.options.ConfigOptions;
 public class FormatTableOptions {
 
     public static final ConfigOption<String> FIELD_DELIMITER =
-            ConfigOptions.key("csv.field-delimiter")
+            ConfigOptions.key("field-delimiter")
                     .stringType()
                     .defaultValue(",")
-                    .withDescription("Optional field delimiter character (',' 
by default)");
+                    .withDescription(
+                            "Optional field delimiter character for CSV (',' 
by default).");
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 27992b56f..dbeedcfe5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
@@ -885,4 +886,50 @@ public abstract class CatalogTestBase {
         assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
                 .isInstanceOf(Catalog.ViewNotExistException.class);
     }
+
+    protected boolean supportsFormatTable() {
+        return false;
+    }
+
+    @Test
+    public void testFormatTable() throws Exception {
+        if (!supportsFormatTable()) {
+            return;
+        }
+
+        Identifier identifier = new Identifier("format_db", "my_format");
+        catalog.createDatabase(identifier.getDatabaseName(), false);
+
+        // create table
+        Schema schema =
+                Schema.newBuilder()
+                        .column("str", DataTypes.STRING())
+                        .column("int", DataTypes.INT())
+                        .option("type", "format-table")
+                        .option("file.format", "csv")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        assertThat(catalog.listTables(identifier.getDatabaseName()))
+                .contains(identifier.getTableName());
+        
assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class);
+
+        // alter table
+        SchemaChange schemaChange = SchemaChange.addColumn("new_col", 
DataTypes.STRING());
+        assertThatThrownBy(() -> catalog.alterTable(identifier, schemaChange, 
false))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessage("Only data table support alter table.");
+
+        // drop table
+        catalog.dropTable(identifier, false);
+        assertThatThrownBy(() -> catalog.getTable(identifier))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+
+        // rename table
+        catalog.createTable(identifier, schema, false);
+        Identifier newIdentifier = new Identifier("format_db", "new_format");
+        catalog.renameTable(identifier, newIdentifier, false);
+        assertThatThrownBy(() -> catalog.getTable(identifier))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+        
assertThat(catalog.getTable(newIdentifier)).isInstanceOf(FormatTable.class);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ec485d2eb..cae6e6f0e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -768,7 +768,9 @@ public class FlinkCatalog extends AbstractCatalog {
             throw new TableNotExistException(getName(), tablePath);
         }
 
-        Preconditions.checkArgument(table instanceof FileStoreTable, "Can't 
alter system table.");
+        checkArgument(
+                table instanceof FileStoreTable,
+                "Only support alter data table, but is: " + table.getClass());
         validateAlterTable(toCatalogTable(table), newTable);
         Map<String, Integer> oldTableNonPhysicalColumnIndex =
                 FlinkCatalogPropertiesUtil.nonPhysicalColumns(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
index dc2a0f06b..7c3a13c6f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -89,6 +89,7 @@ public class FlinkGenericCatalogFactory implements 
CatalogFactory {
             ClassLoader cl, Map<String, String> optionMap, String name, 
Catalog flinkCatalog) {
         Options options = Options.fromMap(optionMap);
         options.set(CatalogOptions.METASTORE, "hive");
+        options.set(CatalogOptions.FORMAT_TABLE_ENABLED, false);
         FlinkCatalog paimon =
                 new FlinkCatalog(
                         org.apache.paimon.catalog.CatalogFactory.createCatalog(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 95aff5d84..2e944f930 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.table.FormatTable;
 
-import org.apache.flink.connector.file.table.FileSystemTableFactory;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,17 +29,16 @@ import org.apache.flink.table.factories.FactoryUtil;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 
 import static 
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 
 /** A {@link CatalogTable} to represent format table. */
 public class FormatCatalogTable implements CatalogTable {
@@ -83,18 +81,17 @@ public class FormatCatalogTable implements CatalogTable {
     public Map<String, String> getOptions() {
         if (cachedOptions == null) {
             cachedOptions = new HashMap<>();
-            FileSystemTableFactory fileSystemFactory = new 
FileSystemTableFactory();
-            Set<String> validOptions = new HashSet<>();
-            fileSystemFactory.requiredOptions().forEach(o -> 
validOptions.add(o.key()));
-            fileSystemFactory.optionalOptions().forEach(o -> 
validOptions.add(o.key()));
             String format = table.format().name().toLowerCase();
-            table.options()
-                    .forEach(
-                            (k, v) -> {
-                                if (validOptions.contains(k) || 
k.startsWith(format + ".")) {
-                                    cachedOptions.put(k, v);
-                                }
-                            });
+            Map<String, String> options = table.options();
+            options.forEach(
+                    (k, v) -> {
+                        if (k.startsWith(format + ".")) {
+                            cachedOptions.put(k, v);
+                        }
+                    });
+            if (options.containsKey(FIELD_DELIMITER.key())) {
+                cachedOptions.put("csv.field-delimiter", 
options.get(FIELD_DELIMITER.key()));
+            }
             cachedOptions.put(CONNECTOR.key(), "filesystem");
             cachedOptions.put(PATH.key(), table.location());
             cachedOptions.put(FORMAT.key(), format);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 9c1a2f4e3..439cdf958 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -189,7 +189,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
     public void testChangeTableInSystemDatabase() {
         sql("USE sys");
         assertThatCode(() -> sql("ALTER TABLE all_table_options SET 
('bucket-num' = '5')"))
-                .hasRootCauseMessage("Can't alter system table.");
+                .rootCause()
+                .hasMessageContaining("Only support alter data table, but is: 
");
     }
 
     @Test
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ce1607e8d..7c9bbde95 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.CatalogTableType;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -97,13 +98,13 @@ import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
-import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
 import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
 import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
 import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
 import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
 import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -111,6 +112,7 @@ import static 
org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
 
 /** A catalog implementation for Hive. */
@@ -172,8 +174,8 @@ public class HiveCatalog extends AbstractCatalog {
         this.clients = new CachedClientPool(hiveConf, options, 
clientClassName);
     }
 
-    private boolean formatTableEnabled() {
-        return options.get(FORMAT_TABLE_ENABLED);
+    private boolean formatTableDisabled() {
+        return !options.get(FORMAT_TABLE_ENABLED);
     }
 
     @Override
@@ -607,7 +609,7 @@ public class HiveCatalog extends AbstractCatalog {
         } catch (TableNotExistException ignore) {
         }
 
-        if (!formatTableEnabled()) {
+        if (formatTableDisabled()) {
             throw new TableNotExistException(identifier);
         }
 
@@ -620,7 +622,7 @@ public class HiveCatalog extends AbstractCatalog {
 
     @Override
     public void createFormatTable(Identifier identifier, Schema schema) {
-        if (!formatTableEnabled()) {
+        if (formatTableDisabled()) {
             throw new UnsupportedOperationException(
                     "Format table is not enabled for " + identifier);
         }
@@ -641,7 +643,7 @@ public class HiveCatalog extends AbstractCatalog {
                         schema.comment());
         try {
             Path location = getTableLocation(identifier, null);
-            Table hiveTable = createHiveTable(identifier, newSchema, location);
+            Table hiveTable = createHiveFormatTable(identifier, newSchema, 
location);
             clients.execute(client -> client.createTable(hiveTable));
         } catch (Exception e) {
             // we don't need to delete directories since HMS will roll back db 
and fs if failed.
@@ -727,12 +729,10 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     private Table createHiveTable(Identifier identifier, TableSchema 
tableSchema, Path location) {
+        checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) != 
FORMAT_TABLE);
+
         Map<String, String> tblProperties;
-        String provider = PAIMON_TABLE_TYPE_VALUE;
-        if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) {
-            provider = tableSchema.options().get(FILE_FORMAT.key());
-        }
-        if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) {
+        if (syncAllProperties()) {
             tblProperties = new HashMap<>(tableSchema.options());
 
             // add primary-key, partition-key to tblproperties
@@ -748,8 +748,32 @@ public class HiveCatalog extends AbstractCatalog {
             }
         }
 
+        Table table = newHmsTable(identifier, tblProperties, 
PAIMON_TABLE_TYPE_VALUE);
+        updateHmsTable(table, identifier, tableSchema, 
PAIMON_TABLE_TYPE_VALUE, location);
+        return table;
+    }
+
+    private Table createHiveFormatTable(
+            Identifier identifier, TableSchema tableSchema, Path location) {
+        Options options = Options.fromMap(tableSchema.options());
+        checkArgument(options.get(TYPE) == FORMAT_TABLE);
+
+        String provider = tableSchema.options().get(FILE_FORMAT.key());
+        checkNotNull(provider, FILE_FORMAT.key() + " should be configured.");
+        // valid supported format
+        FormatTable.Format.valueOf(provider.toUpperCase());
+
+        Map<String, String> tblProperties = new HashMap<>();
+
         Table table = newHmsTable(identifier, tblProperties, provider);
         updateHmsTable(table, identifier, tableSchema, provider, location);
+
+        if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
+            table.getSd()
+                    .getSerdeInfo()
+                    .getParameters()
+                    .put(FIELD_DELIM, options.get(FIELD_DELIMITER));
+        }
         return table;
     }
 
@@ -796,6 +820,11 @@ public class HiveCatalog extends AbstractCatalog {
     @Override
     protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
+        Table table = getHmsTable(identifier);
+        if (!isPaimonTable(identifier, table)) {
+            throw new UnsupportedOperationException("Only data table support 
alter table.");
+        }
+
         final SchemaManager schemaManager = schemaManager(identifier, 
getTableLocation(identifier));
         // first commit changes to underlying files
         TableSchema schema = schemaManager.commitChanges(changes);
@@ -805,12 +834,6 @@ public class HiveCatalog extends AbstractCatalog {
             return;
         }
         try {
-            Table table =
-                    clients.run(
-                            client ->
-                                    client.getTable(
-                                            identifier.getDatabaseName(),
-                                            identifier.getTableName()));
             alterTableToHms(table, identifier, schema);
         } catch (Exception te) {
             schemaManager.deleteSchema(schema.id());
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
index c74fa447e..38f73bc6b 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
@@ -85,14 +85,5 @@ public final class HiveCatalogOptions {
                                                             + "E.g. specifying 
\"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with 
different default catalog wouldn't share the same client pool. Multiple conf 
elements can be specified."))
                                     .build());
 
-    public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
-            ConfigOptions.key("format-table.enabled")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to support format tables, format table 
corresponds to a regular Hive table, allowing read and write operations. "
-                                    + "However, during these processes, it 
does not connect to the metastore; hence, newly added partitions will not be 
reflected in"
-                                    + " the metastore and need to be manually 
added as separate partition operations.");
-
     private HiveCatalogOptions() {}
 }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 0c88107c0..fef2d3952 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTable.Format;
 import org.apache.paimon.types.DataType;
@@ -38,9 +37,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.CoreOptions.TYPE;
-import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
 import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
 
@@ -57,32 +53,25 @@ class HiveTableUtils {
         RowType rowType = createRowType(hiveTable);
         String comment = options.remove(COMMENT_PROP);
         String location = hiveTable.getSd().getLocation();
+
         Format format;
         SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
-        if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) {
-            format = 
Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase());
-            if (format.equals(Format.CSV)) {
-                options.put(
-                        FIELD_DELIMITER.key(),
-                        serdeInfo.getParameters().getOrDefault(FIELD_DELIM, 
"\u0001"));
-            }
+        String serLib = serdeInfo.getSerializationLib().toLowerCase();
+        String inputFormat = hiveTable.getSd().getInputFormat();
+        if (serLib.contains("parquet")) {
+            format = Format.PARQUET;
+        } else if (serLib.contains("orc")) {
+            format = Format.ORC;
+        } else if (inputFormat.contains("Text")) {
+            format = Format.CSV;
+            // hive default field delimiter is '\u0001'
+            options.put(
+                    FIELD_DELIMITER.key(),
+                    serdeInfo.getParameters().getOrDefault(FIELD_DELIM, 
"\u0001"));
         } else {
-            String serLib = serdeInfo.getSerializationLib().toLowerCase();
-            String inputFormat = hiveTable.getSd().getInputFormat();
-            if (serLib.contains("parquet")) {
-                format = Format.PARQUET;
-            } else if (serLib.contains("orc")) {
-                format = Format.ORC;
-            } else if (inputFormat.contains("Text")) {
-                format = Format.CSV;
-                // hive default field delimiter is '\u0001'
-                options.put(
-                        FIELD_DELIMITER.key(),
-                        serdeInfo.getParameters().getOrDefault(FIELD_DELIM, 
"\u0001"));
-            } else {
-                throw new UnsupportedOperationException("Unsupported table: " 
+ hiveTable);
-            }
+            throw new UnsupportedOperationException("Unsupported table: " + 
hiveTable);
         }
+
         return FormatTable.builder()
                 .identifier(identifier)
                 .rowType(rowType)
diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index cb902605c..0f1218aeb 100644
--- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -273,4 +273,9 @@ public class HiveCatalogTest extends CatalogTestBase {
     protected boolean supportsView() {
         return true;
     }
+
+    @Override
+    protected boolean supportsFormatTable() {
+        return true;
+    }
 }
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
index c24e4a608..8a4745a09 100644
--- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
@@ -113,7 +113,7 @@ public class Hive23CatalogITCase extends 
HiveCatalogITCaseBase {
     }
 
     @Test
-    public void testCreateExistTableInHive() throws Exception {
+    public void testCreateExistTableInHive() {
         tEnv.executeSql(
                 String.join(
                         "\n",
@@ -133,7 +133,6 @@ public class Hive23CatalogITCase extends 
HiveCatalogITCaseBase {
                                 tEnv.executeSql(
                                                 "CREATE TABLE hive_table(a 
INT, b INT, c INT, d INT)")
                                         .await())
-                .isInstanceOf(TableException.class)
                 .hasMessage(
                         "Could not execute CreateTable in path 
`my_hive_custom_client`.`test_db`.`hive_table`");
         assertThat(
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
index 1a77723e9..48d41d27e 100644
--- a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
@@ -133,7 +133,6 @@ public class Hive31CatalogITCase extends 
HiveCatalogITCaseBase {
                                 tEnv.executeSql(
                                                 "CREATE TABLE hive_table(a 
INT, b INT, c INT, d INT)")
                                         .await())
-                .isInstanceOf(TableException.class)
                 .hasMessage(
                         "Could not execute CreateTable in path 
`my_hive_custom_client`.`test_db`.`hive_table`");
         assertThat(
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
index cd748c9ac..fc58ad595 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -44,7 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for using Paimon {@link HiveCatalog}. */
@@ -147,7 +147,7 @@ public abstract class HiveCatalogFormatTableITCaseBase {
     @Test
     public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
         tEnv.executeSql(
-                "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with 
('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')");
+                "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with 
('type'='format-table', 'file.format'='csv', 'field-delimiter'=';')");
         doTestFormatTable("flink_csv_table_delimiter");
     }
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 37601f4f8..74d2d7e1c 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -39,7 +39,6 @@ import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -275,7 +274,8 @@ public abstract class HiveCatalogITCaseBase {
                 .await();
         tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH ( 
'file.format' = 'avro' )")
                 .await();
-        assertThat(collect("SHOW 
TABLES")).isEqualTo(Arrays.asList(Row.of("s"), Row.of("t")));
+        assertThat(collect("SHOW TABLES"))
+                .containsExactlyInAnyOrder(Row.of("s"), Row.of("t"), 
Row.of("hive_table"));
 
         tEnv.executeSql(
                         "CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH 
( 'file.format' = 'avro' )")
@@ -294,17 +294,14 @@ public abstract class HiveCatalogITCaseBase {
         Path tablePath = new Path(path, "test_db.db/s");
         assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
         tEnv.executeSql("DROP TABLE s").await();
-        assertThat(collect("SHOW 
TABLES")).isEqualTo(Collections.singletonList(Row.of("t")));
+        assertThat(collect("SHOW TABLES"))
+                .containsExactlyInAnyOrder(Row.of("t"), Row.of("hive_table"));
         assertThat(tablePath.getFileSystem().exists(tablePath)).isFalse();
         tEnv.executeSql("DROP TABLE IF EXISTS s").await();
         assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE s").await())
                 .isInstanceOf(ValidationException.class)
                 .hasMessage("Table with identifier 'my_hive.test_db.s' does 
not exist.");
 
-        assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE 
hive_table").await())
-                .isInstanceOf(ValidationException.class)
-                .hasMessage("Table with identifier 
'my_hive.test_db.hive_table' does not exist.");
-
         // alter table
         tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' = 
'16MB' )").await();
         List<Row> actual = collect("SHOW CREATE TABLE t");
@@ -329,9 +326,9 @@ public abstract class HiveCatalogITCaseBase {
                                 tEnv.executeSql(
                                                 "ALTER TABLE hive_table SET ( 
'manifest.target-file-size' = '16MB' )")
                                         .await())
-                .isInstanceOf(RuntimeException.class)
+                .rootCause()
                 .hasMessage(
-                        "Table `my_hive`.`test_db`.`hive_table` doesn't exist 
or is a temporary table.");
+                        "Only support alter data table, but is: class 
org.apache.paimon.table.FormatTable$FormatTableImpl");
     }
 
     @Test
@@ -656,15 +653,6 @@ public abstract class HiveCatalogITCaseBase {
                         Arrays.asList(
                                 
"true\t1\t1\t1\t1234567890123456789\t1.23\t3.14159\t1234.56\tABC\tv1\tHello, 
World!\t01\t010203\t2023-01-01\t2023-01-01 
12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t1",
                                 
"false\t2\t2\t2\t234567890123456789\t2.34\t2.111111\t2345.67\tDEF\tv2\tApache 
Paimon\t04\t040506\t2023-02-01\t2023-02-01 
12:00:00.456\t[\"value4\",\"value5\",\"value6\"]\tvalue4\tvalue11\tvalue22\t{\"f0\":\"v2\",\"f1\":2}\tv2\t2"));
-
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                                "INSERT INTO hive_table VALUES 
(1, 'Hi'), (2, 'Hello')")
-                                        .await())
-                .isInstanceOf(TableException.class)
-                .hasMessage(
-                        "Cannot find table '`my_hive`.`test_db`.`hive_table`' 
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 2e4a2eaec..b500da8f1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.spark.catalog.SparkBaseCatalog;
 import org.apache.paimon.spark.catalog.SupportFunction;
 import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.FormatTableOptions;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -511,8 +512,11 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction {
         StructType schema = 
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
         List<String> pathList = new ArrayList<>();
         pathList.add(formatTable.location());
-        CaseInsensitiveStringMap dsOptions = new 
CaseInsensitiveStringMap(formatTable.options());
+        Options options = Options.fromMap(formatTable.options());
+        CaseInsensitiveStringMap dsOptions = new 
CaseInsensitiveStringMap(options.toMap());
         if (formatTable.format() == FormatTable.Format.CSV) {
+            options.set("sep", 
options.get(FormatTableOptions.FIELD_DELIMITER));
+            dsOptions = new CaseInsensitiveStringMap(options.toMap());
             return new CSVTable(
                     ident.name(),
                     SparkSession.active(),
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 12407f261..4741bfd00 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -317,6 +317,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog implements CatalogExte
                 options.put(METASTORE.key(), metastore);
             }
         }
+        options.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");
         String sessionCatalogDefaultDatabase = 
SQLConfUtils.defaultDatabase(sqlConf);
         if (options.containsKey(DEFAULT_DATABASE.key())) {
             String userDefineDefaultDatabase = 
options.get(DEFAULT_DATABASE.key());
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 04159b94a..d42e84b9c 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.TestHiveMetastore;
 
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -28,12 +29,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.FileNotFoundException;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
 public class SparkCatalogWithHiveTest {
+
     private static TestHiveMetastore testHiveMetastore;
 
     private static final int PORT = 9087;
@@ -51,7 +54,6 @@ public class SparkCatalogWithHiveTest {
 
     @Test
     public void testCreateFormatTable(@TempDir java.nio.file.Path tempDir) {
-        // firstly, we use hive metastore to create table, and check the 
result.
         Path warehousePath = new Path("file:" + tempDir.toString());
         SparkSession spark =
                 SparkSession.builder()
@@ -73,6 +75,9 @@ public class SparkCatalogWithHiveTest {
 
         spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
         spark.sql("USE spark_catalog.my_db1");
+
+        // test orc table
+
         spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c 
STRING) USING orc");
 
         assertThat(
@@ -80,35 +85,30 @@ public class SparkCatalogWithHiveTest {
                                 .map(s -> s.get(1))
                                 .map(Object::toString))
                 .containsExactlyInAnyOrder("table_orc");
-        spark.close();
 
-        SparkSession spark1 =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + 
PORT)
-                        .config("spark.sql.catalog.spark_catalog", 
SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", 
"hive")
-                        .config(
-                                
"spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .master("local[2]")
-                        .getOrCreate();
-        spark1.sql("USE spark_catalog.my_db1");
         assertThat(
-                        spark1.sql("EXPLAIN EXTENDED SELECT * from 
table_orc").collectAsList()
+                        spark.sql("EXPLAIN EXTENDED SELECT * from 
table_orc").collectAsList()
                                 .stream()
                                 .map(s -> s.get(0))
                                 .map(Object::toString)
                                 .filter(s -> s.contains("OrcScan"))
                                 .count())
                 .isGreaterThan(0);
-        spark1.close();
+
+        // test csv table
+
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c 
STRING) USING csv OPTIONS ('field-delimiter' ';')");
+        spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, 
'2')").collect();
+        assertThat(spark.sql("DESCRIBE FORMATTED 
table_csv").collectAsList().toString())
+                .contains("sep=;");
+        assertThat(
+                        spark.sql("SELECT * FROM 
table_csv").collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+
+        spark.close();
     }
 
     @Test


Reply via email to