This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new 3c6e11e08dd [FLINK-29013][hive] Fix fail to use BinaryRecordReader in "transform using" syntax with Hive dialect (#20643) 3c6e11e08dd is described below commit 3c6e11e08dde0f4d613928cfd14aecb0e7725adb Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn> AuthorDate: Mon Sep 5 17:11:36 2022 +0800 [FLINK-29013][hive] Fix fail to use BinaryRecordReader in "transform using" syntax with Hive dialect (#20643) --- .../operators/hive/script/HiveScriptTransformOperator.java | 3 --- .../operators/hive/script/HiveScriptTransformOutReadThread.java | 4 +--- .../org/apache/flink/connectors/hive/HiveDialectQueryITCase.java | 9 +++++++++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java index e9e437a2d56..427f7251c54 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java @@ -302,15 +302,12 @@ public class HiveScriptTransformOperator extends TableStreamOperator<RowData> private void initScriptOutPutReadThread() throws Exception { StructObjectInspector outputStructObjectInspector = (StructObjectInspector) outputSerDe.getObjectInspector(); - Writable reusedWritableObject = - outputSerDe.getSerializedClass().getConstructor().newInstance(); outReadThread = new HiveScriptTransformOutReadThread( recordReader, outputType, outputSerDe, outputStructObjectInspector, - reusedWritableObject, collector); outReadThread.start(); } diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java index 56028ea75f7..ed2750c8f13 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java @@ -52,14 +52,12 @@ public class HiveScriptTransformOutReadThread extends Thread { private final HiveShim hiveShim; private final StructObjectInspector outputStructObjectInspector; private final List<? extends StructField> structFields; - private final Writable reusedWritableObject; public HiveScriptTransformOutReadThread( RecordReader recordReader, LogicalType outputType, AbstractSerDe outSerDe, StructObjectInspector structObjectInspector, - Writable reusedWritableObject, StreamRecordCollector<RowData> collector) { this.recordReader = recordReader; this.outSerDe = outSerDe; @@ -71,7 +69,6 @@ public class HiveScriptTransformOutReadThread extends Thread { } this.hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion()); this.outputStructObjectInspector = structObjectInspector; - this.reusedWritableObject = reusedWritableObject; this.structFields = outputStructObjectInspector.getAllStructFieldRefs(); this.collector = collector; setDaemon(true); @@ -81,6 +78,7 @@ public class HiveScriptTransformOutReadThread extends Thread { @Override public void run() { try { + Writable reusedWritableObject = recordReader.createRow(); while (true) { long bytes = recordReader.next(reusedWritableObject); if (bytes <= 0) { diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java index 1c8c7c6990e..54bdb88a820 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java @@ -588,6 +588,15 @@ public class HiveDialectQueryITCase { defaultPartitionName, defaultPartitionName, defaultPartitionName)); + + // test use binary record reader + result = + CollectionUtil.iteratorToList( + tableEnv.executeSql( + "select transform(key) using 'cat' as (tkey)" + + " RECORDREADER 'org.apache.hadoop.hive.ql.exec.BinaryRecordReader' from src") + .collect()); + assertThat(result.toString()).isEqualTo("[+I[1\n2\n3\n]]"); } finally { tableEnv.executeSql("drop table dest1"); tableEnv.executeSql("drop table destp1");