This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 273fd84  [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
273fd84 is described below

commit 273fd847c5cd2ac76e87aa1aa47a8517b78d860b
Author: Rui Li <[email protected]>
AuthorDate: Tue Jul 14 14:05:40 2020 +0800

    [FLINK-18529][hive] Query Hive table and filter by timestamp partition can fail
    
    This closes #12856
---
 .../table/catalog/hive/util/HiveTableUtil.java      | 21 ++++++++-------------
 .../connectors/hive/HiveTableSourceITCase.java      | 14 +++++++++++++-
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
index d5d5413..3a04f1a 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTableUtil.java
@@ -46,6 +46,7 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -453,22 +454,16 @@ public class HiveTableUtil {
                        if (value == null) {
                                return "null";
                        }
+                       LogicalTypeRoot typeRoot = 
dataType.getLogicalType().getTypeRoot();
+                       if 
(typeRoot.getFamilies().contains(LogicalTypeFamily.DATETIME)) {
+                               // hive not support partition filter push down 
with these types.
+                               return null;
+                       }
                        value = 
HiveInspectors.getConversion(HiveInspectors.getObjectInspector(dataType), 
dataType.getLogicalType(), hiveShim)
                                        .toHiveObject(value);
                        String res = value.toString();
-                       LogicalTypeRoot typeRoot = 
dataType.getLogicalType().getTypeRoot();
-                       switch (typeRoot) {
-                               case CHAR:
-                               case VARCHAR:
-                                       res = "'" + res.replace("'", "''") + 
"'";
-                                       break;
-                               case DATE:
-                               case TIMESTAMP_WITHOUT_TIME_ZONE:
-                               case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                                       // hive not support partition filter 
push down with these types.
-                                       return null;
-                               default:
-                                       break;
+                       if (typeRoot == LogicalTypeRoot.CHAR || typeRoot == 
LogicalTypeRoot.VARCHAR) {
+                               res = "'" + res.replace("'", "''") + "'";
                        }
                        return res;
                }
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 5170920..b2badf9 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -289,6 +289,14 @@ public class HiveTableSourceITCase extends 
BatchAbstractTestBase {
                        assertTrue(optimizedPlan, 
optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
                        results = Lists.newArrayList(query.execute().collect());
                        assertEquals("[4]", results.toString());
+
+                       query = tableEnv.sqlQuery("select x from db1.part where 
'' = p2");
+                       explain = query.explain().split("==.*==\n");
+                       assertFalse(catalog.fallback);
+                       optimizedPlan = explain[2];
+                       assertTrue(optimizedPlan, 
optimizedPlan.contains("PartitionPruned: true, PartitionNums: 0"));
+                       results = Lists.newArrayList(query.execute().collect());
+                       assertEquals("[]", results.toString());
                } finally {
                        tableEnv.executeSql("drop database db1 cascade");
                }
@@ -319,7 +327,11 @@ public class HiveTableSourceITCase extends 
BatchAbstractTestBase {
                        assertTrue(optimizedPlan, 
optimizedPlan.contains("PartitionPruned: true, PartitionNums: 1"));
                        List<Row> results = 
Lists.newArrayList(query.execute().collect());
                        assertEquals("[3]", results.toString());
-                       System.out.println(results);
+
+                       // filter by timestamp partition
+                       query = tableEnv.sqlQuery("select x from db1.part where 
timestamp '2018-08-08 08:08:09' = p2");
+                       results = Lists.newArrayList(query.execute().collect());
+                       assertEquals("[2]", results.toString());
                } finally {
                        tableEnv.executeSql("drop database db1 cascade");
                }

Reply via email to