This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4cdaf66df [hive] Enable Format Table by default (#4461)
4cdaf66df is described below
commit 4cdaf66dfbe492d7b6d3c18fbd251b19d39c1f7a
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 7 15:24:25 2024 +0800
[hive] Enable Format Table by default (#4461)
---
.../generated/catalog_configuration.html | 6 +++
.../generated/hive_catalog_configuration.html | 6 ---
.../org/apache/paimon/options/CatalogOptions.java | 9 ++++
.../apache/paimon/table/FormatTableOptions.java | 5 +-
.../org/apache/paimon/catalog/CatalogTestBase.java | 47 ++++++++++++++++++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 4 +-
.../paimon/flink/FlinkGenericCatalogFactory.java | 1 +
.../apache/paimon/flink/FormatCatalogTable.java | 25 +++++-----
.../apache/paimon/flink/CatalogTableITCase.java | 3 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 57 +++++++++++++++-------
.../org/apache/paimon/hive/HiveCatalogOptions.java | 9 ----
.../org/apache/paimon/hive/HiveTableUtils.java | 41 ++++++----------
.../org/apache/paimon/hive/HiveCatalogTest.java | 5 ++
.../apache/paimon/hive/Hive23CatalogITCase.java | 3 +-
.../apache/paimon/hive/Hive31CatalogITCase.java | 1 -
.../hive/HiveCatalogFormatTableITCaseBase.java | 4 +-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 24 +++------
.../java/org/apache/paimon/spark/SparkCatalog.java | 6 ++-
.../apache/paimon/spark/SparkGenericCatalog.java | 1 +
.../paimon/spark/SparkCatalogWithHiveTest.java | 44 ++++++++---------
20 files changed, 179 insertions(+), 122 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html
index 8954e898f..3686fa20c 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -74,6 +74,12 @@ under the License.
<td>Integer</td>
<td>Configure the size of the connection pool.</td>
</tr>
+ <tr>
+ <td><h5>format-table.enabled</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in the metastore and need to be manually added as separate partition operations.</td>
+ </tr>
<tr>
<td><h5>lineage-meta</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
index 076a46232..e0257d301 100644
--- a/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/hive_catalog_configuration.html
@@ -39,12 +39,6 @@ under the License.
<td>String</td>
<td>Specify client cache key, multiple elements separated by
commas.<br /><ul><li>"ugi": the Hadoop UserGroupInformation instance that
represents the current user using the cache.</li></ul><ul><li>"user_name"
similar to UGI but only includes the user's name determined by
UserGroupInformation#getUserName.</li></ul><ul><li>"conf": name of an arbitrary
configuration. The value of the configuration will be extracted from catalog
properties and added to the cache key. A conf elem [...]
</tr>
- <tr>
- <td><h5>format-table.enabled</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to support format tables, format table corresponds to
a regular Hive table, allowing read and write operations. However, during these
processes, it does not connect to the metastore; hence, newly added partitions
will not be reflected in the metastore and need to be manually added as
separate partition operations.</td>
- </tr>
<tr>
<td><h5>hadoop-conf-dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index ace4daf5e..6ad9f3350 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -156,4 +156,13 @@ public class CatalogOptions {
.booleanType()
.defaultValue(false)
.withDescription("Sync all table properties to hive
metastore");
+
+ public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
+ ConfigOptions.key("format-table.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to support format tables, format table corresponds to a regular csv, parquet or orc table, allowing read and write operations. "
+ + "However, during these processes, it does not connect to the metastore; hence, newly added partitions will not be reflected in"
+ + " the metastore and need to be manually added as separate partition operations.");
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
index 64f134d07..b4010209c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTableOptions.java
@@ -25,8 +25,9 @@ import org.apache.paimon.options.ConfigOptions;
public class FormatTableOptions {
public static final ConfigOption<String> FIELD_DELIMITER =
- ConfigOptions.key("csv.field-delimiter")
+ ConfigOptions.key("field-delimiter")
.stringType()
.defaultValue(",")
- .withDescription("Optional field delimiter character (','
by default)");
+ .withDescription(
+ "Optional field delimiter character for CSV (','
by default).");
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 27992b56f..dbeedcfe5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
@@ -885,4 +886,50 @@ public abstract class CatalogTestBase {
assertThatThrownBy(() -> catalog.dropView(newIdentifier, false))
.isInstanceOf(Catalog.ViewNotExistException.class);
}
+
+ protected boolean supportsFormatTable() {
+ return false;
+ }
+
+ @Test
+ public void testFormatTable() throws Exception {
+ if (!supportsFormatTable()) {
+ return;
+ }
+
+ Identifier identifier = new Identifier("format_db", "my_format");
+ catalog.createDatabase(identifier.getDatabaseName(), false);
+
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .column("str", DataTypes.STRING())
+ .column("int", DataTypes.INT())
+ .option("type", "format-table")
+ .option("file.format", "csv")
+ .build();
+ catalog.createTable(identifier, schema, false);
+ assertThat(catalog.listTables(identifier.getDatabaseName()))
+ .contains(identifier.getTableName());
+ assertThat(catalog.getTable(identifier)).isInstanceOf(FormatTable.class);
+
+ // alter table
+ SchemaChange schemaChange = SchemaChange.addColumn("new_col", DataTypes.STRING());
+ assertThatThrownBy(() -> catalog.alterTable(identifier, schemaChange, false))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Only data table support alter table.");
+
+ // drop table
+ catalog.dropTable(identifier, false);
+ assertThatThrownBy(() -> catalog.getTable(identifier))
+ .isInstanceOf(Catalog.TableNotExistException.class);
+
+ // rename table
+ catalog.createTable(identifier, schema, false);
+ Identifier newIdentifier = new Identifier("format_db", "new_format");
+ catalog.renameTable(identifier, newIdentifier, false);
+ assertThatThrownBy(() -> catalog.getTable(identifier))
+ .isInstanceOf(Catalog.TableNotExistException.class);
+ assertThat(catalog.getTable(newIdentifier)).isInstanceOf(FormatTable.class);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index ec485d2eb..cae6e6f0e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -768,7 +768,9 @@ public class FlinkCatalog extends AbstractCatalog {
throw new TableNotExistException(getName(), tablePath);
}
- Preconditions.checkArgument(table instanceof FileStoreTable, "Can't
alter system table.");
+ checkArgument(
+ table instanceof FileStoreTable,
+ "Only support alter data table, but is: " + table.getClass());
validateAlterTable(toCatalogTable(table), newTable);
Map<String, Integer> oldTableNonPhysicalColumnIndex =
FlinkCatalogPropertiesUtil.nonPhysicalColumns(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
index dc2a0f06b..7c3a13c6f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalogFactory.java
@@ -89,6 +89,7 @@ public class FlinkGenericCatalogFactory implements
CatalogFactory {
ClassLoader cl, Map<String, String> optionMap, String name,
Catalog flinkCatalog) {
Options options = Options.fromMap(optionMap);
options.set(CatalogOptions.METASTORE, "hive");
+ options.set(CatalogOptions.FORMAT_TABLE_ENABLED, false);
FlinkCatalog paimon =
new FlinkCatalog(
org.apache.paimon.catalog.CatalogFactory.createCatalog(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
index 95aff5d84..2e944f930 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FormatCatalogTable.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink;
import org.apache.paimon.table.FormatTable;
-import org.apache.flink.connector.file.table.FileSystemTableFactory;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -30,17 +29,16 @@ import org.apache.flink.table.factories.FactoryUtil;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import static
org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
/** A {@link CatalogTable} to represent format table. */
public class FormatCatalogTable implements CatalogTable {
@@ -83,18 +81,17 @@ public class FormatCatalogTable implements CatalogTable {
public Map<String, String> getOptions() {
if (cachedOptions == null) {
cachedOptions = new HashMap<>();
- FileSystemTableFactory fileSystemFactory = new
FileSystemTableFactory();
- Set<String> validOptions = new HashSet<>();
- fileSystemFactory.requiredOptions().forEach(o ->
validOptions.add(o.key()));
- fileSystemFactory.optionalOptions().forEach(o ->
validOptions.add(o.key()));
String format = table.format().name().toLowerCase();
- table.options()
- .forEach(
- (k, v) -> {
- if (validOptions.contains(k) ||
k.startsWith(format + ".")) {
- cachedOptions.put(k, v);
- }
- });
+ Map<String, String> options = table.options();
+ options.forEach(
+ (k, v) -> {
+ if (k.startsWith(format + ".")) {
+ cachedOptions.put(k, v);
+ }
+ });
+ if (options.containsKey(FIELD_DELIMITER.key())) {
+ cachedOptions.put("csv.field-delimiter",
options.get(FIELD_DELIMITER.key()));
+ }
cachedOptions.put(CONNECTOR.key(), "filesystem");
cachedOptions.put(PATH.key(), table.location());
cachedOptions.put(FORMAT.key(), format);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 9c1a2f4e3..439cdf958 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -189,7 +189,8 @@ public class CatalogTableITCase extends CatalogITCaseBase {
public void testChangeTableInSystemDatabase() {
sql("USE sys");
assertThatCode(() -> sql("ALTER TABLE all_table_options SET
('bucket-num' = '5')"))
- .hasRootCauseMessage("Can't alter system table.");
+ .rootCause()
+ .hasMessageContaining("Only support alter data table, but is:
");
}
@Test
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index ce1607e8d..7c9bbde95 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.CatalogTableType;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -97,13 +98,13 @@ import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
-import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
import static org.apache.paimon.hive.HiveTableUtils.convertToFormatTable;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
+import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.apache.paimon.options.CatalogOptions.SYNC_ALL_PROPERTIES;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
@@ -111,6 +112,7 @@ import static
org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.HadoopUtils.addHadoopConfIfFound;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly;
/** A catalog implementation for Hive. */
@@ -172,8 +174,8 @@ public class HiveCatalog extends AbstractCatalog {
this.clients = new CachedClientPool(hiveConf, options,
clientClassName);
}
- private boolean formatTableEnabled() {
- return options.get(FORMAT_TABLE_ENABLED);
+ private boolean formatTableDisabled() {
+ return !options.get(FORMAT_TABLE_ENABLED);
}
@Override
@@ -607,7 +609,7 @@ public class HiveCatalog extends AbstractCatalog {
} catch (TableNotExistException ignore) {
}
- if (!formatTableEnabled()) {
+ if (formatTableDisabled()) {
throw new TableNotExistException(identifier);
}
@@ -620,7 +622,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public void createFormatTable(Identifier identifier, Schema schema) {
- if (!formatTableEnabled()) {
+ if (formatTableDisabled()) {
throw new UnsupportedOperationException(
"Format table is not enabled for " + identifier);
}
@@ -641,7 +643,7 @@ public class HiveCatalog extends AbstractCatalog {
schema.comment());
try {
Path location = getTableLocation(identifier, null);
- Table hiveTable = createHiveTable(identifier, newSchema, location);
+ Table hiveTable = createHiveFormatTable(identifier, newSchema,
location);
clients.execute(client -> client.createTable(hiveTable));
} catch (Exception e) {
// we don't need to delete directories since HMS will roll back db
and fs if failed.
@@ -727,12 +729,10 @@ public class HiveCatalog extends AbstractCatalog {
}
private Table createHiveTable(Identifier identifier, TableSchema
tableSchema, Path location) {
+ checkArgument(Options.fromMap(tableSchema.options()).get(TYPE) !=
FORMAT_TABLE);
+
Map<String, String> tblProperties;
- String provider = PAIMON_TABLE_TYPE_VALUE;
- if (Options.fromMap(tableSchema.options()).get(TYPE) == FORMAT_TABLE) {
- provider = tableSchema.options().get(FILE_FORMAT.key());
- }
- if (syncAllProperties() || !provider.equals(PAIMON_TABLE_TYPE_VALUE)) {
+ if (syncAllProperties()) {
tblProperties = new HashMap<>(tableSchema.options());
// add primary-key, partition-key to tblproperties
@@ -748,8 +748,32 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ Table table = newHmsTable(identifier, tblProperties,
PAIMON_TABLE_TYPE_VALUE);
+ updateHmsTable(table, identifier, tableSchema,
PAIMON_TABLE_TYPE_VALUE, location);
+ return table;
+ }
+
+ private Table createHiveFormatTable(
+ Identifier identifier, TableSchema tableSchema, Path location) {
+ Options options = Options.fromMap(tableSchema.options());
+ checkArgument(options.get(TYPE) == FORMAT_TABLE);
+
+ String provider = tableSchema.options().get(FILE_FORMAT.key());
+ checkNotNull(provider, FILE_FORMAT.key() + " should be configured.");
+ // valid supported format
+ FormatTable.Format.valueOf(provider.toUpperCase());
+
+ Map<String, String> tblProperties = new HashMap<>();
+
Table table = newHmsTable(identifier, tblProperties, provider);
updateHmsTable(table, identifier, tableSchema, provider, location);
+
+ if (FormatTable.Format.CSV.toString().equalsIgnoreCase(provider)) {
+ table.getSd()
+ .getSerdeInfo()
+ .getParameters()
+ .put(FIELD_DELIM, options.get(FIELD_DELIMITER));
+ }
return table;
}
@@ -796,6 +820,11 @@ public class HiveCatalog extends AbstractCatalog {
@Override
protected void alterTableImpl(Identifier identifier, List<SchemaChange>
changes)
throws TableNotExistException, ColumnAlreadyExistException,
ColumnNotExistException {
+ Table table = getHmsTable(identifier);
+ if (!isPaimonTable(identifier, table)) {
+ throw new UnsupportedOperationException("Only data table support
alter table.");
+ }
+
final SchemaManager schemaManager = schemaManager(identifier,
getTableLocation(identifier));
// first commit changes to underlying files
TableSchema schema = schemaManager.commitChanges(changes);
@@ -805,12 +834,6 @@ public class HiveCatalog extends AbstractCatalog {
return;
}
try {
- Table table =
- clients.run(
- client ->
- client.getTable(
- identifier.getDatabaseName(),
- identifier.getTableName()));
alterTableToHms(table, identifier, schema);
} catch (Exception te) {
schemaManager.deleteSchema(schema.id());
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
index c74fa447e..38f73bc6b 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalogOptions.java
@@ -85,14 +85,5 @@ public final class HiveCatalogOptions {
+ "E.g. specifying
\"conf:a.b.c\" will add \"a.b.c\" to the key, and so that configurations with
different default catalog wouldn't share the same client pool. Multiple conf
elements can be specified."))
.build());
- public static final ConfigOption<Boolean> FORMAT_TABLE_ENABLED =
- ConfigOptions.key("format-table.enabled")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to support format tables, format table
corresponds to a regular Hive table, allowing read and write operations. "
- + "However, during these processes, it
does not connect to the metastore; hence, newly added partitions will not be
reflected in"
- + " the metastore and need to be manually
added as separate partition operations.");
-
private HiveCatalogOptions() {}
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
index 0c88107c0..fef2d3952 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java
@@ -19,7 +19,6 @@
package org.apache.paimon.hive;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTable.Format;
import org.apache.paimon.types.DataType;
@@ -38,9 +37,6 @@ import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hive.serde.serdeConstants.FIELD_DELIM;
-import static org.apache.paimon.CoreOptions.FILE_FORMAT;
-import static org.apache.paimon.CoreOptions.TYPE;
-import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.catalog.Catalog.COMMENT_PROP;
import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER;
@@ -57,32 +53,25 @@ class HiveTableUtils {
RowType rowType = createRowType(hiveTable);
String comment = options.remove(COMMENT_PROP);
String location = hiveTable.getSd().getLocation();
+
Format format;
SerDeInfo serdeInfo = hiveTable.getSd().getSerdeInfo();
- if (Options.fromMap(options).get(TYPE) == FORMAT_TABLE) {
- format =
Format.valueOf(options.get(FILE_FORMAT.key()).toUpperCase());
- if (format.equals(Format.CSV)) {
- options.put(
- FIELD_DELIMITER.key(),
- serdeInfo.getParameters().getOrDefault(FIELD_DELIM,
"\u0001"));
- }
+ String serLib = serdeInfo.getSerializationLib().toLowerCase();
+ String inputFormat = hiveTable.getSd().getInputFormat();
+ if (serLib.contains("parquet")) {
+ format = Format.PARQUET;
+ } else if (serLib.contains("orc")) {
+ format = Format.ORC;
+ } else if (inputFormat.contains("Text")) {
+ format = Format.CSV;
+ // hive default field delimiter is '\u0001'
+ options.put(
+ FIELD_DELIMITER.key(),
+ serdeInfo.getParameters().getOrDefault(FIELD_DELIM,
"\u0001"));
} else {
- String serLib = serdeInfo.getSerializationLib().toLowerCase();
- String inputFormat = hiveTable.getSd().getInputFormat();
- if (serLib.contains("parquet")) {
- format = Format.PARQUET;
- } else if (serLib.contains("orc")) {
- format = Format.ORC;
- } else if (inputFormat.contains("Text")) {
- format = Format.CSV;
- // hive default field delimiter is '\u0001'
- options.put(
- FIELD_DELIMITER.key(),
- serdeInfo.getParameters().getOrDefault(FIELD_DELIM,
"\u0001"));
- } else {
- throw new UnsupportedOperationException("Unsupported table: "
+ hiveTable);
- }
+ throw new UnsupportedOperationException("Unsupported table: " +
hiveTable);
}
+
return FormatTable.builder()
.identifier(identifier)
.rowType(rowType)
diff --git
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index cb902605c..0f1218aeb 100644
---
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -273,4 +273,9 @@ public class HiveCatalogTest extends CatalogTestBase {
protected boolean supportsView() {
return true;
}
+
+ @Override
+ protected boolean supportsFormatTable() {
+ return true;
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
index c24e4a608..8a4745a09 100644
---
a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
+++
b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogITCase.java
@@ -113,7 +113,7 @@ public class Hive23CatalogITCase extends
HiveCatalogITCaseBase {
}
@Test
- public void testCreateExistTableInHive() throws Exception {
+ public void testCreateExistTableInHive() {
tEnv.executeSql(
String.join(
"\n",
@@ -133,7 +133,6 @@ public class Hive23CatalogITCase extends
HiveCatalogITCaseBase {
tEnv.executeSql(
"CREATE TABLE hive_table(a
INT, b INT, c INT, d INT)")
.await())
- .isInstanceOf(TableException.class)
.hasMessage(
"Could not execute CreateTable in path
`my_hive_custom_client`.`test_db`.`hive_table`");
assertThat(
diff --git
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
index 1a77723e9..48d41d27e 100644
---
a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
+++
b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/hive/Hive31CatalogITCase.java
@@ -133,7 +133,6 @@ public class Hive31CatalogITCase extends
HiveCatalogITCaseBase {
tEnv.executeSql(
"CREATE TABLE hive_table(a
INT, b INT, c INT, d INT)")
.await())
- .isInstanceOf(TableException.class)
.hasMessage(
"Could not execute CreateTable in path
`my_hive_custom_client`.`test_db`.`hive_table`");
assertThat(
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
index cd748c9ac..fc58ad595 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -44,7 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.paimon.hive.HiveCatalogOptions.FORMAT_TABLE_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.FORMAT_TABLE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for using Paimon {@link HiveCatalog}. */
@@ -147,7 +147,7 @@ public abstract class HiveCatalogFormatTableITCaseBase {
@Test
public void testFlinkCreateFormatTableWithDelimiter() throws Exception {
tEnv.executeSql(
- "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with
('type'='format-table', 'file.format'='csv', 'csv.field-delimiter'=';')");
+ "CREATE TABLE flink_csv_table_delimiter (a INT, b STRING) with
('type'='format-table', 'file.format'='csv', 'field-delimiter'=';')");
doTestFormatTable("flink_csv_table_delimiter");
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 37601f4f8..74d2d7e1c 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -39,7 +39,6 @@ import org.apache.flink.core.fs.Path;
import
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -275,7 +274,8 @@ public abstract class HiveCatalogITCaseBase {
.await();
tEnv.executeSql("CREATE TABLE s ( a INT, b STRING ) WITH (
'file.format' = 'avro' )")
.await();
- assertThat(collect("SHOW
TABLES")).isEqualTo(Arrays.asList(Row.of("s"), Row.of("t")));
+ assertThat(collect("SHOW TABLES"))
+ .containsExactlyInAnyOrder(Row.of("s"), Row.of("t"),
Row.of("hive_table"));
tEnv.executeSql(
"CREATE TABLE IF NOT EXISTS s ( a INT, b STRING ) WITH
( 'file.format' = 'avro' )")
@@ -294,17 +294,14 @@ public abstract class HiveCatalogITCaseBase {
Path tablePath = new Path(path, "test_db.db/s");
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
tEnv.executeSql("DROP TABLE s").await();
- assertThat(collect("SHOW
TABLES")).isEqualTo(Collections.singletonList(Row.of("t")));
+ assertThat(collect("SHOW TABLES"))
+ .containsExactlyInAnyOrder(Row.of("t"), Row.of("hive_table"));
assertThat(tablePath.getFileSystem().exists(tablePath)).isFalse();
tEnv.executeSql("DROP TABLE IF EXISTS s").await();
assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE s").await())
.isInstanceOf(ValidationException.class)
.hasMessage("Table with identifier 'my_hive.test_db.s' does
not exist.");
- assertThatThrownBy(() -> tEnv.executeSql("DROP TABLE
hive_table").await())
- .isInstanceOf(ValidationException.class)
- .hasMessage("Table with identifier
'my_hive.test_db.hive_table' does not exist.");
-
// alter table
tEnv.executeSql("ALTER TABLE t SET ( 'manifest.target-file-size' =
'16MB' )").await();
List<Row> actual = collect("SHOW CREATE TABLE t");
@@ -329,9 +326,9 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql(
"ALTER TABLE hive_table SET (
'manifest.target-file-size' = '16MB' )")
.await())
- .isInstanceOf(RuntimeException.class)
+ .rootCause()
.hasMessage(
- "Table `my_hive`.`test_db`.`hive_table` doesn't exist
or is a temporary table.");
+ "Only support alter data table, but is: class
org.apache.paimon.table.FormatTable$FormatTableImpl");
}
@Test
@@ -656,15 +653,6 @@ public abstract class HiveCatalogITCaseBase {
Arrays.asList(
"true\t1\t1\t1\t1234567890123456789\t1.23\t3.14159\t1234.56\tABC\tv1\tHello, World!\t01\t010203\t2023-01-01\t2023-01-01 12:00:00.123\t[\"value1\",\"value2\",\"value3\"]\tvalue1\tvalue1\tvalue2\t{\"f0\":\"v1\",\"f1\":1}\tv1\t1",
"false\t2\t2\t2\t234567890123456789\t2.34\t2.111111\t2345.67\tDEF\tv2\tApache Paimon\t04\t040506\t2023-02-01\t2023-02-01 12:00:00.456\t[\"value4\",\"value5\",\"value6\"]\tvalue4\tvalue11\tvalue22\t{\"f0\":\"v2\",\"f1\":2}\tv2\t2"));
-
- assertThatThrownBy(
- () ->
- tEnv.executeSql(
- "INSERT INTO hive_table VALUES
(1, 'Hi'), (2, 'Hello')")
- .await())
- .isInstanceOf(TableException.class)
- .hasMessage(
- "Cannot find table '`my_hive`.`test_db`.`hive_table`'
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
}
@Test
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 2e4a2eaec..b500da8f1 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -28,6 +28,7 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.spark.catalog.SparkBaseCatalog;
import org.apache.paimon.spark.catalog.SupportFunction;
import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.FormatTableOptions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -511,8 +512,11 @@ public class SparkCatalog extends SparkBaseCatalog
implements SupportFunction {
StructType schema =
SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
List<String> pathList = new ArrayList<>();
pathList.add(formatTable.location());
- CaseInsensitiveStringMap dsOptions = new
CaseInsensitiveStringMap(formatTable.options());
+ Options options = Options.fromMap(formatTable.options());
+ CaseInsensitiveStringMap dsOptions = new
CaseInsensitiveStringMap(options.toMap());
if (formatTable.format() == FormatTable.Format.CSV) {
+ options.set("sep",
options.get(FormatTableOptions.FIELD_DELIMITER));
+ dsOptions = new CaseInsensitiveStringMap(options.toMap());
return new CSVTable(
ident.name(),
SparkSession.active(),
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 12407f261..4741bfd00 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -317,6 +317,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
options.put(METASTORE.key(), metastore);
}
}
+ options.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");
String sessionCatalogDefaultDatabase =
SQLConfUtils.defaultDatabase(sqlConf);
if (options.containsKey(DEFAULT_DATABASE.key())) {
String userDefineDefaultDatabase =
options.get(DEFAULT_DATABASE.key());
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 04159b94a..d42e84b9c 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -28,12 +29,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.FileNotFoundException;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base tests for spark read. */
public class SparkCatalogWithHiveTest {
+
private static TestHiveMetastore testHiveMetastore;
private static final int PORT = 9087;
@@ -51,7 +54,6 @@ public class SparkCatalogWithHiveTest {
@Test
public void testCreateFormatTable(@TempDir java.nio.file.Path tempDir) {
- // firstly, we use hive metastore to create table, and check the
result.
Path warehousePath = new Path("file:" + tempDir.toString());
SparkSession spark =
SparkSession.builder()
@@ -73,6 +75,9 @@ public class SparkCatalogWithHiveTest {
spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
spark.sql("USE spark_catalog.my_db1");
+
+ // test orc table
+
spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c
STRING) USING orc");
assertThat(
@@ -80,35 +85,30 @@ public class SparkCatalogWithHiveTest {
.map(s -> s.get(1))
.map(Object::toString))
.containsExactlyInAnyOrder("table_orc");
- spark.close();
- SparkSession spark1 =
- SparkSession.builder()
- .config("spark.sql.warehouse.dir",
warehousePath.toString())
- // with hive metastore
- .config("spark.sql.catalogImplementation", "hive")
- .config("hive.metastore.uris", "thrift://localhost:" +
PORT)
- .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
- .config("spark.sql.catalog.spark_catalog.metastore",
"hive")
- .config(
-
"spark.sql.catalog.spark_catalog.hive.metastore.uris",
- "thrift://localhost:" + PORT)
-
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
- .config(
- "spark.sql.catalog.spark_catalog.warehouse",
- warehousePath.toString())
- .master("local[2]")
- .getOrCreate();
- spark1.sql("USE spark_catalog.my_db1");
assertThat(
- spark1.sql("EXPLAIN EXTENDED SELECT * from
table_orc").collectAsList()
+ spark.sql("EXPLAIN EXTENDED SELECT * from
table_orc").collectAsList()
.stream()
.map(s -> s.get(0))
.map(Object::toString)
.filter(s -> s.contains("OrcScan"))
.count())
.isGreaterThan(0);
- spark1.close();
+
+ // test csv table
+
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c
STRING) USING csv OPTIONS ('field-delimiter' ';')");
+ spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2,
'2')").collect();
+ assertThat(spark.sql("DESCRIBE FORMATTED
table_csv").collectAsList().toString())
+ .contains("sep=;");
+ assertThat(
+ spark.sql("SELECT * FROM
table_csv").collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
+
+ spark.close();
}
@Test