This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d32af52  [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink
d32af52 is described below

commit d32af521cbe83f88cd0b822c4d752a1b5102c47c
Author: Rui Li <li...@apache.org>
AuthorDate: Wed Sep 4 21:27:00 2019 +0800

    [FLINK-13952][table-planner][hive] PartitionableTableSink can not work with OverwritableTableSink
    
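    To support insert overwrite on partitioned tables: the static-partition
    and overwrite settings are now applied in separate pattern matches, so a
    sink that is both a PartitionableTableSink and an OverwritableTableSink
    receives both settings.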
    
    This closes #9615.
---
 .../flink/connectors/hive/TableEnvHiveConnectorTest.java | 16 ++++++++++++++++
 .../flink/table/planner/delegation/PlannerBase.scala     |  3 +++
 .../apache/flink/table/api/internal/TableEnvImpl.scala   |  5 ++++-
 .../org/apache/flink/table/planner/StreamPlanner.scala   |  5 ++++-
 4 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
index 07dd674..e39999a 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java
@@ -177,6 +177,7 @@ public class TableEnvHiveConnectorTest {
        public void testInsertOverwrite() throws Exception {
                hiveShell.execute("create database db1");
                try {
+                       // non-partitioned
                        hiveShell.execute("create table db1.dest (x int, y 
string)");
                        hiveShell.insertInto("db1", "dest").addRow(1, 
"a").addRow(2, "b").commit();
                        verifyHiveQueryResult("select * from db1.dest", 
Arrays.asList("1\ta", "2\tb"));
@@ -184,6 +185,21 @@ public class TableEnvHiveConnectorTest {
                        tableEnv.sqlUpdate("insert overwrite db1.dest values 
(3,'c')");
                        tableEnv.execute("test insert overwrite");
                        verifyHiveQueryResult("select * from db1.dest", 
Collections.singletonList("3\tc"));
+
+                       // static partition
+                       hiveShell.execute("create table db1.part(x int) 
partitioned by (y int)");
+                       hiveShell.insertInto("db1", "part").addRow(1, 
1).addRow(2, 2).commit();
+                       tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.sqlUpdate("insert overwrite db1.part partition 
(y=1) select 100");
+                       tableEnv.execute("insert overwrite static partition");
+                       verifyHiveQueryResult("select * from db1.part", 
Arrays.asList("100\t1", "2\t2"));
+
+                       // dynamic partition
+                       tableEnv = getTableEnvWithHiveCatalog();
+                       tableEnv.sqlUpdate("insert overwrite db1.part values 
(200,2),(3,3)");
+                       tableEnv.execute("insert overwrite dynamic partition");
+                       // only overwrite dynamically matched partitions, other 
existing partitions remain intact
+                       verifyHiveQueryResult("select * from db1.part", 
Arrays.asList("100\t1", "200\t2", "3\t3"));
                } finally {
                        hiveShell.execute("drop database db1 cascade");
                }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index f0e18f0..90cdab9 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -182,6 +182,9 @@ abstract class PlannerBase(
               if partitionableSink.getPartitionFieldNames != null
                 && partitionableSink.getPartitionFieldNames.nonEmpty =>
               
partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+            case _ =>
+          }
+          sink match {
             case overwritableTableSink: OverwritableTableSink =>
               overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
             case _ =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 0dece49..0e00268 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -474,12 +474,15 @@ abstract class TableEnvImpl(
           objectIdentifier,
           tableSink)
         // set static partitions if it is a partitioned table sink
-        // set whether to overwrite if it's an OverwritableTableSink
         tableSink match {
           case partitionableSink: PartitionableTableSink
             if partitionableSink.getPartitionFieldNames != null
               && partitionableSink.getPartitionFieldNames.nonEmpty =>
             
partitionableSink.setStaticPartition(insertOptions.staticPartitions)
+          case _ =>
+        }
+        // set whether to overwrite if it's an OverwritableTableSink
+        tableSink match {
           case overwritableTableSink: OverwritableTableSink =>
             overwritableTableSink.setOverwrite(insertOptions.overwrite)
           case _ =>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 140198b..10a04de 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -160,12 +160,15 @@ class StreamPlanner(
               identifier,
               sink)
             // set static partitions if it is a partitioned sink
-            // set whether to overwrite if it's an OverwritableTableSink
             sink match {
               case partitionableSink: PartitionableTableSink
                 if partitionableSink.getPartitionFieldNames != null
                   && partitionableSink.getPartitionFieldNames.nonEmpty =>
                 
partitionableSink.setStaticPartition(catalogSink.getStaticPartitions)
+              case _ =>
+            }
+            // set whether to overwrite if it's an OverwritableTableSink
+            sink match {
               case overwritableTableSink: OverwritableTableSink =>
                 overwritableTableSink.setOverwrite(catalogSink.isOverwrite)
               case _ =>

Reply via email to