This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e9f15b2e2 [hive] Support Hive CREATE TABLE (#970)
e9f15b2e2 is described below
commit e9f15b2e29800998d66c79ecc75aaf2125c2067d
Author: Kerwin <[email protected]>
AuthorDate: Fri May 5 19:23:12 2023 +0800
[hive] Support Hive CREATE TABLE (#970)
---
docs/content/engines/hive.md | 84 +++++--
docs/content/engines/overview.md | 2 +-
docs/content/how-to/creating-tables.md | 65 +++++-
.../java/org/apache/paimon/hive/HiveTypeUtils.java | 94 ++++++++
.../java/org/apache/paimon/hive/HiveSchema.java | 101 +++++---
.../org/apache/paimon/hive/PaimonMetaHook.java | 100 +++++++-
.../apache/paimon/hive/PaimonStorageHandler.java | 2 +-
.../org/apache/paimon/hive/CreateTableITCase.java | 254 +++++++++++++++++++++
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 15 ++
.../apache/paimon/hive/HiveTableSchemaTest.java | 104 ++++++---
.../java/org/apache/paimon/hive/HiveTestBase.java | 105 +++++++++
11 files changed, 835 insertions(+), 91 deletions(-)
diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index 5016880a2..1234f0460 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -30,7 +30,7 @@ This documentation is a guide for using Paimon in Hive.
## Version
-Paimon currently supports Hive 2.1, 2.1-cdh-6.3, 2.2, 2.3 and 3.1.
+Paimon currently supports Hive 3.1, 2.3, 2.2, 2.1 and 2.1-cdh-6.3.
## Execution Engine
@@ -82,7 +82,7 @@ NOTE: If you are using HDFS, make sure that the environment
variable `HADOOP_HOM
## Quick Start with Paimon Hive Catalog
-By using paimon Hive catalog, you can create, drop and insert into paimon
tables from Flink. These operations directly affect the corresponding Hive
metastore. Tables created in this way can also be accessed directly from Hive.
+By using paimon Hive catalog, you can create, drop, select and insert into
paimon tables from Flink. These operations directly affect the corresponding
Hive metastore. Tables created in this way can also be accessed directly from
Hive.
**Step 1: Prepare Flink Hive Connector Bundled Jar**
@@ -132,36 +132,41 @@ SELECT * FROM test_table;
*/
```
-**Step 3: Query the Table in Hive**
-
-Run the following Hive SQL in Hive CLI to access the created table.
+You can also query a paimon table created in Hive from Flink; see [creating a table with
Hive SQL]({{< ref "engines/hive#quick-start-with-hive-table" >}}).
```sql
--- Assume that paimon-hive-connector-<hive-version>-{{< version >}}.jar is
already in auxlib directory.
--- List tables in Hive
--- (you might need to switch to "default" database if you're not there by
default)
+-- Flink SQL CLI
+-- Define paimon Hive catalog
-SHOW TABLES;
+CREATE CATALOG my_hive WITH (
+ 'type' = 'paimon',
+ 'metastore' = 'hive',
+ 'uri' = 'thrift://<hive-metastore-host-name>:<port>',
+ 'warehouse' = '/path/to/table/store/warehouse'
+);
-/*
-OK
-test_table
-*/
+-- Use paimon Hive catalog
--- Read records from test_table
+USE CATALOG my_hive;
-SELECT a, b FROM test_table ORDER BY a;
+-- Read a table in paimon Hive catalog (use "default" database by default)
+-- Read records from hive_test_table
+
+SELECT * FROM hive_test_table;
/*
-OK
-1 Table
-2 Store
++---+--------+
+| a | b |
++---+--------+
+| 1 | Paimon |
++---+--------+
*/
+
```
-## Quick Start with External Table
+## Quick Start with Hive Table
-To access existing paimon table, you can also register them as external tables
in Hive. Run the following Hive SQL in Hive CLI.
+* To access existing paimon tables, you can also register them as external or
internal tables in Hive. Run the following Hive SQL in Hive CLI.
```sql
-- Assume that paimon-hive-connector-{{< version >}}.jar is already in auxlib
directory.
@@ -213,6 +218,45 @@ OK
```
+* To create paimon tables that do not exist yet, you can create them as external
or internal tables in Hive. Run the following Hive SQL in Hive CLI.
+
+```sql
+-- Assume that paimon-hive-connector-{{< version >}}.jar is already in auxlib
directory.
+-- Create a new external table that does not exist in paimon yet.
+-- The LOCATION must be set to the path of the table.
+
+CREATE EXTERNAL TABLE hive_test_table(
+ a INT COMMENT 'The a field',
+ b STRING COMMENT 'The b field'
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+LOCATION '/path/to/table/store/warehouse/default.db/hive_test_table';
+
+-- Insert records into hive_test_table
+
+INSERT INTO hive_test_table VALUES (1, 'Paimon');
+
+-- Read records from hive_test_table
+
+SELECT a, b FROM hive_test_table;
+
+/*
+OK
+1 Paimon
+*/
+
+-- Create a new internal table that does not exist in paimon yet.
+-- The LOCATION must be set to the path of the table.
+
+CREATE TABLE hive_internal_test_table(
+ a INT COMMENT 'The a field',
+ b STRING COMMENT 'The b field'
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+LOCATION '/path/to/table/store/warehouse/default.db/hive_internal_test_table';
+
+```
+
## Hive Type Conversion
This section lists all supported type conversion between Hive and Flink.
diff --git a/docs/content/engines/overview.md b/docs/content/engines/overview.md
index 16200161a..883bd4a80 100644
--- a/docs/content/engines/overview.md
+++ b/docs/content/engines/overview.md
@@ -35,7 +35,7 @@ Apache Spark and Apache Hive.
| Engine | Version | Feature
| Read Pushdown |
|--------|-----------------------------|--------------------------------------------------------------------------------------|--------------------|
| Flink | 1.17/1.16/1.15/1.14 | batch/streaming read, batch/streaming
write, create/drop table, create/drop database | Projection, Filter |
-| Hive | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read, batch write
| Projection, Filter |
+| Hive | 3.1/2.3/2.2/2.1/2.1 CDH 6.3 | batch read, batch write, create table
| Projection, Filter |
| Spark | 3.4/3.3/3.2/3.1 | batch read, batch write, create/drop
table, create/drop database | Projection, Filter |
| Spark | 2.4 | batch read
| Projection, Filter |
| Trino | 388/358 | batch read, create/drop table,
create/drop database | Projection, Filter |
diff --git a/docs/content/how-to/creating-tables.md
b/docs/content/how-to/creating-tables.md
index cf72ff320..a3391537b 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -65,6 +65,22 @@ CREATE TABLE MyTable (
{{< /tab >}}
+{{< tab "Hive" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id'
+);
+```
+
+{{< /tab >}}
+
{{< /tabs >}}
{{< hint info >}}
@@ -108,6 +124,23 @@ CREATE TABLE MyTable (
{{< /tab >}}
+{{< tab "Hive" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+ 'partition'='dt,hh'
+);
+```
+
+{{< /tab >}}
+
{{< /tabs >}}
{{< hint info >}}
@@ -309,6 +342,25 @@ CREATE TABLE MyTable (
{{< /tab >}}
+{{< tab "Hive" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+ 'partition'='dt,hh',
+ 'bucket' = '2',
+ 'bucket-key' = 'user_id'
+);
+```
+
+{{< /tab >}}
+
{{< /tabs >}}
## Creating External Tables
@@ -363,7 +415,7 @@ val dataset =
spark.read.format("paimon").load("hdfs://path/to/table")
{{< tab "Hive" >}}
-Hive SQL only supports reading from an external table. The following SQL
creates an external table named `my_table`, where the base path of table files
is `hdfs://path/to/table`. As schemas are stored in table files, users do not
need to write column definitions.
+* To access existing paimon tables, you can also register them as external or
internal tables in Hive. The following SQL creates an external table named
`my_table`, where the base path of table files is `hdfs://path/to/table`. As
schemas are stored in table files, users do not need to write column
definitions.
```sql
CREATE EXTERNAL TABLE my_table
@@ -371,6 +423,17 @@ STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
LOCATION 'hdfs://path/to/table';
```
+* To create paimon tables that do not exist yet, you can create them as external
or internal tables in Hive. The following SQL creates an external table named
`my_table`, where the base path of table files is `hdfs://path/to/table`.
+
+```sql
+CREATE EXTERNAL TABLE my_table(
+ a INT COMMENT 'The a field',
+ b STRING COMMENT 'The b field'
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+LOCATION 'hdfs://path/to/table';
+```
+
{{< /tab >}}
{{< /tabs >}}
diff --git
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
index e70b645f3..b2cdf6e75 100644
---
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
+++
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java
@@ -22,20 +22,37 @@ import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.paimon.types.VarBinaryType.MAX_LENGTH;
+
/** Utils for converting types related classes between Paimon and Hive. */
public class HiveTypeUtils {
+ /**
+ * Convert paimon data type {@link DataType} to hive data type {@link
TypeInfo}.
+ *
+ * @param logicalType paimon data type.
+ * @return hive type info.
+ */
public static TypeInfo logicalTypeToTypeInfo(DataType logicalType) {
switch (logicalType.getTypeRoot()) {
case BOOLEAN:
@@ -101,4 +118,81 @@ public class HiveTypeUtils {
"Unsupported logical type " +
logicalType.asSQLString());
}
}
+
+ /**
+ * Convert hive data type {@link TypeInfo} to paimon data type {@link
DataType}.
+ *
+ * @param type hive type string
+ * @return paimon data type
+ */
+ public static DataType typeInfoToLogicalType(String type) {
+ TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(type);
+ return typeInfoToLogicalType(typeInfo);
+ }
+
+ /**
+ * Convert hive data type {@link TypeInfo} to paimon data type {@link
DataType}.
+ *
+ * @param typeInfo hive type info
+ * @return paimon data type
+ */
+ public static DataType typeInfoToLogicalType(TypeInfo typeInfo) {
+ if (TypeInfoFactory.booleanTypeInfo.equals(typeInfo)) {
+ return DataTypes.BOOLEAN();
+ } else if (TypeInfoFactory.byteTypeInfo.equals(typeInfo)) {
+ return DataTypes.TINYINT();
+ } else if (TypeInfoFactory.shortTypeInfo.equals(typeInfo)) {
+ return DataTypes.SMALLINT();
+ } else if (TypeInfoFactory.intTypeInfo.equals(typeInfo)) {
+ return DataTypes.INT();
+ } else if (TypeInfoFactory.longTypeInfo.equals(typeInfo)) {
+ return DataTypes.BIGINT();
+ } else if (TypeInfoFactory.floatTypeInfo.equals(typeInfo)) {
+ return DataTypes.FLOAT();
+ } else if (TypeInfoFactory.doubleTypeInfo.equals(typeInfo)) {
+ return DataTypes.DOUBLE();
+ } else if (typeInfo instanceof DecimalTypeInfo) {
+ DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
+ return DataTypes.DECIMAL(decimalTypeInfo.getPrecision(),
decimalTypeInfo.getScale());
+ } else if (typeInfo instanceof CharTypeInfo) {
+ return DataTypes.CHAR(((CharTypeInfo) typeInfo).getLength());
+ } else if (typeInfo instanceof VarcharTypeInfo) {
+ return DataTypes.VARCHAR(((VarcharTypeInfo) typeInfo).getLength());
+ } else if (TypeInfoFactory.stringTypeInfo.equals(typeInfo)) {
+ return DataTypes.VARCHAR(VarCharType.MAX_LENGTH);
+ } else if (TypeInfoFactory.binaryTypeInfo.equals(typeInfo)) {
+ return DataTypes.VARBINARY(MAX_LENGTH);
+ } else if (TypeInfoFactory.dateTypeInfo.equals(typeInfo)) {
+ return DataTypes.DATE();
+ } else if (TypeInfoFactory.timestampTypeInfo.equals(typeInfo)) {
+ return DataTypes.TIMESTAMP_MILLIS();
+ } else if (typeInfo instanceof ListTypeInfo) {
+ ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
+ return
DataTypes.ARRAY(typeInfoToLogicalType(listTypeInfo.getListElementTypeInfo()));
+ } else if (typeInfo instanceof MapTypeInfo) {
+ MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
+ return DataTypes.MAP(
+ typeInfoToLogicalType(mapTypeInfo.getMapKeyTypeInfo()),
+ typeInfoToLogicalType(mapTypeInfo.getMapValueTypeInfo()));
+ } else if (typeInfo instanceof StructTypeInfo) {
+ StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+ ArrayList<String> fieldNames =
structTypeInfo.getAllStructFieldNames();
+ ArrayList<TypeInfo> typeInfos =
structTypeInfo.getAllStructFieldTypeInfos();
+
+ int highestFieldId = -1;
+ DataField[] dataFields = new DataField[fieldNames.size()];
+ for (int i = 0; i < fieldNames.size(); i++) {
+ dataFields[i] =
+ new DataField(
+ ++highestFieldId,
+ fieldNames.get(i),
+ typeInfoToLogicalType(typeInfos.get(i)),
+ "");
+ }
+
+ return DataTypes.ROW(dataFields);
+ }
+
+ throw new UnsupportedOperationException("Unsupported hive type " +
typeInfo.getTypeName());
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index 5be40daf6..dba45389a 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -20,13 +20,18 @@ package org.apache.paimon.hive;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.base.Splitter;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -34,26 +39,30 @@ import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
+import static org.apache.paimon.hive.HiveTypeUtils.typeInfoToLogicalType;
+
/** Column names, types and comments of a Hive table. */
public class HiveSchema {
- private final TableSchema tableSchema;
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HiveSchema.class);
private final RowType rowType;
- private HiveSchema(TableSchema tableSchema) {
- this.tableSchema = tableSchema;
- this.rowType = new RowType(tableSchema.fields());
+ private HiveSchema(RowType rowType) {
+ this.rowType = rowType;
}
public RowType rowType() {
@@ -61,19 +70,19 @@ public class HiveSchema {
}
public List<String> fieldNames() {
- return tableSchema.fieldNames();
+ return rowType.getFieldNames();
}
public List<DataType> fieldTypes() {
- return tableSchema.logicalRowType().getFieldTypes();
+ return rowType.getFieldTypes();
}
public List<DataField> fields() {
- return tableSchema.fields();
+ return rowType.getFields();
}
public List<String> fieldComments() {
- return tableSchema.fields().stream()
+ return rowType.getFields().stream()
.map(DataField::description)
.collect(Collectors.toList());
}
@@ -86,34 +95,62 @@ public class HiveSchema {
throw new UnsupportedOperationException(
"Location property is missing for table "
+ tableName
- + ". Currently Paimon only supports external table
for Hive "
- + "so location property must be set.");
+ + ". Currently Paimon only supports hive table
location property must be set.");
}
Path path = new Path(location);
Options options = PaimonJobConf.extractCatalogConfig(configuration);
- options.set(CoreOptions.PATH, path.toUri().toString());
- CatalogContext catalogContext = CatalogContext.create(options,
configuration);
- TableSchema tableSchema =
FileStoreTableFactory.create(catalogContext).schema();
-
- if (properties.containsKey(serdeConstants.LIST_COLUMNS)
- && properties.containsKey(serdeConstants.LIST_COLUMN_TYPES)) {
- String columnNames =
properties.getProperty(serdeConstants.LIST_COLUMNS);
- String columnNameDelimiter =
- properties.getProperty(
- // serdeConstants.COLUMN_NAME_DELIMITER is not
defined in earlier Hive
- // versions, so we use a constant string instead
- "column.name.delimite",
String.valueOf(SerDeUtils.COMMA));
- List<String> names =
Arrays.asList(columnNames.split(columnNameDelimiter));
-
- String columnTypes =
properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
- List<TypeInfo> typeInfos =
TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
-
- if (names.size() > 0 && typeInfos.size() > 0) {
- checkSchemaMatched(names, typeInfos, tableSchema);
+ options.set(CoreOptions.PATH, location);
+ CatalogContext context = CatalogContext.create(options, configuration);
+ Optional<TableSchema> tableSchema;
+ try {
+ tableSchema = new SchemaManager(FileIO.get(path, context),
path).latest();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ String columnProperty =
properties.getProperty(serdeConstants.LIST_COLUMNS);
+ // Create hive external table with empty ddl
+ if (StringUtils.isEmpty(columnProperty)) {
+ if (!tableSchema.isPresent()) {
+ throw new IllegalArgumentException(
+ "Schema file not found in location "
+ + location
+ + ". Please create table first.");
}
+ // Paimon external table can read schema from the specified
location
+ return new HiveSchema(new RowType(tableSchema.get().fields()));
}
- return new HiveSchema(tableSchema);
+ // Create hive external table with ddl
+ String columnNameDelimiter =
+ properties.getProperty(
+ // serdeConstants.COLUMN_NAME_DELIMITER is not defined
in earlier Hive
+ // versions, so we use a constant string instead
+ "column.name.delimite",
String.valueOf(SerDeUtils.COMMA));
+ List<String> columnNames =
Arrays.asList(columnProperty.split(columnNameDelimiter));
+ String columnTypes =
properties.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+ List<TypeInfo> typeInfos =
TypeInfoUtils.getTypeInfosFromTypeString(columnTypes);
+ List<String> comments =
+ Lists.newArrayList(
+
Splitter.on('\0').split(properties.getProperty("columns.comments")));
+ // Both Paimon table schema and Hive table schema exist
+ if (tableSchema.isPresent() && columnNames.size() > 0 &&
typeInfos.size() > 0) {
+ LOG.debug(
+ "Extract schema with exists DDL and exists paimon table,
table location:[{}].",
+ location);
+ checkSchemaMatched(columnNames, typeInfos, tableSchema.get());
+ // Use paimon table column comment when the paimon table exists.
+ comments =
+ tableSchema.get().fields().stream()
+ .map(DataField::description)
+ .collect(Collectors.toList());
+ }
+ RowType.Builder builder = RowType.builder();
+ for (int i = 0; i < columnNames.size(); i++) {
+ builder.field(
+ columnNames.get(i),
typeInfoToLogicalType(typeInfos.get(i)), comments.get(i));
+ }
+ return new HiveSchema(builder.build());
}
private static void checkSchemaMatched(
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index acc37a6d9..10bbc4483 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -18,34 +18,113 @@
package org.apache.paimon.hive;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.mapred.PaimonInputFormat;
import org.apache.paimon.hive.mapred.PaimonOutputFormat;
-import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.paimon.hive.HiveTypeUtils.typeInfoToLogicalType;
/**
* {@link HiveMetaHook} for paimon. Currently this class is only used to set
input and output
* formats.
*/
public class PaimonMetaHook implements HiveMetaHook {
+ private static final Logger LOG =
LoggerFactory.getLogger(PaimonMetaHook.class);
+ private static final String COMMENT = "comment";
+ private final Configuration conf;
+
+ public PaimonMetaHook(Configuration conf) {
+ this.conf = conf;
+ }
@Override
public void preCreateTable(Table table) throws MetaException {
- Preconditions.checkArgument(
- !table.isSetPartitionKeys() ||
table.getPartitionKeys().isEmpty(),
- "Paimon currently does not support creating partitioned table "
- + "with PARTITIONED BY clause. If you want to query
from a partitioned table, "
- + "please add partition columns into the ordinary
table columns.");
+ if (table.getPartitionKeysSize() != 0) {
+ throw new MetaException(
+ "Paimon currently does not support creating partitioned
table "
+ + "with PARTITIONED BY clause. If you want to
query from a partitioned table, "
+ + "please add partition columns into the ordinary
table columns.");
+ }
+
+ // hive ql parse cannot recognize input near '$' in table name, no
need to add paimon system
+ // table verification.
table.getSd().setInputFormat(PaimonInputFormat.class.getCanonicalName());
table.getSd().setOutputFormat(PaimonOutputFormat.class.getCanonicalName());
+
+ Path path = new Path(table.getSd().getLocation());
+ CatalogContext context = catalogContext(table);
+ FileIO fileIO;
+ try {
+ fileIO = FileIO.get(path, context);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ SchemaManager schemaManager = new SchemaManager(fileIO, path);
+ Optional<TableSchema> tableSchema = schemaManager.latest();
+ if (tableSchema.isPresent()) {
+ // paimon table already exists
+ return;
+ }
+ // create paimon table
+ List<FieldSchema> cols = table.getSd().getCols();
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .options(context.options().toMap())
+ .comment(table.getParameters().get(COMMENT));
+ cols.iterator()
+ .forEachRemaining(
+ fieldSchema ->
+ schemaBuilder.column(
+ fieldSchema.getName().toLowerCase(),
+
typeInfoToLogicalType(fieldSchema.getType()),
+ fieldSchema.getComment()));
+ try {
+ schemaManager.createTable(schemaBuilder.build());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
@Override
- public void rollbackCreateTable(Table table) throws MetaException {}
+ public void rollbackCreateTable(Table table) throws MetaException {
+ if (!MetaStoreUtils.isExternalTable(table)) {
+ return;
+ }
+
+ // we have created a paimon table, so we delete it to roll back;
+ Path path = new Path(table.getSd().getLocation());
+ CatalogContext context = catalogContext(table);
+ try {
+ FileIO fileIO = FileIO.get(path, context);
+ if (fileIO.exists(path)) {
+ fileIO.deleteDirectoryQuietly(path);
+ }
+ } catch (IOException e) {
+ LOG.error("Delete directory [{}] fail for the paimon table.",
path, e);
+ }
+ }
@Override
public void commitCreateTable(Table table) throws MetaException {}
@@ -58,4 +137,11 @@ public class PaimonMetaHook implements HiveMetaHook {
@Override
public void commitDropTable(Table table, boolean b) throws MetaException {}
+
+ private CatalogContext catalogContext(Table table) {
+ Options options = PaimonJobConf.extractCatalogConfig(conf);
+ options.set(CoreOptions.PATH, table.getSd().getLocation());
+ table.getParameters().forEach(options::set);
+ return CatalogContext.create(options, conf);
+ }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
index 2fe0e9e49..0a6396aeb 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonStorageHandler.java
@@ -65,7 +65,7 @@ public class PaimonStorageHandler implements
HiveStoragePredicateHandler, HiveSt
@Override
public HiveMetaHook getMetaHook() {
- return new PaimonMetaHook();
+ return new PaimonMetaHook(this.conf);
}
@Override
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
new file mode 100644
index 000000000..37f2dfc8d
--- /dev/null
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.schema.TableSchema;
+
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.parse.ParseException;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** IT cases for testing create managed table ddl. */
+public class CreateTableITCase extends HiveTestBase {
+
+ @Test
+ public void testCreateTableWithEmptyDDLAndNoPaimonTable() {
+ // create table with empty DDL and no paimon table
+ String tableName = "empty_ddl_no_paimon_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " ",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'"));
+ assertThatThrownBy(() -> hiveShell.execute(hiveSql))
+ .hasRootCauseInstanceOf(IllegalArgumentException.class)
+ .hasRootCauseMessage(
+ "Schema file not found in location "
+ + path
+ + ". Please create table first.");
+ }
+
+ @Test
+ public void testCreateTableWithEmptyDDLAndExistsPaimonTable() throws
Exception {
+ String tableName = "empty_ddl_exists_paimon_table";
+ // create table with empty DDL and exists paimon table
+ createPaimonTable();
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " ",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'"));
+ assertThatCode(() ->
hiveShell.execute(hiveSql)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateTableWithExistsDDLAndNoPaimonTable() {
+ // create table with exists DDL and no paimon table
+ String tableName = "exists_ddl_no_paimon_table";
+ assertThatCode(() ->
hiveShell.execute(generateDefaultHiveSql(tableName)))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateTableWithExistsDDLAndExistsPaimonTable() throws
Exception {
+ String tableName = "exists_ddl_exists_paimon_table";
+ createPaimonTable();
+ assertThatCode(() ->
hiveShell.execute(generateDefaultHiveSql(tableName)))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateTableWithoutColumnComment() {
+ String tableName = "without_column_comment_table";
+ assertThatCode(() ->
hiveShell.execute(generateDefaultHiveSql(tableName)))
+ .doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateTableIsInternalTable() {
+ String tableName = "internal_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE TABLE " + tableName + " (",
+ "col1 "
+ +
TypeInfoFactory.intTypeInfo.getTypeName()
+ + " COMMENT 'The col1 field'",
+ ")",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'"));
+ assertThatCode(() ->
hiveShell.execute(hiveSql)).doesNotThrowAnyException();
+ }
+
+ @Test
+ public void testCreateTableNotSupportPartitionTable() {
+ String tableName = "not_support_partition_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " (",
+ "col1 "
+ +
TypeInfoFactory.intTypeInfo.getTypeName()
+ + " COMMENT 'The col1 field'",
+ ")",
+ "PARTITIONED BY (dt "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + ")",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'"));
+ assertThatThrownBy(() -> hiveShell.execute(hiveSql))
+ .hasRootCauseInstanceOf(MetaException.class)
+ .hasRootCauseMessage(
+ "Paimon currently does not support creating
partitioned table "
+ + "with PARTITIONED BY clause. If you want to
query from a partitioned table, "
+ + "please add partition columns into the
ordinary table columns.");
+ }
+
+ @Test
+ public void testCreateTableWithPrimaryKey() {
+ String tableName = "primary_key_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " (",
+ "user_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The user_id field',",
+ "item_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The item_id field',",
+ "behavior "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The behavior field',",
+ "dt "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The dt field',",
+ "hh "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The hh field'",
+ ")",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'",
+ "TBLPROPERTIES (",
+ " 'primary-key'='dt,hh,user_id'",
+ ")"));
+ assertThatCode(() ->
hiveShell.execute(hiveSql)).doesNotThrowAnyException();
+ Optional<TableSchema> tableSchema = paimonTableSchema();
+ assertThat(tableSchema).isPresent();
+ assertThat(tableSchema.get().primaryKeys()).contains("dt", "hh",
"user_id");
+ assertThat(tableSchema.get().partitionKeys()).isEmpty();
+ }
+
+ @Test
+ public void testCreateTableWithPartition() {
+ String tableName = "partition_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " (",
+ "user_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The user_id field',",
+ "item_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The item_id field',",
+ "behavior "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The behavior field',",
+ "dt "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The dt field',",
+ "hh "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The hh field'",
+ ")",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'",
+ "TBLPROPERTIES (",
+ " 'partition'='dt,hh'",
+ ")"));
+ assertThatCode(() ->
hiveShell.execute(hiveSql)).doesNotThrowAnyException();
+ Optional<TableSchema> tableSchema = paimonTableSchema();
+ assertThat(tableSchema).isPresent();
+ assertThat(tableSchema.get().partitionKeys()).contains("dt", "hh");
+ assertThat(tableSchema.get().primaryKeys()).isEmpty();
+ }
+
+ @Test
+ public void testCreateTableWithPrimaryKeyAndPartition() {
+ String tableName = "primary_key_partition_table";
+ String hiveSql =
+ String.join(
+ "\n",
+ Arrays.asList(
+ "CREATE EXTERNAL TABLE " + tableName + " (",
+ "user_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The user_id field',",
+ "item_id "
+ +
TypeInfoFactory.longTypeInfo.getTypeName()
+ + " COMMENT 'The item_id field',",
+ "behavior "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The behavior field',",
+ "dt "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The dt field',",
+ "hh "
+ +
TypeInfoFactory.stringTypeInfo.getTypeName()
+ + " COMMENT 'The hh field'",
+ ")",
+ "STORED BY '" +
PaimonStorageHandler.class.getName() + "'",
+ "LOCATION '" + path + "'",
+ "TBLPROPERTIES (",
+ " 'primary-key'='dt,hh,user_id',",
+ " 'partition'='dt,hh'",
+ ")"));
+ assertThatCode(() ->
hiveShell.execute(hiveSql)).doesNotThrowAnyException();
+ Optional<TableSchema> tableSchema = paimonTableSchema();
+ assertThat(tableSchema).isPresent();
+ assertThat(tableSchema.get().primaryKeys()).contains("dt", "hh",
"user_id");
+ assertThat(tableSchema.get().partitionKeys()).contains("dt", "hh");
+ }
+
+ @Test
+ public void testCreateTableIsPaimonSystemTable() {
+ String tableName = "test$schema";
+ assertThatThrownBy(() ->
hiveShell.execute(generateDefaultHiveSql(tableName)))
+ .hasRootCauseInstanceOf(ParseException.class)
+ .hasMessageContaining(
+ "cannot recognize input near 'test' '$' 'schema' in
table name");
+ }
+}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 3528b4a92..530f856c7 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -264,6 +264,21 @@ public abstract class HiveCatalogITCaseBase {
}
}
+ @Test
+ public void testHiveCreateAndFlinkRead() throws Exception {
+     // Create and populate the table through the Hive shell, then read it back with
+     // collect(...) and compare the rows.
+     hiveShell.execute(
+             "CREATE EXTERNAL TABLE hive_test_table ( a INT, b STRING ) "
+                     + "STORED BY '"
+                     + PaimonStorageHandler.class.getName()
+                     // trailing space keeps the STORED BY literal and LOCATION as separate words
+                     + "' "
+                     + "LOCATION '"
+                     + path
+                     + "/test_db.db/hive_test_table'");
+     hiveShell.execute("INSERT INTO hive_test_table VALUES (1, 'Apache'), (2, 'Paimon')");
+     List<Row> actual = collect("SELECT * FROM hive_test_table");
+     // Exactly the two inserted rows are expected; row order is not asserted.
+     Assertions.assertThat(actual)
+             .containsExactlyInAnyOrder(Row.of(1, "Apache"), Row.of(2, "Paimon"));
+ }
+
@Test
public void testCreateTableAs() throws Exception {
tEnv.executeSql("CREATE TABLE t (a INT)").await();
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
index 3e9a63620..931b837d6 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTableSchemaTest.java
@@ -36,6 +36,7 @@ import java.util.HashMap;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.junit.jupiter.api.Assertions.assertThrows;
/** Tests for {@link HiveSchema}. */
@@ -51,41 +52,58 @@ public class HiveTableSchemaTest {
@TempDir java.nio.file.Path tempDir;
@Test
- public void testExtractSchema() throws Exception {
+ public void testExtractSchemaWithEmptyDDLAndNoPaimonTable() {
+     // The Hive properties declare no columns and no Paimon table was created under tempDir,
+     // so schema extraction is expected to fail with the message below.
+     Properties emptyDdlProperties = createTableWithEmptyDDL();
+     String expectedMessage =
+             "Schema file not found in location "
+                     + tempDir.toString()
+                     + ". Please create table first.";
+
+     assertThatExceptionOfType(IllegalArgumentException.class)
+             .isThrownBy(() -> HiveSchema.extract(null, emptyDdlProperties))
+             .withMessage(expectedMessage);
+ }
+
+ @Test
+ public void testExtractSchemaWithEmptyDDLAndExistsPaimonTable() throws
Exception {
+ // create a paimon table
createSchema();
+ // Extract schema with empty DDL and exists paimon table
+ Properties tableWithEmptyDDL = createTableWithEmptyDDL();
- Properties properties = new Properties();
- properties.setProperty("columns", "a,b,c");
- properties.setProperty(
- "columns.types",
- String.join(
- ":",
+ HiveSchema schema = HiveSchema.extract(null, tableWithEmptyDDL);
+ assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b",
"c"));
+ assertThat(schema.fieldTypes())
+ .isEqualTo(
Arrays.asList(
- TypeInfoFactory.intTypeInfo.getTypeName(),
- TypeInfoFactory.stringTypeInfo.getTypeName(),
- TypeInfoFactory.getDecimalTypeInfo(5,
3).getTypeName())));
- properties.setProperty("location", tempDir.toString());
+ DataTypes.INT(), DataTypes.STRING(),
DataTypes.DECIMAL(5, 3)));
+ assertThat(schema.fieldComments())
+ .isEqualTo(Arrays.asList("first comment", "second comment",
"last comment"));
+ }
+
+ @Test
+ public void testExtractSchemaWithExistsDDLAndNoPaimonTable() {
+ // Extract schema with exists DDL and no paimon table
+ Properties tableWithExistsDDL = createTableWithExistsDDL();
- HiveSchema schema = HiveSchema.extract(null, properties);
+ HiveSchema schema = HiveSchema.extract(null, tableWithExistsDDL);
assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b",
"c"));
assertThat(schema.fieldTypes())
.isEqualTo(
Arrays.asList(
DataTypes.INT(), DataTypes.STRING(),
DataTypes.DECIMAL(5, 3)));
assertThat(schema.fieldComments())
- .isEqualTo(Arrays.asList("first comment", "second comment",
"last comment"));
+ .isEqualTo(Arrays.asList("col1 comment", "col2 comment", "col3
comment"));
}
@Test
- public void testExtractSchemaWithEmptyDDL() throws Exception {
+ public void testExtractSchemaWithExistsDDLAndExistsPaimonTable() throws
Exception {
+ // create a paimon table
createSchema();
+ // Extract schema with exists DDL and exists paimon table
+ Properties tableWithExistsDDL = createTableWithExistsDDL();
- Properties properties = new Properties();
- properties.setProperty("columns", "");
- properties.setProperty("columns.types", "");
- properties.setProperty("location", tempDir.toString());
-
- HiveSchema schema = HiveSchema.extract(null, properties);
+ HiveSchema schema = HiveSchema.extract(null, tableWithExistsDDL);
assertThat(schema.fieldNames()).isEqualTo(Arrays.asList("a", "b",
"c"));
assertThat(schema.fieldTypes())
.isEqualTo(
@@ -109,6 +127,7 @@ public class HiveTableSchemaTest {
TypeInfoFactory.intTypeInfo.getTypeName(),
TypeInfoFactory.stringTypeInfo.getTypeName(),
TypeInfoFactory.getDecimalTypeInfo(6,
3).getTypeName())));
+ properties.setProperty("columns.comments", "\0\0");
properties.setProperty("location", tempDir.toString());
String expected =
@@ -139,7 +158,7 @@ public class HiveTableSchemaTest {
properties.setProperty("columns", "a");
properties.setProperty("columns.types",
TypeInfoFactory.intTypeInfo.getTypeName());
properties.setProperty("location", tempDir.toString());
-
+ properties.setProperty("columns.comments", "");
String expected =
String.join(
"\n",
@@ -154,10 +173,9 @@ public class HiveTableSchemaTest {
"Field #2",
"Hive DDL : null",
"Paimon Schema: c decimal(5,3)");
- IllegalArgumentException exception =
- assertThrows(
- IllegalArgumentException.class, () ->
HiveSchema.extract(null, properties));
- assertThat(exception).hasMessageContaining(expected);
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> HiveSchema.extract(null, properties))
+ .withMessageContaining(expected);
}
@Test
@@ -176,6 +194,7 @@ public class HiveTableSchemaTest {
TypeInfoFactory.getDecimalTypeInfo(5,
3).getTypeName(),
TypeInfoFactory.intTypeInfo.getTypeName(),
TypeInfoFactory.stringTypeInfo.getTypeName())));
+ properties.setProperty("columns.comments", "\0\0\0\0");
properties.setProperty("location", tempDir.toString());
String expected =
@@ -192,10 +211,9 @@ public class HiveTableSchemaTest {
"Field #4",
"Hive DDL : e string",
"Paimon Schema: null");
- IllegalArgumentException exception =
- assertThrows(
- IllegalArgumentException.class, () ->
HiveSchema.extract(null, properties));
- assertThat(exception).hasMessageContaining(expected);
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> HiveSchema.extract(null, properties))
+ .withMessageContaining(expected);
}
private void createSchema() throws Exception {
@@ -208,4 +226,32 @@ public class HiveTableSchemaTest {
new HashMap<>(),
""));
}
+
+ private Properties createTableWithEmptyDDL() {
+     // Hive table properties pointing at tempDir with empty column name and type lists.
+     Properties props = new Properties();
+     props.setProperty("name", "empty_ddl_test_table");
+     props.setProperty("location", tempDir.toString());
+     props.setProperty("columns", "");
+     props.setProperty("columns.types", "");
+     return props;
+ }
+
+ private Properties createTableWithExistsDDL() {
+     // Hive table properties pointing at tempDir that declare columns a, b and c, with
+     // colon-separated type names and NUL-separated column comments.
+     String columnTypes =
+             String.join(
+                     ":",
+                     Arrays.asList(
+                             TypeInfoFactory.intTypeInfo.getTypeName(),
+                             TypeInfoFactory.stringTypeInfo.getTypeName(),
+                             TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName()));
+
+     Properties props = new Properties();
+     props.setProperty("name", "test_table");
+     props.setProperty("location", tempDir.toString());
+     props.setProperty("columns", "a,b,c");
+     props.setProperty("columns.types", columnTypes);
+     props.setProperty("columns.comments", "col1 comment\0col2 comment\0col3 comment");
+     return props;
+ }
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTestBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTestBase.java
new file mode 100644
index 000000000..997cea2a1
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveTestBase.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+/**
+ * Base class for tests that execute Hive statements against Paimon tables.
+ *
+ * <p>Before each test the {@code test_db} database is created and selected, and {@link #path} is
+ * set to a fresh temporary folder. After each test the database is dropped with {@code CASCADE}.
+ */
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public abstract class HiveTestBase {
+
+    /**
+     * Schema used by {@link #createPaimonTable()}: columns {@code col1 INT}, {@code col2 STRING}
+     * and {@code col3 DECIMAL(5, 3)}, each with a comment; all remaining arguments are empty.
+     */
+    protected static final Schema DEFAULT_TABLE_SCHEMA =
+            new Schema(
+                    Lists.newArrayList(
+                            new DataField(0, "col1", DataTypes.INT(), "first comment"),
+                            new DataField(1, "col2", DataTypes.STRING(), "second comment"),
+                            new DataField(2, "col3", DataTypes.DECIMAL(5, 3), "last comment")),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Maps.newHashMap(),
+                    "");
+
+    @HiveSQL(files = {})
+    protected static HiveShell hiveShell;
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    /** Location of the table under test; assigned a new temporary folder in {@link #before()}. */
+    protected String path;
+
+    @Before
+    public void before() throws Exception {
+        hiveShell.execute("CREATE DATABASE IF NOT EXISTS test_db");
+        hiveShell.execute("USE test_db");
+        path = folder.newFolder().toString();
+    }
+
+    @After
+    public void after() {
+        hiveShell.execute("DROP DATABASE IF EXISTS test_db CASCADE");
+    }
+
+    /** Creates a Paimon table with {@link #DEFAULT_TABLE_SCHEMA} at {@link #path}. */
+    protected void createPaimonTable() throws Exception {
+        schemaManager().createTable(DEFAULT_TABLE_SCHEMA);
+    }
+
+    /** Returns the latest schema found at {@link #path}, if any. */
+    protected Optional<TableSchema> paimonTableSchema() {
+        return schemaManager().latest();
+    }
+
+    /**
+     * Builds a {@code CREATE EXTERNAL TABLE} statement for the given table name with columns
+     * {@code col1}, {@code col2} and {@code col3}, stored by {@link PaimonStorageHandler} at
+     * {@link #path}.
+     */
+    protected String generateDefaultHiveSql(String tableName) {
+        String intType = TypeInfoFactory.intTypeInfo.getTypeName();
+        String stringType = TypeInfoFactory.stringTypeInfo.getTypeName();
+        String decimalType = TypeInfoFactory.getDecimalTypeInfo(5, 3).getTypeName();
+        return String.join(
+                "\n",
+                Arrays.asList(
+                        "CREATE EXTERNAL TABLE " + tableName + " (",
+                        columnClause("col1", intType) + ",",
+                        columnClause("col2", stringType) + ",",
+                        columnClause("col3", decimalType),
+                        ")",
+                        "STORED BY '" + PaimonStorageHandler.class.getName() + "'",
+                        "LOCATION '" + path + "'"));
+    }
+
+    /** Creates a schema manager on the local file system rooted at {@link #path}. */
+    private SchemaManager schemaManager() {
+        return new SchemaManager(LocalFileIO.create(), new Path(path));
+    }
+
+    /** Formats one column definition without a trailing comma. */
+    private static String columnClause(String columnName, String typeName) {
+        return columnName + " " + typeName + " COMMENT 'The " + columnName + " field'";
+    }
+}