This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new babde31 [SPARK-38075][SQL] Fix `hasNext` in
`HiveScriptTransformationExec`'s process output iterator
babde31 is described below
commit babde316d86e06800d3fe90a024e889349d63749
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Jan 31 10:44:53 2022 -0800
[SPARK-38075][SQL] Fix `hasNext` in `HiveScriptTransformationExec`'s
process output iterator
### What changes were proposed in this pull request?
Fix hasNext in HiveScriptTransformationExec's process output iterator so that it
always returns false once it has previously returned false.
### Why are the changes needed?
When hasNext on the process output iterator returns false, it leaves the
iterator in a state (i.e., scriptOutputWritable is not null) such that a
subsequent call to hasNext returns true.
The Guava Ordering used in TakeOrderedAndProjectExec will call hasNext on
the process output iterator even after an earlier call had returned false. This
results in fake rows when script transform is used with `order by` and `limit`.
For example:
```
create or replace temp view t as
select * from values
(1),
(2),
(3)
as t(a);
select transform(a)
USING 'cat' AS (a int)
FROM t order by a limit 10;
```
This returns:
```
NULL
NULL
NULL
1
2
3
```
### Does this PR introduce _any_ user-facing change?
No, other than fixing the correctness issue.
### How was this patch tested?
New unit test.
Closes #35368 from bersprockets/script_transformation_issue.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 46885bef1a1254853ce9165862e3bd8f3a15071f)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../hive/execution/HiveScriptTransformationExec.scala | 7 ++++++-
.../hive/execution/HiveScriptTransformationSuite.scala | 17 +++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
index 219b1a2..beb5583 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
@@ -64,7 +64,7 @@ private[hive] case class HiveScriptTransformationExec(
outputSoi: StructObjectInspector,
hadoopConf: Configuration): Iterator[InternalRow] = {
new Iterator[InternalRow] with HiveInspectors {
- var curLine: String = null
+ private var completed = false
val scriptOutputStream = new DataInputStream(inputStream)
val scriptOutputReader =
@@ -78,6 +78,9 @@ private[hive] case class HiveScriptTransformationExec(
lazy val unwrappers =
outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)
override def hasNext: Boolean = {
+ if (completed) {
+ return false
+ }
try {
if (scriptOutputWritable == null) {
scriptOutputWritable = reusedWritableObject
@@ -85,6 +88,7 @@ private[hive] case class HiveScriptTransformationExec(
if (scriptOutputReader != null) {
if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
checkFailureAndPropagate(writerThread, null, proc,
stderrBuffer)
+ completed = true
return false
}
} else {
@@ -97,6 +101,7 @@ private[hive] case class HiveScriptTransformationExec(
// there can be a lag between EOF being written out and the
process
// being terminated. So explicitly waiting for the process
to be done.
checkFailureAndPropagate(writerThread, null, proc,
stderrBuffer)
+ completed = true
return false
}
}
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
index 24743e8..52c3652 100644
---
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
+++
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
@@ -619,4 +619,21 @@ class HiveScriptTransformationSuite extends
BaseScriptTransformationSuite with T
assert(e.contains("java.lang.ArithmeticException: long overflow"))
}
}
+
+ test("SPARK-38075: ORDER BY with LIMIT should not add fake rows") {
+ withTempView("v") {
+ val df = Seq((1), (2), (3)).toDF("a")
+ df.createTempView("v")
+ checkAnswer(sql(
+ """
+ |SELECT TRANSFORM(a)
+ | USING 'cat' AS (a)
+ |FROM v
+ |ORDER BY a
+ |LIMIT 10
+ |""".stripMargin),
+ identity,
+ Row("1") :: Row("2") :: Row("3") :: Nil)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]