This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 029393a0ef30 [SPARK-54054][CONNECT] Support row position for
SparkConnectResultSet
029393a0ef30 is described below
commit 029393a0ef30cef4cda03e47135a28df5e8344f1
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Oct 29 15:48:58 2025 +0900
[SPARK-54054][CONNECT] Support row position for SparkConnectResultSet
### What changes were proposed in this pull request?
This PR implements the following methods of the `java.sql.ResultSet` interface for
`SparkConnectResultSet`:
```
boolean isBeforeFirst() throws SQLException;
boolean isAfterLast() throws SQLException;
boolean isFirst() throws SQLException;
boolean isLast() throws SQLException;
int getRow() throws SQLException;
void setFetchDirection(int direction) throws SQLException;
int getFetchDirection() throws SQLException;
```
### Why are the changes needed?
Implement more JDBC APIs.
### Does this PR introduce _any_ user-facing change?
No, it's a new feature.
### How was this patch tested?
New UTs are added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52756 from pan3793/SPARK-54054.
Lead-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kousuke Saruta <[email protected]>
---
.../client/jdbc/SparkConnectResultSet.scala | 60 ++++++++--
.../connect/client/jdbc/util/JdbcErrorUtils.scala | 16 +++
.../client/jdbc/SparkConnectResultSetSuite.scala | 125 +++++++++++++++++++++
3 files changed, 191 insertions(+), 10 deletions(-)
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
index 38417b0de217..0745ddc09911 100644
---
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
@@ -25,6 +25,7 @@ import java.util.Calendar
import org.apache.spark.sql.Row
import org.apache.spark.sql.connect.client.SparkResult
+import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils
class SparkConnectResultSet(
sparkResult: SparkResult[Row],
@@ -34,16 +35,25 @@ class SparkConnectResultSet(
private var currentRow: Row = _
- private var _wasNull: Boolean = false
+ // cursor is 1-based, range in [0, length + 1]
+ // - 0 means beforeFirstRow
+ // - value in [1, length] means the row number
+ // - length + 1 means afterLastRow
+ private var cursor: Int = 0
+ private var _wasNull: Boolean = false
override def wasNull: Boolean = _wasNull
// Moves to the next row of `iterator`, if there is one, and keeps `cursor` in step with the
// position. Returns whether a row was available. The `cursor > 0` guard leaves an empty
// result at 0 (length is 0 there, so the equality alone would match).
override def next(): Boolean = {
val hasNext = iterator.hasNext
if (hasNext) {
currentRow = iterator.next()
+ cursor += 1 // positioned on a row; cursor is that row's 1-based number
} else {
currentRow = null
+ if (cursor > 0 && cursor == sparkResult.length) { // on the last row, nothing left to read
+ cursor += 1 // length + 1 marks afterLastRow; the equality above stops further increments
+ }
}
hasNext
}
@@ -253,13 +263,25 @@ class SparkConnectResultSet(
override def getBigDecimal(columnLabel: String): java.math.BigDecimal =
throw new SQLFeatureNotSupportedException
- override def isBeforeFirst: Boolean = throw new
SQLFeatureNotSupportedException
+ // Reports whether the cursor still sits in front of the first row. An empty result never
+ // counts as "before first", hence the length check.
+ override def isBeforeFirst: Boolean = {
+   checkOpen()
+   val noRowReachedYet = cursor < 1
+   noRowReachedYet && sparkResult.length > 0
+ }
- override def isAfterLast: Boolean = throw new SQLFeatureNotSupportedException
+ // Reports whether the cursor is on row number 1 (cursor positions are 1-based).
+ override def isFirst: Boolean = {
+   checkOpen()
+   val firstRowNumber = 1
+   cursor == firstRowNumber
+ }
- override def isFirst: Boolean = throw new SQLFeatureNotSupportedException
+ // Reports whether the cursor is on the final row, i.e. its row number equals the row count.
+ // A cursor that has not reached any row (0) is never "last", even for an empty result.
+ override def isLast: Boolean = {
+   checkOpen()
+   if (cursor <= 0) false else cursor == sparkResult.length
+ }
- override def isLast: Boolean = throw new SQLFeatureNotSupportedException
+ // Reports whether the cursor has moved past the final row (cursor is length + 1).
+ // A cursor still at 0 is never "after last", which also covers the empty result.
+ override def isAfterLast: Boolean = {
+   checkOpen()
+   if (cursor > 0) cursor > sparkResult.length else false
+ }
override def beforeFirst(): Unit = throw new SQLFeatureNotSupportedException
@@ -269,7 +291,15 @@ class SparkConnectResultSet(
override def last(): Boolean = throw new SQLFeatureNotSupportedException
- override def getRow: Int = throw new SQLFeatureNotSupportedException
+ // Returns the 1-based number of the row the cursor is on, or 0 when the cursor is not on
+ // a row (before the first row or after the last one).
+ override def getRow: Int = {
+   checkOpen()
+
+   val onARow = cursor >= 1 && cursor <= sparkResult.length
+   if (onARow) cursor else 0
+ }
override def absolute(row: Int): Boolean = throw new
SQLFeatureNotSupportedException
@@ -277,11 +307,21 @@ class SparkConnectResultSet(
override def previous(): Boolean = throw new SQLFeatureNotSupportedException
- override def setFetchDirection(direction: Int): Unit =
- throw new SQLFeatureNotSupportedException
+ // Accepts only FETCH_FORWARD; the assert below pins this result set to TYPE_FORWARD_ONLY.
+ // Every other value, including ints that are not FETCH_* constants, is rejected with
+ // SQLException.
+ override def setFetchDirection(direction: Int): Unit = {
+   checkOpen()
+   assert(this.getType == ResultSet.TYPE_FORWARD_ONLY)
- override def getFetchDirection: Int =
- throw new SQLFeatureNotSupportedException
+   if (direction != ResultSet.FETCH_FORWARD) {
+     // stringifyFetchDirection throws IllegalArgumentException for ints outside the FETCH_*
+     // constants; fall back to the raw number so the caller still receives SQLException.
+     val directionName =
+       try JdbcErrorUtils.stringifyFetchDirection(direction)
+       catch { case _: IllegalArgumentException => direction.toString }
+     val resultSetType = JdbcErrorUtils.stringifyResultSetType(ResultSet.TYPE_FORWARD_ONLY)
+     throw new SQLException(
+       s"Fetch direction $directionName is not supported for $resultSetType result set.")
+   }
+ }
+
+ // Always FETCH_FORWARD, the only direction setFetchDirection accepts.
+ override def getFetchDirection: Int = {
+   checkOpen()
+   val onlySupportedDirection = ResultSet.FETCH_FORWARD
+   onlySupportedDirection
+ }
override def setFetchSize(rows: Int): Unit =
throw new SQLFeatureNotSupportedException
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
index cb941b7420a7..3d9f72d87d15 100644
---
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
@@ -37,4 +37,20 @@ private[jdbc] object JdbcErrorUtils {
case _ =>
throw new IllegalArgumentException(s"Invalid holdability: $holdability")
}
+
+ // Renders a `ResultSet.TYPE_*` constant as its name without the `TYPE_` prefix, for use in
+ // messages. Any other int is rejected with IllegalArgumentException.
+ def stringifyResultSetType(typ: Int): String = {
+   val names = Map(
+     ResultSet.TYPE_FORWARD_ONLY -> "FORWARD_ONLY",
+     ResultSet.TYPE_SCROLL_INSENSITIVE -> "SCROLL_INSENSITIVE",
+     ResultSet.TYPE_SCROLL_SENSITIVE -> "SCROLL_SENSITIVE")
+   // The default is by-name, so the exception is only built for an unknown value.
+   names.getOrElse(typ, throw new IllegalArgumentException(s"Invalid ResultSet type: $typ"))
+ }
+
+ // Renders a `ResultSet.FETCH_*` constant as its name, for use in messages.
+ // Any other int is rejected with IllegalArgumentException.
+ def stringifyFetchDirection(direction: Int): String = direction match {
+   case ResultSet.FETCH_FORWARD => "FETCH_FORWARD"
+   case ResultSet.FETCH_REVERSE => "FETCH_REVERSE"
+   case ResultSet.FETCH_UNKNOWN => "FETCH_UNKNOWN"
+   case _ =>
+     // Keep the interpolated literal on one line: an s"..." string cannot span lines.
+     throw new IllegalArgumentException(s"Invalid fetch direction: $direction")
+ }
}
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
new file mode 100644
index 000000000000..ac2866837e93
--- /dev/null
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+
+/**
+ * Exercises the type, concurrency, fetch-direction and row-position methods of the
+ * `ResultSet` handed to the body of `withExecuteQuery`, for empty, single-row and
+ * two-row results.
+ */
+class SparkConnectResultSetSuite extends ConnectFunSuite with RemoteSparkSession
+    with JdbcHelper {
+
+  def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
+
+  test("type, concurrency, fetch direction of result set") {
+    withExecuteQuery("SELECT 1") { rs =>
+      // A bare `a === b` only computes a Boolean that is then discarded; wrap each
+      // comparison in assert so a mismatch actually fails the test.
+      assert(rs.getType === ResultSet.TYPE_FORWARD_ONLY)
+      assert(rs.getConcurrency === ResultSet.CONCUR_READ_ONLY)
+      assert(rs.getFetchDirection === ResultSet.FETCH_FORWARD)
+      Seq(ResultSet.FETCH_FORWARD, ResultSet.FETCH_REVERSE, ResultSet.FETCH_UNKNOWN)
+        .foreach { direction =>
+          if (direction == ResultSet.FETCH_FORWARD) {
+            // The only accepted direction: must not throw.
+            rs.setFetchDirection(direction)
+          } else {
+            intercept[SQLException] {
+              rs.setFetchDirection(direction)
+            }
+          }
+        }
+    }
+  }
+
+  test("row position for empty result set") {
+    withExecuteQuery("SELECT * FROM range(0)") { rs =>
+      // With no rows, every position predicate is false both before and after next().
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+    }
+  }
+
+  test("row position for one row result set") {
+    withExecuteQuery("SELECT * FROM range(1)") { rs =>
+      // Before the first next(): in front of row 1.
+      assert(rs.getRow === 0)
+      assert(rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      // On the only row, which is both first and last.
+      assert(rs.getRow === 1)
+      assert(!rs.isBeforeFirst)
+      assert(rs.isFirst)
+      assert(rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      // Past the end: getRow falls back to 0 and only isAfterLast holds.
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(rs.isAfterLast)
+    }
+  }
+
+  test("row position for multiple rows result set") {
+    withExecuteQuery("SELECT * FROM range(2)") { rs =>
+      // Before the first next(): in front of row 1.
+      assert(rs.getRow === 0)
+      assert(rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      // On row 1 of 2: first but not last.
+      assert(rs.getRow === 1)
+      assert(!rs.isBeforeFirst)
+      assert(rs.isFirst)
+      assert(!rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(rs.next())
+
+      // On row 2 of 2: last but not first.
+      assert(rs.getRow === 2)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(rs.isLast)
+      assert(!rs.isAfterLast)
+
+      assert(!rs.next())
+
+      // Past the end: getRow falls back to 0 and only isAfterLast holds.
+      assert(rs.getRow === 0)
+      assert(!rs.isBeforeFirst)
+      assert(!rs.isFirst)
+      assert(!rs.isLast)
+      assert(rs.isAfterLast)
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]