This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6582704 [FLINK-20722][hive] HiveTableSink should copy the record when converting RowData to Row
6582704 is described below
commit 65827041830e1332aeb373eb64ae31505a49f268
Author: Rui Li <[email protected]>
AuthorDate: Thu Dec 24 11:39:35 2020 +0800
[FLINK-20722][hive] HiveTableSink should copy the record when converting RowData to Row
This closes #14484
---
.../hive/TableEnvHiveConnectorITCase.java | 42 ++++++++++++++++++++++
.../MultipleInputStreamOperatorBase.java | 7 ++--
2 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 13e319a..9af50b4 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -74,6 +74,48 @@ public class TableEnvHiveConnectorITCase {
@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
@Test
+ public void testMultiInputBroadcast() throws Exception {
+ TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
+ tableEnv.executeSql("create database db1");
+ try {
+ tableEnv.useDatabase("db1");
+ tableEnv.executeSql("create table src1(key string, val string)");
+ tableEnv.executeSql("create table src2(key string, val string)");
+ tableEnv.executeSql("create table dest(key string, val string)");
+ HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src1")
+ .addRow(new Object[] {"1", "val1"})
+ .addRow(new Object[] {"2", "val2"})
+ .addRow(new Object[] {"3", "val3"})
+ .commit();
+ HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "src2")
+ .addRow(new Object[] {"3", "val4"})
+ .addRow(new Object[] {"4", "val4"})
+ .commit();
+ tableEnv.executeSql(
+ "INSERT OVERWRITE dest\n"
+ + "SELECT j.*\n"
+ + "FROM (SELECT t1.key, p1.val\n"
+ + " FROM src2 t1\n"
+ + " LEFT OUTER JOIN src1 p1\n"
+ + " ON (t1.key = p1.key)\n"
+ + " UNION ALL\n"
+ + " SELECT t2.key, p2.val\n"
+ + " FROM src2 t2\n"
+ + " LEFT OUTER JOIN src1 p2\n"
+ + " ON (t2.key = p2.key)) j")
+ .await();
+ List<Row> results =
+ CollectionUtil.iteratorToList(
+ tableEnv.executeSql("select * from dest order by key").collect());
+ assertEquals(
+ "[+I[3, val3], +I[3, val3], +I[4, null], +I[4, null]]", results.toString());
+ } finally {
+ tableEnv.useDatabase("default");
+ tableEnv.executeSql("drop database db1 cascade");
+ }
+ }
+
+ @Test
public void testDefaultPartitionName() throws Exception {
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
tableEnv.executeSql("create database db1");
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index 6a613be..32863f8 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -244,10 +244,13 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper
if (outputs.length == 1) {
output = outputs[0];
} else {
+ // This is the inverse of creating the normal Output.
+ // In case of object reuse, we need to copy in the broadcast output.
+ // Because user's operator may change the record passed to it.
if (isObjectReuseEnabled) {
- output = new BroadcastingOutput(outputs);
- } else {
output = new CopyingBroadcastingOutput(outputs);
+ } else {
+ output = new BroadcastingOutput(outputs);
}
}
}