This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ef91473f [hive] Using hive to read data from 
metastore.partitioned-table cannot find the table schema (#1934)
2ef91473f is described below

commit 2ef91473fa68ff05452df2d3e80ee8fdc0d9a2d5
Author: Kerwin <[email protected]>
AuthorDate: Thu Sep 7 18:05:42 2023 +0800

    [hive] Using hive to read data from metastore.partitioned-table cannot find 
the table schema (#1934)
---
 .../apache/paimon/hive/LocationKeyExtractor.java   | 18 +++++++++
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 44 ++++++++++++++++++++++
 2 files changed, 62 insertions(+)

diff --git 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
index 3b0507492..c9bb88a5e 100644
--- 
a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
+++ 
b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/LocationKeyExtractor.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.utils.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,6 +33,8 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -59,6 +63,7 @@ public class LocationKeyExtractor {
         // read what metastore tells us
         location = 
properties.getProperty(hive_metastoreConstants.META_TABLE_LOCATION);
         if (location != null) {
+            location = tableLocation(location, properties);
             if (conf != null) {
                 try {
                     return getDnsPath(new Path(location), conf).toString();
@@ -150,4 +155,17 @@ public class LocationKeyExtractor {
 
         return null;
     }
+
+    /** Get the table location from a (possibly partition) location by cutting off the partition path. */
+    private static String tableLocation(String location, Properties 
properties) {
+        String partitionProperty =
+                
properties.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+        if (StringUtils.isEmpty(partitionProperty)) {
+            return location; // partition-columns property is empty, nothing to cut off
+        }
+
+        List<String> partitionKeys =
+                
Arrays.asList(partitionProperty.split(org.apache.paimon.fs.Path.SEPARATOR));
+        return location.split(org.apache.paimon.fs.Path.SEPARATOR + 
partitionKeys.get(0) + "=")[0]; // NOTE(review): the first key is used as a regex by String.split - confirm it never needs quoting
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 51be412c7..f31463cbf 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -329,6 +329,50 @@ public abstract class HiveCatalogITCaseBase {
                         "Cannot find table '`my_hive`.`test_db`.`hive_table`' 
in any of the catalogs [default_catalog, my_hive], nor as a temporary table.");
     }
 
+    /**
+     * Test flink writing and hive reading, comparing tables with and without 
'metastore.partitioned-table'.
+     */
+    @Test
+    public void testFlinkWriteAndHiveReadToCompare() throws Exception {
+        // Use flink to create a table with 'metastore.partitioned-table' = 'true' and write data, hive read.
+        tEnv.executeSql(
+                        "create table students\n"
+                                + "(id decimal(20,0)\n"
+                                + ",upload_insert TIMESTAMP\n"
+                                + ",dt string\n"
+                                + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + "'bucket' = '-1',\n"
+                                + "'file.format' = 'parquet',\n"
+                                + "'metastore.partitioned-table' = 'true'\n"
+                                + ");")
+                .await();
+        tEnv.executeSql(
+                        "insert into students select cast(1 as decimal(20,0)) 
as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as 
dt;")
+                .await();
+        List<String> partitionedTableResult = hiveShell.executeQuery("SELECT * 
from students");
+
+        // Use flink to create a table without 'metastore.partitioned-table' and write data, hive 
read.
+        tEnv.executeSql(
+                        "create table students1\n"
+                                + "(id decimal(20,0)\n"
+                                + ",upload_insert TIMESTAMP\n"
+                                + ",dt string\n"
+                                + ",PRIMARY KEY(id,dt) NOT ENFORCED\n"
+                                + ") PARTITIONED BY (dt)\n"
+                                + "WITH (\n"
+                                + "'bucket' = '-1',\n"
+                                + "'file.format' = 'parquet'\n"
+                                + ");")
+                .await();
+        tEnv.executeSql(
+                        "insert into students1 select cast(1 as decimal(20,0)) 
as id,to_timestamp('2023-08-01 14:03:00.123456') as upload_insert,'20230801' as 
dt;")
+                .await();
+        List<String> nonPartitionedTableResult = 
hiveShell.executeQuery("SELECT * from students1");
+        
assertThat(partitionedTableResult).containsAll(nonPartitionedTableResult); // rows hive reads from students1 must all appear in the students result
+    }
+
     @Test
     public void testHiveCreateAndFlinkRead() throws Exception {
         hiveShell.execute("SET hive.metastore.warehouse.dir=" + path);

Reply via email to