This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6582704  [FLINK-20722][hive] HiveTableSink should copy the record when converting RowData to Row
6582704 is described below

commit 65827041830e1332aeb373eb64ae31505a49f268
Author: Rui Li <[email protected]>
AuthorDate: Thu Dec 24 11:39:35 2020 +0800

    [FLINK-20722][hive] HiveTableSink should copy the record when converting RowData to Row
    
    This closes #14484
---
 .../hive/TableEnvHiveConnectorITCase.java          | 42 ++++++++++++++++++++++
 .../MultipleInputStreamOperatorBase.java           |  7 ++--
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 13e319a..9af50b4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -74,6 +74,48 @@ public class TableEnvHiveConnectorITCase {
     @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
 
     @Test
+    public void testMultiInputBroadcast() throws Exception {
+        TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+        tableEnv.executeSql("create database db1");
+        try {
+            tableEnv.useDatabase("db1");
+            tableEnv.executeSql("create table src1(key string, val string)");
+            tableEnv.executeSql("create table src2(key string, val string)");
+            tableEnv.executeSql("create table dest(key string, val string)");
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
+                    .addRow(new Object[] {"1", "val1"})
+                    .addRow(new Object[] {"2", "val2"})
+                    .addRow(new Object[] {"3", "val3"})
+                    .commit();
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src2")
+                    .addRow(new Object[] {"3", "val4"})
+                    .addRow(new Object[] {"4", "val4"})
+                    .commit();
+            tableEnv.executeSql(
+                            "INSERT OVERWRITE dest\n"
+                                    + "SELECT j.*\n"
+                                    + "FROM (SELECT t1.key, p1.val\n"
+                                    + "      FROM src2 t1\n"
+                                    + "      LEFT OUTER JOIN src1 p1\n"
+                                    + "      ON (t1.key = p1.key)\n"
+                                    + "      UNION ALL\n"
+                                    + "      SELECT t2.key, p2.val\n"
+                                    + "      FROM src2 t2\n"
+                                    + "      LEFT OUTER JOIN src1 p2\n"
+                                    + "      ON (t2.key = p2.key)) j")
+                    .await();
+            List<Row> results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql("select * from dest order by key").collect());
+            assertEquals(
+                    "[+I[3, val3], +I[3, val3], +I[4, null], +I[4, null]]", results.toString());
+        } finally {
+            tableEnv.useDatabase("default");
+            tableEnv.executeSql("drop database db1 cascade");
+        }
+    }
+
+    @Test
     public void testDefaultPartitionName() throws Exception {
         TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
         tableEnv.executeSql("create database db1");
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 6a613be..32863f8 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -244,10 +244,13 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper
                 if (outputs.length == 1) {
                     output = outputs[0];
                 } else {
+                    // This is the inverse of creating the normal Output.
+                    // In case of object reuse, we need to copy in the broadcast output.
+                    // Because user's operator may change the record passed to it.
                     if (isObjectReuseEnabled) {
-                        output = new BroadcastingOutput(outputs);
-                    } else {
                         output = new CopyingBroadcastingOutput(outputs);
+                    } else {
+                        output = new BroadcastingOutput(outputs);
                     }
                 }
             }

Reply via email to