This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new 3c6e11e08dd [FLINK-29013][hive] Fix fail to use BinaryRecordReader in "transform using" syntax with Hive dialect (#20643)
3c6e11e08dd is described below

commit 3c6e11e08dde0f4d613928cfd14aecb0e7725adb
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Mon Sep 5 17:11:36 2022 +0800

    [FLINK-29013][hive] Fix fail to use BinaryRecordReader in "transform using" syntax with Hive dialect (#20643)
---
 .../operators/hive/script/HiveScriptTransformOperator.java       | 3 ---
 .../operators/hive/script/HiveScriptTransformOutReadThread.java  | 4 +---
 .../org/apache/flink/connectors/hive/HiveDialectQueryITCase.java | 9 +++++++++
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java
index e9e437a2d56..427f7251c54 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOperator.java
@@ -302,15 +302,12 @@ public class HiveScriptTransformOperator extends TableStreamOperator<RowData>
     private void initScriptOutPutReadThread() throws Exception {
         StructObjectInspector outputStructObjectInspector =
                 (StructObjectInspector) outputSerDe.getObjectInspector();
-        Writable reusedWritableObject =
-                outputSerDe.getSerializedClass().getConstructor().newInstance();
         outReadThread =
                 new HiveScriptTransformOutReadThread(
                         recordReader,
                         outputType,
                         outputSerDe,
                         outputStructObjectInspector,
-                        reusedWritableObject,
                         collector);
         outReadThread.start();
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java
index 56028ea75f7..ed2750c8f13 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/runtime/operators/hive/script/HiveScriptTransformOutReadThread.java
@@ -52,14 +52,12 @@ public class HiveScriptTransformOutReadThread extends Thread {
     private final HiveShim hiveShim;
     private final StructObjectInspector outputStructObjectInspector;
     private final List<? extends StructField> structFields;
-    private final Writable reusedWritableObject;
 
     public HiveScriptTransformOutReadThread(
             RecordReader recordReader,
             LogicalType outputType,
             AbstractSerDe outSerDe,
             StructObjectInspector structObjectInspector,
-            Writable reusedWritableObject,
             StreamRecordCollector<RowData> collector) {
         this.recordReader = recordReader;
         this.outSerDe = outSerDe;
@@ -71,7 +69,6 @@ public class HiveScriptTransformOutReadThread extends Thread {
         }
         this.hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
         this.outputStructObjectInspector = structObjectInspector;
-        this.reusedWritableObject = reusedWritableObject;
         this.structFields = outputStructObjectInspector.getAllStructFieldRefs();
         this.collector = collector;
         setDaemon(true);
@@ -81,6 +78,7 @@ public class HiveScriptTransformOutReadThread extends Thread {
     @Override
     public void run() {
         try {
+            Writable reusedWritableObject = recordReader.createRow();
             while (true) {
                 long bytes = recordReader.next(reusedWritableObject);
                 if (bytes <= 0) {
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
index 1c8c7c6990e..54bdb88a820 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectQueryITCase.java
@@ -588,6 +588,15 @@ public class HiveDialectQueryITCase {
                                     defaultPartitionName,
                                     defaultPartitionName,
                                     defaultPartitionName));
+
+            // test use binary record reader
+            result =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.executeSql(
+                                            "select transform(key) using 'cat' as (tkey)"
+                                                    + " RECORDREADER 'org.apache.hadoop.hive.ql.exec.BinaryRecordReader' from src")
+                                    .collect());
+            assertThat(result.toString()).isEqualTo("[+I[1\n2\n3\n]]");
         } finally {
             tableEnv.executeSql("drop table dest1");
             tableEnv.executeSql("drop table destp1");

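For context on the fix: Hive's RecordReader contract expects the reusable row
object to come from RecordReader#createRow() rather than being instantiated by
the caller. BinaryRecordReader in particular populates the BytesWritable it
handed out from createRow(), so the old code path, which built the Writable
reflectively from outputSerDe.getSerializedClass(), failed for binary output.
Below is a minimal standalone sketch of the corrected read loop, assuming the
BinaryRecordReader behavior described above; the class name
BinaryRecordReaderDemo and the in-memory sample input are illustrative and not
part of this commit.

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.BinaryRecordReader;
    import org.apache.hadoop.io.Writable;

    public class BinaryRecordReaderDemo {
        public static void main(String[] args) throws Exception {
            BinaryRecordReader reader = new BinaryRecordReader();
            reader.initialize(
                    new ByteArrayInputStream("1\n2\n3\n".getBytes(StandardCharsets.UTF_8)),
                    new Configuration(),
                    new Properties());

            // The reuse object must come from the reader itself: BinaryRecordReader
            // fills the BytesWritable created in createRow(). A Writable instantiated
            // reflectively from the SerDe's serialized class (the code path removed
            // above) is not the object the reader writes into.
            Writable reused = reader.createRow();

            long bytes;
            while ((bytes = reader.next(reused)) > 0) { // same exit condition as the patched loop
                System.out.println(bytes + " bytes read: " + reused);
            }
            reader.close();
        }
    }

This also explains the expected value in the ITCase above: BinaryRecordReader
treats the script output as raw bytes rather than splitting on newlines, so the
three rows echoed by 'cat' come back as a single record "1\n2\n3\n", matching
the asserted result [+I[1\n2\n3\n]].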