This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 2ef91473f [hive] Using hive to read data from metastore.partitioned-table cannot find the table schema (#1934)
2ef91473f is described below
commit 2ef91473fa68ff05452df2d3e80ee8fdc0d9a2d5
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 7 18:05:42 2023 +0800
    [hive] Using hive to read data from metastore.partitioned-table cannot find the table schema (#1934)
---
.../apache/paimon/hive/LocationKeyExtractor.java | 18 +++++++++
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 44 ++++++++++++++++++++++
2 files changed, 62 insertions(+)
diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
index 3b0507492..c9bb88a5e 100644
--- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
+++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
@@ -18,6 +18,8 @@
package org.apache.paimon.hive;
+import org.apache.paimon.utils.StringUtils;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -31,6 +33,8 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -59,6 +63,7 @@ public class LocationKeyExtractor {
         // read what metastore tells us
         location = properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
         if (location != null) {
+            location = tableLocation(location, properties);
             if (conf != null) {
                 try {
                     return getDnsPath(new Path(location), conf).toString();
@@ -150,4 +155,17 @@ public class LocationKeyExtractor {
return null;
}
+
+    /** Derives the table location from a partition location. */
+    private static String tableLocation(String location, Properties properties) {
+        String partitionProperty =
+                properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+        if (StringUtils.isEmpty(partitionProperty)) {
+            return location;
+        }
+
+        List<String> partitionKeys =
+                Arrays.asList(partitionProperty.split(org.apache.paimon.fs.Path.SEPARATOR));
+        return location.split(org.apache.paimon.fs.Path.SEPARATOR + partitionKeys.get(0) + "=")[0];
+    }
}
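
The essence of the change: with 'metastore.partitioned-table' = 'true', Hive can hand LocationKeyExtractor a partition directory (e.g. .../students/dt=20230801) as META_TABLE_LOCATION, where no table schema exists; tableLocation trims the partition suffix to get back to the table root. Below is a minimal self-contained sketch of that string logic, assuming the usual Hive partition path layout and that org.apache.paimon.fs.Path.SEPARATOR is "/" (the class name, sample path, and partition column are illustrative, not from the commit):

    import java.util.Arrays;
    import java.util.List;

    public class TableLocationSketch {

        // Mirrors the commit's logic: cut the location at "/<firstPartitionKey>="
        // and keep the prefix, i.e. the table root directory.
        static String tableLocation(String location, String partitionColumns) {
            if (partitionColumns == null || partitionColumns.isEmpty()) {
                return location;
            }
            // META_TABLE_PARTITION_COLUMNS joins partition column names with "/".
            List<String> partitionKeys = Arrays.asList(partitionColumns.split("/"));
            return location.split("/" + partitionKeys.get(0) + "=")[0];
        }

        public static void main(String[] args) {
            // Hypothetical partition location reported by the metastore.
            String partitionLocation = "/warehouse/test_db.db/students/dt=20230801";
            // Prints: /warehouse/test_db.db/students
            System.out.println(tableLocation(partitionLocation, "dt"));
        }
    }

For a table with no partition columns the location is returned unchanged, so the existing code path is unaffected.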
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 51be412c7..f31463cbf 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -329,6 +329,50 @@ public abstract class HiveCatalogITCaseBase {
"Cannot find table '`my_hive`.`test_db`.`hive_table`'
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
}
+    /**
+     * Test Flink writing and Hive reading to compare partitioned and non-partitioned table results.
+     */
+    @Test
+    public void testFlinkWriteAndHiveReadToCompare() throws Exception {
+        // Use Flink to create a partitioned table and write data, then read it with Hive.
+        tEnv.executeSql(
+                        "create table students\n"
+                                + "(id decimal(20,0)\n"
+                                + ",upload_insert TIMESTAMP\n"
+                                + ",dt string\n"
+                                + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + "'bucket' = '-1',\n"
+                                + "'file.format' = 'parquet',\n"
+                                + "'metastore.partitioned-table' = 'true'\n"
+                                + ");")
+                .await();
+        tEnv.executeSql(
+                        "insert into students select cast(1 as decimal(20,0)) as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as dt;")
+                .await();
+        List<String> partitionedTableResult = hiveShell.executeQuery("SELECT * from students");
+
+        // Use Flink to create a table that is not partitioned in the metastore, write data, then read it with Hive.
+        tEnv.executeSql(
+                        "create table students1\n"
+                                + "(id decimal(20,0)\n"
+                                + ",upload_insert TIMESTAMP\n"
+                                + ",dt string\n"
+                                + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + "'bucket' = '-1',\n"
+                                + "'file.format' = 'parquet'\n"
+                                + ");")
+                .await();
+        tEnv.executeSql(
+                        "insert into students1 select cast(1 as decimal(20,0)) as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as dt;")
+                .await();
+        List<String> nonPartitionedTableResult = hiveShell.executeQuery("SELECT * from students1");
+        assertThat(partitionedTableResult).containsAll(nonPartitionedTableResult);
+    }
+
@Test
public void testHiveCreateAndFlinkRead() throws Exception {
hiveShell.execute("SET hive.metastore.warehouse.dir=" + path);