This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fcd52611d [core] Support create table column in-sensitive in
HiveCatalog (#3822)
fcd52611d is described below
commit fcd52611db7b85708d186c41a590610cccf2f66b
Author: xuzifu666 <[email protected]>
AuthorDate: Tue Jul 30 11:06:42 2024 +0800
[core] Support create table column in-sensitive in HiveCatalog (#3822)
---
.../generated/catalog_configuration.html | 6 +++
.../org/apache/paimon/options/CatalogOptions.java | 7 +++
.../java/org/apache/paimon/hive/HiveCatalog.java | 3 +-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 53 ++++++++++++++++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 16 +++++--
.../apache/paimon/spark/SparkGenericCatalog.java | 5 ++
.../spark/SparkGenericCatalogWithHiveTest.java | 56 +++++++++++++++++++++-
7 files changed, 141 insertions(+), 5 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html
b/docs/layouts/shortcodes/generated/catalog_configuration.html
index cab6e731e..53ba7d0ce 100644
--- a/docs/layouts/shortcodes/generated/catalog_configuration.html
+++ b/docs/layouts/shortcodes/generated/catalog_configuration.html
@@ -26,6 +26,12 @@ under the License.
</tr>
</thead>
<tbody>
+ <tr>
+ <td><h5>assert-upper-case</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether upper-case letters are allowed in column names when creating a table. If false, creating a table whose column names contain upper-case letters fails. Spark catalogs set this option to the inverse of spark.sql.caseSensitive, so upper-case column names are accepted when spark.sql.caseSensitive=false.</td>
+ </tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
index f00a35a75..91c024a34 100644
--- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java
@@ -110,4 +110,11 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can
implement LineageMetaFactory and LineageMeta to store lineage information in
customized storage."))
.build());
+
+ public static final ConfigOption<Boolean> ASSERT_UPPER_CASE =
+ ConfigOptions.key("assert-upper-case")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether upper-case letters are allowed in column names when creating a table. If false, creating a table whose column names contain upper-case letters fails. Spark catalogs set this option to the inverse of spark.sql.caseSensitive, so upper-case column names are accepted when spark.sql.caseSensitive=false.");
}
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 7edb49f58..cd78ad821 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -91,6 +91,7 @@ import static
org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR;
import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER;
import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -567,7 +568,7 @@ public class HiveCatalog extends AbstractCatalog {
@Override
public boolean caseSensitive() {
- return false;
+ return catalogOptions.get(ASSERT_UPPER_CASE); // assert-upper-case=true: upper-case column names are accepted
}
@Override
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e42d48c8b..39571005b 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -363,6 +363,59 @@ public abstract class HiveCatalogITCaseBase {
assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
}
+ @Test
+ public void testCreateInsensitiveTable() throws Exception { // assert-upper-case=true accepts column Bb; false rejects it
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG paimon_catalog_01 WITH (",
+ " 'type' = 'paimon',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true',",
+ " 'table.type' = 'EXTERNAL',",
+ " 'assert-upper-case' = 'true'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
+ tEnv.executeSql("USE test_db").await();
+ tEnv.executeSql("CREATE TABLE t ( aa INT, Bb STRING ) WITH (
'file.format' = 'avro' )")
+ .await();
+ assertThat(
+ hiveShell
+ .executeQuery("DESC FORMATTED t")
+ .contains("Table Type:
\tEXTERNAL_TABLE \tNULL"))
+ .isTrue();
+ tEnv.executeSql("DROP TABLE t").await();
+ Path tablePath = new Path(path, "test_db.db/t");
+ assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
+
+ tEnv.executeSql(
+ String.join(
+ "\n",
+ "CREATE CATALOG paimon_catalog_02 WITH (",
+ " 'type' = 'paimon',",
+ " 'metastore' = 'hive',",
+ " 'uri' = '',",
+ " 'warehouse' = '" + path + "',",
+ " 'lock.enabled' = 'true',",
+ " 'table.type' = 'EXTERNAL',",
+ " 'assert-upper-case' = 'false'",
+ ")"))
+ .await();
+ tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
+ tEnv.executeSql("USE test_db").await();
+
+ // with 'assert-upper-case' = 'false', creating a table with the upper-case column Bb must fail
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ tEnv.executeSql(
+ "CREATE TABLE t1 ( aa INT, Bb STRING )
WITH ( 'file.format' = 'avro' )")
+ .await());
+ }
+
@Test
public void testFlinkWriteAndHiveRead() throws Exception {
tEnv.executeSql(
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 4e8d8eaf7..c28a65cc8 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.internal.SessionState;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -53,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -71,10 +73,18 @@ public class SparkCatalog extends SparkBaseCatalog {
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
+ Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
+ SessionState sessionState = SparkSession.active().sessionState();
+
+ // Set assert-upper-case from spark.sql.caseSensitive first, so the catalog context sees it.
+ newOptions.put(
+ ASSERT_UPPER_CASE.key(),
+ Boolean.toString(!sessionState.conf().caseSensitiveAnalysis()));
+ options = new CaseInsensitiveStringMap(newOptions);
+
CatalogContext catalogContext =
- CatalogContext.create(
- Options.fromMap(options),
- SparkSession.active().sessionState().newHadoopConf());
+ CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf());
+
this.catalog = CatalogFactory.createCatalog(catalogContext);
this.defaultDatabase =
options.getOrDefault(DEFAULT_DATABASE.key(),
DEFAULT_DATABASE.defaultValue());
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 96b87701b..4860301af 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -61,6 +61,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
+import static org.apache.paimon.options.CatalogOptions.ASSERT_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.METASTORE;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static
org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -284,6 +285,10 @@ public class SparkGenericCatalog extends SparkBaseCatalog
implements CatalogExte
Map<String, String> newOptions = new
HashMap<>(options.asCaseSensitiveMap());
fillAliyunConfigurations(newOptions, hadoopConf);
fillCommonConfigurations(newOptions, sqlConf);
+
+ // derived from spark.sql.caseSensitive (overrides any user value): case-insensitive Spark => upper-case columns accepted
+ newOptions.put(ASSERT_UPPER_CASE.key(),
Boolean.toString(!sqlConf.caseSensitiveAnalysis()));
+
return new CaseInsensitiveStringMap(newOptions);
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 4377bc94a..b0f1749df 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
/** Base tests for spark read. */
public class SparkGenericCatalogWithHiveTest {
@@ -44,6 +45,59 @@ public class SparkGenericCatalogWithHiveTest {
testHiveMetastore.stop();
}
+ @Test
+ public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path
tempDir) {
+ // With spark.sql.caseSensitive=false, upper-case column names (Bb) are accepted.
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir",
warehousePath.toString())
+ // with case-sensitive false
+ .config("spark.sql.caseSensitive", "false")
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .master("local[2]")
+ .getOrCreate();
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); // NOTE(review): never dropped; testBuildWithHive now expects my_db1 in SHOW NAMESPACES - confirm tests are order-independent
+ spark.sql("USE my_db1");
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS t2 (a INT, Bb INT, c STRING) USING
paimon TBLPROPERTIES"
+ + " ('file.format'='avro')");
+
+ assertThat(
+ spark.sql("SHOW TABLES").collectAsList().stream()
+ .map(s -> s.get(1))
+ .map(Object::toString))
+ .containsExactlyInAnyOrder("t2");
+ spark.close();
+
+ SparkSession spark1 =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir",
warehousePath.toString())
+ // with case-sensitive true, upper-case column names (Bb) must be rejected
+ .config("spark.sql.caseSensitive", "true")
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .master("local[2]")
+ .getOrCreate();
+
+ spark1.sql("USE my_db1");
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ spark1.sql(
+ "CREATE TABLE IF NOT EXISTS t3 (a INT, Bb INT,
c STRING) USING paimon TBLPROPERTIES"
+ + " ('file.format'='avro')"));
+ spark1.close();
+ }
+
@Test
public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
// firstly, we use hive metastore to creata table, and check the
result.
@@ -66,7 +120,7 @@ public class SparkGenericCatalogWithHiveTest {
+ " ('file.format'='avro')");
assertThat(spark.sql("SHOW
NAMESPACES").collectAsList().stream().map(Object::toString))
- .containsExactlyInAnyOrder("[default]", "[my_db]");
+ .containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]");
assertThat(
spark.sql("SHOW TABLES").collectAsList().stream()