This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8c9fad7bb77b [SPARK-53934][CONNECT] Initial implement Connect JDBC
driver
8c9fad7bb77b is described below
commit 8c9fad7bb77ba99027acd4e8b8837720ad605c96
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Oct 23 12:41:17 2025 -0700
[SPARK-53934][CONNECT] Initial implement Connect JDBC driver
### What changes were proposed in this pull request?
This is the initial implementation of the Connect JDBC driver. In detail,
this PR implements the essential JDBC interfaces listed below.
- `java.sql.Driver`
- `java.sql.Connection`
- `java.sql.Statement`
- `java.sql.ResultSet`
- `java.sql.ResultSetMetaData`
- `java.sql.DatabaseMetaData`
At the first step, this PR only supports `NullType`, `BooleanType`,
`ByteType`, `ShortType`, `IntegerType`, `LongType`, `FloatType`, `DoubleType`,
and `StringType`.
### Why are the changes needed?
Basically implement the feature proposed in [SPIP: JDBC Driver for Spark
Connect](https://issues.apache.org/jira/browse/SPARK-53484)
### Does this PR introduce _any_ user-facing change?
It's a new feature.
### How was this patch tested?
New UTs are added.
And I have also cross-verified BeeLine cases with SPARK-54002
(https://github.com/apache/spark/pull/52706)
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52705 from pan3793/SPARK-53934.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
sql/connect/client/jdbc/pom.xml | 7 +
.../jdbc/NonRegisteringSparkConnectDriver.scala | 8 +-
.../client/jdbc/SparkConnectConnection.scala | 280 +++++++++
.../client/jdbc/SparkConnectDatabaseMetaData.scala | 600 ++++++++++++++++++
.../client/jdbc/SparkConnectResultSet.scala | 682 +++++++++++++++++++++
.../jdbc/SparkConnectResultSetMetaData.scala | 84 +++
.../client/jdbc/SparkConnectStatement.scala | 222 +++++++
.../connect/client/jdbc/util/JdbcErrorUtils.scala | 40 ++
.../connect/client/jdbc/util/JdbcTypeUtils.scala | 96 +++
.../client/jdbc/SparkConnectDriverSuite.scala | 67 +-
.../jdbc/SparkConnectJdbcDataTypeSuite.scala | 218 +++++++
.../JdbcHelper.scala} | 26 +-
12 files changed, 2313 insertions(+), 17 deletions(-)
diff --git a/sql/connect/client/jdbc/pom.xml b/sql/connect/client/jdbc/pom.xml
index 9f2ba011004d..c2dda12b1e63 100644
--- a/sql/connect/client/jdbc/pom.xml
+++ b/sql/connect/client/jdbc/pom.xml
@@ -111,6 +111,13 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-connect-client-jvm_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
<!-- Use mima to perform the compatibility check -->
<dependency>
<groupId>com.typesafe</groupId>
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/NonRegisteringSparkConnectDriver.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/NonRegisteringSparkConnectDriver.scala
index 1052f6d3e560..09e386835b7e 100644
---
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/NonRegisteringSparkConnectDriver.scala
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/NonRegisteringSparkConnectDriver.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.connect.client.jdbc
-import java.sql.{Connection, Driver, DriverPropertyInfo,
SQLFeatureNotSupportedException}
+import java.sql.{Connection, Driver, DriverPropertyInfo, SQLException,
SQLFeatureNotSupportedException}
import java.util.Properties
import java.util.logging.Logger
@@ -29,7 +29,11 @@ class NonRegisteringSparkConnectDriver extends Driver {
override def acceptsURL(url: String): Boolean = url.startsWith("jdbc:sc://")
override def connect(url: String, info: Properties): Connection = {
- throw new UnsupportedOperationException("TODO(SPARK-53934)")
+ if (url == null) {
+ throw new SQLException("url must not be null")
+ }
+
+ if (this.acceptsURL(url)) new SparkConnectConnection(url, info) else null
}
override def getPropertyInfo(url: String, info: Properties):
Array[DriverPropertyInfo] =
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala
new file mode 100644
index 000000000000..95ec956771db
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectConnection.scala
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => JdbcArray, _}
+import java.util
+import java.util.Properties
+import java.util.concurrent.Executor
+
+import org.apache.spark.sql.connect.SparkSession
+import org.apache.spark.sql.connect.client.SparkConnectClient
+import org.apache.spark.sql.connect.client.jdbc.util.JdbcErrorUtils._
+
+class SparkConnectConnection(val url: String, val info: Properties) extends
Connection {
+
+ private[jdbc] val client = SparkConnectClient
+ .builder()
+ .loadFromEnvironment()
+ .userAgent("Spark Connect JDBC")
+ .connectionString(url.stripPrefix("jdbc:"))
+ .build()
+
+ private[jdbc] val spark = SparkSession.builder().client(client).create()
+
+ @volatile private var closed: Boolean = false
+
+ override def isClosed: Boolean = closed
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ spark.close()
+ closed = true
+ }
+ }
+
+ private[jdbc] def checkOpen(): Unit = {
+ if (closed) {
+ throw new SQLException("JDBC Connection is closed.")
+ }
+ if (!client.isSessionValid) {
+ throw new SQLException(s"Spark Connect Session ${client.sessionId} is
invalid.")
+ }
+ }
+
+ override def isValid(timeout: Int): Boolean = !closed &&
client.isSessionValid
+
+ override def setCatalog(catalog: String): Unit = {
+ checkOpen()
+ spark.catalog.setCurrentCatalog(catalog)
+ }
+
+ override def getCatalog: String = {
+ checkOpen()
+ spark.catalog.currentCatalog()
+ }
+
+ override def setSchema(schema: String): Unit = {
+ checkOpen()
+ spark.catalog.setCurrentDatabase(schema)
+ }
+
+ override def getSchema: String = {
+ checkOpen()
+ spark.catalog.currentDatabase
+ }
+
+ override def getMetaData: DatabaseMetaData = {
+ checkOpen()
+ new SparkConnectDatabaseMetaData(this)
+ }
+
+ override def createStatement(): Statement = {
+ checkOpen()
+ new SparkConnectStatement(this)
+ }
+
+ override def prepareStatement(sql: String): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareCall(sql: String): CallableStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def createStatement(
+ resultSetType: Int,
+ resultSetConcurrency: Int,
+ resultSetHoldability: Int): Statement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareStatement(
+ sql: String,
+ resultSetType: Int,
+ resultSetConcurrency: Int,
+ resultSetHoldability: Int): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareCall(
+ sql: String,
+ resultSetType: Int,
+ resultSetConcurrency: Int,
+ resultSetHoldability: Int): CallableStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareStatement(
+ sql: String, autoGeneratedKeys: Int): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareStatement(
+ sql: String, columnIndexes: Array[Int]): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareStatement(
+ sql: String, columnNames: Array[String]): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def createStatement(
+ resultSetType: Int, resultSetConcurrency: Int): Statement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareStatement(
+ sql: String,
+ resultSetType: Int,
+ resultSetConcurrency: Int): PreparedStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def prepareCall(
+ sql: String,
+ resultSetType: Int,
+ resultSetConcurrency: Int): CallableStatement =
+ throw new SQLFeatureNotSupportedException
+
+ override def nativeSQL(sql: String): String =
+ throw new SQLFeatureNotSupportedException
+
+ override def setAutoCommit(autoCommit: Boolean): Unit = {
+ checkOpen()
+ if (!autoCommit) {
+ throw new SQLFeatureNotSupportedException("Only auto-commit mode is
supported")
+ }
+ }
+
+ override def getAutoCommit: Boolean = {
+ checkOpen()
+ true
+ }
+
+ override def commit(): Unit = {
+ checkOpen()
+ throw new SQLException("Connection is in auto-commit mode")
+ }
+
+ override def rollback(): Unit = {
+ checkOpen()
+ throw new SQLException("Connection is in auto-commit mode")
+ }
+
+ override def setReadOnly(readOnly: Boolean): Unit = {
+ checkOpen()
+ if (readOnly) {
+ throw new SQLFeatureNotSupportedException("Read-only mode is not
supported")
+ }
+ }
+
+ override def isReadOnly: Boolean = {
+ checkOpen()
+ false
+ }
+
+ override def setTransactionIsolation(level: Int): Unit = {
+ checkOpen()
+ if (level != Connection.TRANSACTION_NONE) {
+ throw new SQLFeatureNotSupportedException(
+ "Requested transaction isolation level " +
+ s"${stringfiyTransactionIsolationLevel(level)} is not supported")
+ }
+ }
+
+ override def getTransactionIsolation: Int = {
+ checkOpen()
+ Connection.TRANSACTION_NONE
+ }
+
+ override def getWarnings: SQLWarning = null
+
+ override def clearWarnings(): Unit = {}
+
+ override def getTypeMap: util.Map[String, Class[_]] =
+ throw new SQLFeatureNotSupportedException
+
+ override def setTypeMap(map: util.Map[String, Class[_]]): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def setHoldability(holdability: Int): Unit = {
+ if (holdability != ResultSet.HOLD_CURSORS_OVER_COMMIT) {
+ throw new SQLFeatureNotSupportedException(
+ s"Holdability ${stringfiyHoldability(holdability)} is not supported")
+ }
+ }
+
+ override def getHoldability: Int = ResultSet.HOLD_CURSORS_OVER_COMMIT
+
+ override def setSavepoint(): Savepoint =
+ throw new SQLFeatureNotSupportedException
+
+ override def setSavepoint(name: String): Savepoint =
+ throw new SQLFeatureNotSupportedException
+
+ override def rollback(savepoint: Savepoint): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def releaseSavepoint(savepoint: Savepoint): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def createClob(): Clob =
+ throw new SQLFeatureNotSupportedException
+
+ override def createBlob(): Blob =
+ throw new SQLFeatureNotSupportedException
+
+ override def createNClob(): NClob =
+ throw new SQLFeatureNotSupportedException
+
+ override def createSQLXML(): SQLXML =
+ throw new SQLFeatureNotSupportedException
+
+ override def setClientInfo(name: String, value: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def setClientInfo(properties: Properties): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getClientInfo(name: String): String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getClientInfo: Properties =
+ throw new SQLFeatureNotSupportedException
+
+ override def createArrayOf(typeName: String, elements: Array[AnyRef]):
JdbcArray =
+ throw new SQLFeatureNotSupportedException
+
+ override def createStruct(typeName: String, attributes: Array[AnyRef]):
Struct =
+ throw new SQLFeatureNotSupportedException
+
+ override def abort(executor: Executor): Unit = {
+ if (executor == null) {
+ throw new SQLException("executor can not be null")
+ }
+ if (!closed) {
+ executor.execute { () => this.close() }
+ }
+ }
+
+ override def setNetworkTimeout(executor: Executor, milliseconds: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNetworkTimeout: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
+ iface.asInstanceOf[T]
+ } else {
+ throw new SQLException(s"${this.getClass.getName} not unwrappable from
${iface.getName}")
+ }
+
+ override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
new file mode 100644
index 000000000000..4efbd2b8f917
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDatabaseMetaData.scala
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.util.VersionUtils
+
+class SparkConnectDatabaseMetaData(conn: SparkConnectConnection) extends
DatabaseMetaData {
+
+ override def allProceduresAreCallable: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def allTablesAreSelectable: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getURL: String = conn.url
+
+ override def getUserName: String = conn.spark.client.configuration.userName
+
+ override def isReadOnly: Boolean = false
+
+ override def nullsAreSortedHigh: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def nullsAreSortedLow: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def nullsAreSortedAtStart: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def nullsAreSortedAtEnd: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDatabaseProductName: String = "Apache Spark Connect Server"
+
+ override def getDatabaseProductVersion: String = conn.spark.version
+
+ override def getDriverName: String = "Apache Spark Connect JDBC Driver"
+
+ override def getDriverVersion: String = SPARK_VERSION
+
+ override def getDriverMajorVersion: Int =
VersionUtils.majorVersion(SPARK_VERSION)
+
+ override def getDriverMinorVersion: Int =
VersionUtils.minorVersion(SPARK_VERSION)
+
+ override def usesLocalFiles: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def usesLocalFilePerTable: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMixedCaseIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesUpperCaseIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesLowerCaseIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesMixedCaseIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMixedCaseQuotedIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesUpperCaseQuotedIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesLowerCaseQuotedIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def storesMixedCaseQuotedIdentifiers: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getIdentifierQuoteString: String = "`"
+
+ override def getSQLKeywords: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNumericFunctions: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getStringFunctions: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSystemFunctions: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTimeDateFunctions: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSearchStringEscape: String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getExtraNameCharacters: String = ""
+
+ override def supportsAlterTableWithAddColumn: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsAlterTableWithDropColumn: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsColumnAliasing: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def nullPlusNonNullIsNull: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsConvert: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsConvert(fromType: Int, toType: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsTableCorrelationNames: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsDifferentTableCorrelationNames: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsExpressionsInOrderBy: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOrderByUnrelated: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsGroupBy: Boolean = true
+
+ override def supportsGroupByUnrelated: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsGroupByBeyondSelect: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsLikeEscapeClause: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMultipleResultSets: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMultipleTransactions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsNonNullableColumns: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMinimumSQLGrammar: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCoreSQLGrammar: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsExtendedSQLGrammar: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsANSI92EntryLevelSQL: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsANSI92IntermediateSQL: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsANSI92FullSQL: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsIntegrityEnhancementFacility: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOuterJoins: Boolean = true
+
+ override def supportsFullOuterJoins: Boolean = true
+
+ override def supportsLimitedOuterJoins: Boolean = true
+
+ override def getSchemaTerm: String = "schema"
+
+ override def getProcedureTerm: String = "procedure"
+
+ override def getCatalogTerm: String = "catalog"
+
+ override def isCatalogAtStart: Boolean = true
+
+ override def getCatalogSeparator: String = "."
+
+ override def supportsSchemasInDataManipulation: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSchemasInProcedureCalls: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSchemasInTableDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSchemasInIndexDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSchemasInPrivilegeDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCatalogsInDataManipulation: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCatalogsInProcedureCalls: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCatalogsInTableDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCatalogsInIndexDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCatalogsInPrivilegeDefinitions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsPositionedDelete: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsPositionedUpdate: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSelectForUpdate: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsStoredProcedures: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSubqueriesInComparisons: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSubqueriesInExists: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSubqueriesInIns: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsSubqueriesInQuantifieds: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsCorrelatedSubqueries: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsUnion: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsUnionAll: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOpenCursorsAcrossCommit: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOpenCursorsAcrossRollback: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOpenStatementsAcrossCommit: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsOpenStatementsAcrossRollback: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxBinaryLiteralLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxCharLiteralLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnsInGroupBy: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnsInIndex: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnsInOrderBy: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnsInSelect: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxColumnsInTable: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxConnections: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxCursorNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxIndexLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxSchemaNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxProcedureNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxCatalogNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxRowSize: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def doesMaxRowSizeIncludeBlobs: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxStatementLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxStatements: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxTableNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxTablesInSelect: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxUserNameLength: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDefaultTransactionIsolation: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsTransactions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsTransactionIsolationLevel(level: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsDataDefinitionAndDataManipulationTransactions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsDataManipulationTransactionsOnly: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def dataDefinitionCausesTransactionCommit: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def dataDefinitionIgnoredInTransactions: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getProcedures(
+ catalog: String,
+ schemaPattern: String,
+ procedureNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getProcedureColumns(
+ catalog: String,
+ schemaPattern: String,
+ procedureNamePattern: String,
+ columnNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getCatalogs: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSchemas: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSchemas(catalog: String, schemaPattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTableTypes: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTables(
+ catalog: String,
+ schemaPattern: String,
+ tableNamePattern: String,
+ types: Array[String]): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getColumns(
+ catalog: String,
+ schemaPattern: String,
+ tableNamePattern: String,
+ columnNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getColumnPrivileges(
+ catalog: String,
+ schema: String,
+ table: String,
+ columnNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTablePrivileges(
+ catalog: String,
+ schemaPattern: String,
+ tableNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBestRowIdentifier(
+ catalog: String,
+ schema: String,
+ table: String,
+ scope: Int,
+ nullable: Boolean): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getVersionColumns(
+ catalog: String, schema: String, table: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getPrimaryKeys(catalog: String, schema: String, table: String):
ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getImportedKeys(catalog: String, schema: String, table:
String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getExportedKeys(catalog: String, schema: String, table:
String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getCrossReference(
+ parentCatalog: String,
+ parentSchema: String,
+ parentTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTypeInfo: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getIndexInfo(
+ catalog: String,
+ schema: String,
+ table: String,
+ unique: Boolean,
+ approximate: Boolean): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsResultSetType(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsResultSetConcurrency(`type`: Int, concurrency: Int):
Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def ownUpdatesAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def ownDeletesAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def ownInsertsAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def othersUpdatesAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def othersDeletesAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def othersInsertsAreVisible(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def updatesAreDetected(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def deletesAreDetected(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def insertsAreDetected(`type`: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsBatchUpdates: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getUDTs(
+ catalog: String,
+ schemaPattern: String,
+ typeNamePattern: String,
+ types: Array[Int]): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getConnection: Connection = conn
+
+ override def supportsSavepoints: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsNamedParameters: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsMultipleOpenResults: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsGetGeneratedKeys: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSuperTypes(
+ catalog: String,
+ schemaPattern: String,
+ typeNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSuperTables(
+ catalog: String,
+ schemaPattern: String,
+ tableNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getAttributes(
+ catalog: String,
+ schemaPattern: String,
+ typeNamePattern: String,
+ attributeNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsResultSetHoldability(holdability: Int): Boolean =
+ holdability == ResultSet.CLOSE_CURSORS_AT_COMMIT
+
+ override def getResultSetHoldability: Int = ResultSet.CLOSE_CURSORS_AT_COMMIT
+
+ override def getDatabaseMajorVersion: Int =
VersionUtils.majorVersion(conn.spark.version)
+
+ override def getDatabaseMinorVersion: Int =
VersionUtils.minorVersion(conn.spark.version)
+
+ // JSR-221 defines JDBC 4.0 API Specification -
https://jcp.org/en/jsr/detail?id=221
+ // JDBC 4.3 is the latest Maintenance version of the JDBC 4.0 specification
as of JDK 17
+ //
https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/package-summary.html
+ override def getJDBCMajorVersion: Int = 4
+
+ override def getJDBCMinorVersion: Int = 3
+
+ override def getSQLStateType: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def locatorsUpdateCopy: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def supportsStatementPooling: Boolean = false
+
+ override def getRowIdLifetime: RowIdLifetime =
RowIdLifetime.ROWID_UNSUPPORTED
+
+ override def supportsStoredFunctionsUsingCallSyntax: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def autoCommitFailureClosesAllResultSets: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getClientInfoProperties: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFunctions(
+ catalog: String,
+ schemaPattern: String,
+ functionNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFunctionColumns(
+ catalog: String,
+ schemaPattern: String,
+ functionNamePattern: String,
+ columnNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def getPseudoColumns(
+ catalog: String,
+ schemaPattern: String,
+ tableNamePattern: String,
+ columnNamePattern: String): ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def generatedKeyAlwaysReturned: Boolean = false
+
+ override def getMaxLogicalLobSize: Long = 0
+
+ override def supportsRefCursors: Boolean = false
+
+ override def supportsSharding: Boolean = false
+
+ override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
+ iface.asInstanceOf[T]
+ } else {
+ throw new SQLException(s"${this.getClass.getName} not unwrappable from
${iface.getName}")
+ }
+
+ override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
new file mode 100644
index 000000000000..38417b0de217
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSet.scala
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.io.{InputStream, Reader}
+import java.net.URL
+import java.sql.{Array => JdbcArray, _}
+import java.util
+import java.util.Calendar
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connect.client.SparkResult
+
+class SparkConnectResultSet(
+ sparkResult: SparkResult[Row],
+ stmt: SparkConnectStatement = null) extends ResultSet {
+
+ private val iterator = sparkResult.destructiveIterator
+
+ private var currentRow: Row = _
+
+ private var _wasNull: Boolean = false
+
+ override def wasNull: Boolean = _wasNull
+
+ override def next(): Boolean = {
+ val hasNext = iterator.hasNext
+ if (hasNext) {
+ currentRow = iterator.next()
+ } else {
+ currentRow = null
+ }
+ hasNext
+ }
+
+ @volatile protected var closed: Boolean = false
+
+ override def isClosed: Boolean = closed
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ iterator.close()
+ sparkResult.close()
+ closed = true
+ }
+ }
+
+ private[jdbc] def checkOpen(): Unit = {
+ if (closed) {
+ throw new SQLException("JDBC Statement is closed.")
+ }
+ }
+
+ override def findColumn(columnLabel: String): Int = {
+ sparkResult.schema.getFieldIndex(columnLabel) match {
+ case Some(i) => i + 1
+ case None =>
+ throw new SQLException(s"Invalid column label: $columnLabel")
+ }
+ }
+
+ override def getString(columnIndex: Int): String = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return null
+ }
+ _wasNull = false
+ String.valueOf(currentRow.get(columnIndex - 1))
+ }
+
+ override def getBoolean(columnIndex: Int): Boolean = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return false
+ }
+ _wasNull = false
+ currentRow.getBoolean(columnIndex - 1)
+ }
+
+ override def getByte(columnIndex: Int): Byte = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0.toByte
+ }
+ _wasNull = false
+ currentRow.getByte(columnIndex - 1)
+ }
+
+ override def getShort(columnIndex: Int): Short = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0.toShort
+ }
+ _wasNull = false
+ currentRow.getShort(columnIndex - 1)
+ }
+
+ override def getInt(columnIndex: Int): Int = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0
+ }
+ _wasNull = false
+ currentRow.getInt(columnIndex - 1)
+ }
+
+ override def getLong(columnIndex: Int): Long = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0L
+ }
+ _wasNull = false
+ currentRow.getLong(columnIndex - 1)
+ }
+
+ override def getFloat(columnIndex: Int): Float = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0.toFloat
+ }
+ _wasNull = false
+ currentRow.getFloat(columnIndex - 1)
+ }
+
+ override def getDouble(columnIndex: Int): Double = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return 0.toDouble
+ }
+ _wasNull = false
+ currentRow.getDouble(columnIndex - 1)
+ }
+
+ override def getBigDecimal(columnIndex: Int, scale: Int):
java.math.BigDecimal =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBytes(columnIndex: Int): Array[Byte] =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDate(columnIndex: Int): Date =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTime(columnIndex: Int): Time =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTimestamp(columnIndex: Int): Timestamp =
+ throw new SQLFeatureNotSupportedException
+
+ override def getAsciiStream(columnIndex: Int): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getUnicodeStream(columnIndex: Int): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBinaryStream(columnIndex: Int): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getString(columnLabel: String): String =
+ getString(findColumn(columnLabel))
+
+ override def getBoolean(columnLabel: String): Boolean =
+ getBoolean(findColumn(columnLabel))
+
+ override def getByte(columnLabel: String): Byte =
+ getByte(findColumn(columnLabel))
+
+ override def getShort(columnLabel: String): Short =
+ getShort(findColumn(columnLabel))
+
+ override def getInt(columnLabel: String): Int =
+ getInt(findColumn(columnLabel))
+
+ override def getLong(columnLabel: String): Long =
+ getLong(findColumn(columnLabel))
+
+ override def getFloat(columnLabel: String): Float =
+ getFloat(findColumn(columnLabel))
+
+ override def getDouble(columnLabel: String): Double =
+ getDouble(findColumn(columnLabel))
+
+ override def getBigDecimal(columnLabel: String, scale: Int):
java.math.BigDecimal =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBytes(columnLabel: String): Array[Byte] =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDate(columnLabel: String): Date =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTime(columnLabel: String): Time =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTimestamp(columnLabel: String): Timestamp =
+ throw new SQLFeatureNotSupportedException
+
+ override def getAsciiStream(columnLabel: String): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getUnicodeStream(columnLabel: String): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBinaryStream(columnLabel: String): InputStream =
+ throw new SQLFeatureNotSupportedException
+
+ override def getWarnings: SQLWarning = null
+
+ override def clearWarnings(): Unit = {}
+
+ override def getCursorName: String = throw new
SQLFeatureNotSupportedException
+
+ override def getMetaData: ResultSetMetaData = {
+ checkOpen()
+ new SparkConnectResultSetMetaData(sparkResult.schema)
+ }
+
+ override def getObject(columnIndex: Int): AnyRef = {
+ if (currentRow.isNullAt(columnIndex - 1)) {
+ _wasNull = true
+ return null
+ }
+ _wasNull = false
+ currentRow.get(columnIndex - 1).asInstanceOf[AnyRef]
+ }
+
+ override def getObject(columnLabel: String): AnyRef =
+ getObject(findColumn(columnLabel))
+
+ override def getCharacterStream(columnIndex: Int): Reader =
+ throw new SQLFeatureNotSupportedException
+
+ override def getCharacterStream(columnLabel: String): Reader =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBigDecimal(columnIndex: Int): java.math.BigDecimal =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBigDecimal(columnLabel: String): java.math.BigDecimal =
+ throw new SQLFeatureNotSupportedException
+
+ override def isBeforeFirst: Boolean = throw new
SQLFeatureNotSupportedException
+
+ override def isAfterLast: Boolean = throw new SQLFeatureNotSupportedException
+
+ override def isFirst: Boolean = throw new SQLFeatureNotSupportedException
+
+ override def isLast: Boolean = throw new SQLFeatureNotSupportedException
+
+ override def beforeFirst(): Unit = throw new SQLFeatureNotSupportedException
+
+ override def afterLast(): Unit = throw new SQLFeatureNotSupportedException
+
+ override def first(): Boolean = throw new SQLFeatureNotSupportedException
+
+ override def last(): Boolean = throw new SQLFeatureNotSupportedException
+
+ override def getRow: Int = throw new SQLFeatureNotSupportedException
+
+ override def absolute(row: Int): Boolean = throw new
SQLFeatureNotSupportedException
+
+ override def relative(rows: Int): Boolean = throw new
SQLFeatureNotSupportedException
+
+ override def previous(): Boolean = throw new SQLFeatureNotSupportedException
+
+ override def setFetchDirection(direction: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFetchDirection: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def setFetchSize(rows: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFetchSize: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getType: Int = {
+ checkOpen()
+ ResultSet.TYPE_FORWARD_ONLY
+ }
+
+ override def getConcurrency: Int = {
+ checkOpen()
+ ResultSet.CONCUR_READ_ONLY
+ }
+
+ override def rowUpdated(): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def rowInserted(): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def rowDeleted(): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNull(columnIndex: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBoolean(columnIndex: Int, x: Boolean): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateByte(columnIndex: Int, x: Byte): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateShort(columnIndex: Int, x: Short): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateInt(columnIndex: Int, x: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateLong(columnIndex: Int, x: Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateFloat(columnIndex: Int, x: Float): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateDouble(columnIndex: Int, x: Double): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBigDecimal(columnIndex: Int, x: java.math.BigDecimal):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateString(columnIndex: Int, x: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBytes(columnIndex: Int, x: scala.Array[Byte]): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateDate(columnIndex: Int, x: Date): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateTime(columnIndex: Int, x: Time): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateTimestamp(columnIndex: Int, x: Timestamp): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnIndex: Int, x: InputStream, length:
Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnIndex: Int, x: InputStream, length:
Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnIndex: Int, x: Reader, length:
Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateObject(columnIndex: Int, x: Any, scaleOrLength: Int):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateObject(columnIndex: Int, x: Any): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNull(columnLabel: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBoolean(columnLabel: String, x: Boolean): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateByte(columnLabel: String, x: Byte): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateShort(columnLabel: String, x: Short): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateInt(columnLabel: String, x: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateLong(columnLabel: String, x: Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateFloat(columnLabel: String, x: Float): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateDouble(columnLabel: String, x: Double): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBigDecimal(columnLabel: String, x: java.math.BigDecimal):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateString(columnLabel: String, x: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBytes(columnLabel: String, x: Array[Byte]): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateDate(columnLabel: String, x: Date): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateTime(columnLabel: String, x: Time): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateTimestamp(columnLabel: String, x: Timestamp): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnLabel: String, x: InputStream, length:
Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnLabel: String, x: InputStream, length:
Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnLabel: String, reader: Reader,
length: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateObject(columnLabel: String, x: Any, scaleOrLength: Int):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateObject(columnLabel: String, x: Any): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def insertRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def deleteRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def refreshRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def cancelRowUpdates(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def moveToInsertRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def moveToCurrentRow(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getStatement: Statement = {
+ checkOpen()
+ stmt
+ }
+
+ override def getObject(columnIndex: Int, map: util.Map[String, Class[_]]):
AnyRef =
+ throw new SQLFeatureNotSupportedException
+
+ override def getRef(columnIndex: Int): Ref =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBlob(columnIndex: Int): Blob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getClob(columnIndex: Int): Clob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getArray(columnIndex: Int): JdbcArray =
+ throw new SQLFeatureNotSupportedException
+
+ override def getObject(columnLabel: String, map: util.Map[String,
Class[_]]): AnyRef =
+ throw new SQLFeatureNotSupportedException
+
+ override def getRef(columnLabel: String): Ref =
+ throw new SQLFeatureNotSupportedException
+
+ override def getBlob(columnLabel: String): Blob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getClob(columnLabel: String): Clob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getArray(columnLabel: String): JdbcArray =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDate(columnIndex: Int, cal: Calendar): Date =
+ throw new SQLFeatureNotSupportedException
+
+ override def getDate(columnLabel: String, cal: Calendar): Date =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTime(columnIndex: Int, cal: Calendar): Time =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTime(columnLabel: String, cal: Calendar): Time =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTimestamp(columnIndex: Int, cal: Calendar): Timestamp =
+ throw new SQLFeatureNotSupportedException
+
+ override def getTimestamp(columnLabel: String, cal: Calendar): Timestamp =
+ throw new SQLFeatureNotSupportedException
+
+ override def getURL(columnIndex: Int): URL =
+ throw new SQLFeatureNotSupportedException
+
+ override def getURL(columnLabel: String): URL =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateRef(columnIndex: Int, x: Ref): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateRef(columnLabel: String, x: Ref): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnIndex: Int, x: Blob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnLabel: String, x: Blob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnIndex: Int, x: Clob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnLabel: String, x: Clob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateArray(columnIndex: Int, x: JdbcArray): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateArray(columnLabel: String, x: JdbcArray): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getRowId(columnIndex: Int): RowId =
+ throw new SQLFeatureNotSupportedException
+
+ override def getRowId(columnLabel: String): RowId =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateRowId(columnIndex: Int, x: RowId): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateRowId(columnLabel: String, x: RowId): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getHoldability: Int = ResultSet.HOLD_CURSORS_OVER_COMMIT
+
+ override def updateNString(columnIndex: Int, nString: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNString(columnLabel: String, nString: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnIndex: Int, nClob: NClob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnLabel: String, nClob: NClob): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNClob(columnIndex: Int): NClob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNClob(columnLabel: String): NClob =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSQLXML(columnIndex: Int): SQLXML =
+ throw new SQLFeatureNotSupportedException
+
+ override def getSQLXML(columnLabel: String): SQLXML =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateSQLXML(columnIndex: Int, xmlObject: SQLXML): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateSQLXML(columnLabel: String, xmlObject: SQLXML): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNString(columnIndex: Int): String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNString(columnLabel: String): String =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNCharacterStream(columnIndex: Int): Reader =
+ throw new SQLFeatureNotSupportedException
+
+ override def getNCharacterStream(columnLabel: String): Reader =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNCharacterStream(columnIndex: Int, x: Reader, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNCharacterStream(columnLabel: String, reader: Reader,
length: Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnIndex: Int, x: InputStream, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnIndex: Int, x: InputStream, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnIndex: Int, x: Reader, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnLabel: String, x: InputStream, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnLabel: String, x: InputStream, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnLabel: String, reader: Reader,
length: Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnIndex: Int, inputStream: InputStream, length:
Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnLabel: String, inputStream: InputStream,
length: Long): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnIndex: Int, reader: Reader, length: Long):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnLabel: String, reader: Reader, length: Long):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnIndex: Int, reader: Reader, length: Long):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnLabel: String, reader: Reader, length: Long):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNCharacterStream(columnIndex: Int, x: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNCharacterStream(columnLabel: String, reader: Reader):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnIndex: Int, x: InputStream): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnIndex: Int, x: InputStream): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnIndex: Int, x: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateAsciiStream(columnLabel: String, x: InputStream): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBinaryStream(columnLabel: String, x: InputStream): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateCharacterStream(columnLabel: String, reader: Reader):
Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnIndex: Int, inputStream: InputStream): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateBlob(columnLabel: String, inputStream: InputStream): Unit
=
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnIndex: Int, reader: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateClob(columnLabel: String, reader: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnIndex: Int, reader: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def updateNClob(columnLabel: String, reader: Reader): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getObject[T](columnIndex: Int, `type`: Class[T]): T =
+ throw new SQLFeatureNotSupportedException
+
+ override def getObject[T](columnLabel: String, `type`: Class[T]): T =
+ throw new SQLFeatureNotSupportedException
+
+ override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
+ iface.asInstanceOf[T]
+ } else {
+ throw new SQLException(s"${this.getClass.getName} not unwrappable from
${iface.getName}")
+ }
+
+ override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetMetaData.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetMetaData.scala
new file mode 100644
index 000000000000..38e8f1d2e5d9
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectResultSetMetaData.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+import java.sql.ResultSetMetaData.{columnNoNulls, columnNullable}
+
+import org.apache.spark.sql.connect.client.jdbc.util.JdbcTypeUtils
+import org.apache.spark.sql.types._
+
+class SparkConnectResultSetMetaData(schema: StructType) extends
ResultSetMetaData {
+
+ override def getColumnCount: Int = schema.length
+
+ override def isAutoIncrement(column: Int): Boolean = false
+
+ override def isCaseSensitive(column: Int): Boolean = false
+
+ override def isSearchable(column: Int): Boolean = false
+
+ override def isCurrency(column: Int): Boolean = false
+
+ override def isNullable(column: Int): Int =
+ if (schema(column - 1).nullable) columnNullable else columnNoNulls
+
+ override def isSigned(column: Int): Boolean =
+ JdbcTypeUtils.isSigned(schema(column - 1))
+
+ override def getColumnDisplaySize(column: Int): Int =
+ JdbcTypeUtils.getDisplaySize(schema(column - 1))
+
+ override def getColumnLabel(column: Int): String = getColumnName(column)
+
+ override def getColumnName(column: Int): String = schema(column - 1).name
+
+ override def getColumnType(column: Int): Int =
+ JdbcTypeUtils.getColumnType(schema(column - 1))
+
+ override def getColumnTypeName(column: Int): String = schema(column -
1).dataType.sql
+
+ override def getColumnClassName(column: Int): String =
+ JdbcTypeUtils.getColumnTypeClassName(schema(column - 1))
+
+ override def getPrecision(column: Int): Int =
+ JdbcTypeUtils.getPrecision(schema(column - 1))
+
+ override def getScale(column: Int): Int =
+ JdbcTypeUtils.getScale(schema(column - 1))
+
+ override def getCatalogName(column: Int): String = ""
+
+ override def getSchemaName(column: Int): String = ""
+
+ override def getTableName(column: Int): String = ""
+
+ override def isReadOnly(column: Int): Boolean = true
+
+ override def isWritable(column: Int): Boolean = false
+
+ override def isDefinitelyWritable(column: Int): Boolean = false
+
+ override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
+ iface.asInstanceOf[T]
+ } else {
+ throw new SQLException(s"${this.getClass.getName} not unwrappable from
${iface.getName}")
+ }
+
+ override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
new file mode 100644
index 000000000000..8de227f9d07c
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectStatement.scala
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.{Array => _, _}
+
+class SparkConnectStatement(conn: SparkConnectConnection) extends Statement {
+
+ private var operationId: String = _
+ private var resultSet: SparkConnectResultSet = _
+
+ @volatile private var closed: Boolean = false
+
+ override def isClosed: Boolean = closed
+
+ override def close(): Unit = synchronized {
+ if (!closed) {
+ if (operationId != null) {
+ conn.spark.interruptOperation(operationId)
+ operationId = null
+ }
+ if (resultSet != null) {
+ resultSet.close()
+ resultSet = null
+ }
+ closed = false
+ }
+ }
+
+ private[jdbc] def checkOpen(): Unit = {
+ if (closed) {
+ throw new SQLException("JDBC Statement is closed.")
+ }
+ }
+
+ override def executeQuery(sql: String): ResultSet = {
+ checkOpen()
+
+ val df = conn.spark.sql(sql)
+ val sparkResult = df.collectResult()
+ operationId = sparkResult.operationId
+ resultSet = new SparkConnectResultSet(sparkResult, this)
+ resultSet
+ }
+
+ override def executeUpdate(sql: String): Int = {
+ checkOpen()
+
+ val df = conn.spark.sql(sql)
+ val sparkResult = df.collectResult()
+ operationId = sparkResult.operationId
+ resultSet = null
+
+ // always return 0 because affected rows is not supported yet
+ 0
+ }
+
+ override def execute(sql: String): Boolean = {
+ checkOpen()
+
+ // always perform executeQuery and reture a ResultSet
+ executeQuery(sql)
+ true
+ }
+
+ override def getResultSet: ResultSet = {
+ checkOpen()
+ resultSet
+ }
+
+ override def getMaxFieldSize: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def setMaxFieldSize(max: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMaxRows: Int = {
+ checkOpen()
+ 0
+ }
+
+ override def setMaxRows(max: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def setEscapeProcessing(enable: Boolean): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getQueryTimeout: Int = {
+ checkOpen()
+ 0
+ }
+
+ override def setQueryTimeout(seconds: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def cancel(): Unit = {
+ checkOpen()
+
+ if (operationId != null) {
+ conn.spark.interruptOperation(operationId)
+ }
+ }
+
+ override def getWarnings: SQLWarning = null
+
+ override def clearWarnings(): Unit = {}
+
+ override def setCursorName(name: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getUpdateCount: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getMoreResults: Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def setFetchDirection(direction: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFetchDirection: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def setFetchSize(rows: Int): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def getFetchSize: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def getResultSetConcurrency: Int = {
+ checkOpen()
+ ResultSet.CONCUR_READ_ONLY
+ }
+
+ override def getResultSetType: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def addBatch(sql: String): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def clearBatch(): Unit =
+ throw new SQLFeatureNotSupportedException
+
+ override def executeBatch(): Array[Int] =
+ throw new SQLFeatureNotSupportedException
+
+ override def getConnection: Connection = {
+ checkOpen()
+ conn
+ }
+
+ override def getMoreResults(current: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getGeneratedKeys: ResultSet =
+ throw new SQLFeatureNotSupportedException
+
+ override def executeUpdate(sql: String, autoGeneratedKeys: Int): Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def executeUpdate(sql: String, columnIndexes: Array[Int]): Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def executeUpdate(sql: String, columnNames: Array[String]): Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def execute(sql: String, autoGeneratedKeys: Int): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def execute(sql: String, columnIndexes: Array[Int]): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def execute(sql: String, columnNames: Array[String]): Boolean =
+ throw new SQLFeatureNotSupportedException
+
+ override def getResultSetHoldability: Int =
+ throw new SQLFeatureNotSupportedException
+
+ override def setPoolable(poolable: Boolean): Unit = {
+ checkOpen()
+
+ if (poolable) {
+ throw new SQLFeatureNotSupportedException("Poolable statement is not
supported")
+ }
+ }
+
+ override def isPoolable: Boolean = {
+ checkOpen()
+ false
+ }
+
+ override def closeOnCompletion(): Unit = {
+ checkOpen()
+ }
+
+ override def isCloseOnCompletion: Boolean = {
+ checkOpen()
+ false
+ }
+
+ override def unwrap[T](iface: Class[T]): T = if (isWrapperFor(iface)) {
+ iface.asInstanceOf[T]
+ } else {
+ throw new SQLException(s"${this.getClass.getName} not unwrappable from
${iface.getName}")
+ }
+
+ override def isWrapperFor(iface: Class[_]): Boolean = iface.isInstance(this)
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
new file mode 100644
index 000000000000..cb941b7420a7
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcErrorUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc.util
+
+import java.sql.{Array => _, _}
+
+private[jdbc] object JdbcErrorUtils {
+
+ def stringfiyTransactionIsolationLevel(level: Int): String = level match {
+ case Connection.TRANSACTION_NONE => "NONE"
+ case Connection.TRANSACTION_READ_UNCOMMITTED => "READ_UNCOMMITTED"
+ case Connection.TRANSACTION_READ_COMMITTED => "READ_COMMITTED"
+ case Connection.TRANSACTION_REPEATABLE_READ => "REPEATABLE_READ"
+ case Connection.TRANSACTION_SERIALIZABLE => "SERIALIZABLE"
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid transaction isolation
level: $level")
+ }
+
+ def stringfiyHoldability(holdability: Int): String = holdability match {
+ case ResultSet.HOLD_CURSORS_OVER_COMMIT => "HOLD_CURSORS_OVER_COMMIT"
+ case ResultSet.CLOSE_CURSORS_AT_COMMIT => "CLOSE_CURSORS_AT_COMMIT"
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid holdability: $holdability")
+ }
+}
diff --git
a/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcTypeUtils.scala
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcTypeUtils.scala
new file mode 100644
index 000000000000..55e3d29c99a5
--- /dev/null
+++
b/sql/connect/client/jdbc/src/main/scala/org/apache/spark/sql/connect/client/jdbc/util/JdbcTypeUtils.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc.util
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float
=> JFloat, Long => JLong, Short => JShort}
+import java.sql.{Array => _, _}
+
+import org.apache.spark.sql.types._
+
+private[jdbc] object JdbcTypeUtils {
+
+ def getColumnType(field: StructField): Int = field.dataType match {
+ case NullType => Types.NULL
+ case BooleanType => Types.BOOLEAN
+ case ByteType => Types.TINYINT
+ case ShortType => Types.SMALLINT
+ case IntegerType => Types.INTEGER
+ case LongType => Types.BIGINT
+ case FloatType => Types.FLOAT
+ case DoubleType => Types.DOUBLE
+ case StringType => Types.VARCHAR
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+
+ def getColumnTypeClassName(field: StructField): String = field.dataType
match {
+ case NullType => "null"
+ case BooleanType => classOf[JBoolean].getName
+ case ByteType => classOf[JByte].getName
+ case ShortType => classOf[JShort].getName
+ case IntegerType => classOf[Integer].getName
+ case LongType => classOf[JLong].getName
+ case FloatType => classOf[JFloat].getName
+ case DoubleType => classOf[JDouble].getName
+ case StringType => classOf[String].getName
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+
+ def isSigned(field: StructField): Boolean = field.dataType match {
+ case ByteType | ShortType | IntegerType | LongType | FloatType |
DoubleType => true
+ case NullType | BooleanType | StringType => false
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+
+ def getPrecision(field: StructField): Int = field.dataType match {
+ case NullType => 0
+ case BooleanType => 1
+ case ByteType => 3
+ case ShortType => 5
+ case IntegerType => 10
+ case LongType => 19
+ case FloatType => 7
+ case DoubleType => 15
+ case StringType => 255
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+
+ def getScale(field: StructField): Int = field.dataType match {
+ case FloatType => 7
+ case DoubleType => 15
+ case NullType | BooleanType | ByteType | ShortType | IntegerType |
LongType | StringType => 0
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+
+ def getDisplaySize(field: StructField): Int = field.dataType match {
+ case NullType => 4 // length of `NULL`
+ case BooleanType => 5 // `TRUE` or `FALSE`
+ case ByteType | ShortType | IntegerType | LongType =>
+ getPrecision(field) + 1 // may have leading negative sign
+ case FloatType => 14
+ case DoubleType => 24
+ case StringType =>
+ getPrecision(field)
+ case other =>
+ throw new SQLFeatureNotSupportedException(s"DataType $other is not
supported yet.")
+ }
+}
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
index eb4ce76d2c0a..09ba78629757 100644
---
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
@@ -17,18 +17,69 @@
package org.apache.spark.sql.connect.client.jdbc
-import java.sql.DriverManager
+import java.sql.{Array => _, _}
+import java.util.Properties
-import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+import org.apache.spark.util.VersionUtils
-class SparkConnectDriverSuite extends AnyFunSuite { // scalastyle:ignore
funsuite
+class SparkConnectDriverSuite extends ConnectFunSuite with RemoteSparkSession
+ with JdbcHelper {
- // explicitly load the class to make it known to the DriverManager
- classOf[SparkConnectDriver].getClassLoader
+ def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
- val jdbcUrl: String = s"jdbc:sc://localhost:15002"
-
- test("test SparkConnectDriver") {
+ test("get Connection from SparkConnectDriver") {
assert(DriverManager.getDriver(jdbcUrl).isInstanceOf[SparkConnectDriver])
+
+ val cause = intercept[SQLException] {
+ new SparkConnectDriver().connect(null, new Properties())
+ }
+ assert(cause.getMessage === "url must not be null")
+
+ withConnection { conn =>
+ assert(conn.isInstanceOf[SparkConnectConnection])
+ }
+ }
+
+ test("get DatabaseMetaData from SparkConnectConnection") {
+ withConnection { conn =>
+ val spark = conn.asInstanceOf[SparkConnectConnection].spark
+ val metadata = conn.getMetaData
+ assert(metadata.getURL === jdbcUrl)
+ assert(metadata.isReadOnly === false)
+ assert(metadata.getUserName === spark.client.configuration.userName)
+ assert(metadata.getDatabaseProductName === "Apache Spark Connect Server")
+ assert(metadata.getDatabaseProductVersion === spark.version)
+ assert(metadata.getDriverVersion === SPARK_VERSION)
+ assert(metadata.getDriverMajorVersion ===
VersionUtils.majorVersion(SPARK_VERSION))
+ assert(metadata.getDriverMinorVersion ===
VersionUtils.minorVersion(SPARK_VERSION))
+ assert(metadata.getIdentifierQuoteString === "`")
+ assert(metadata.getExtraNameCharacters === "")
+ assert(metadata.supportsGroupBy === true)
+ assert(metadata.supportsOuterJoins === true)
+ assert(metadata.supportsFullOuterJoins === true)
+ assert(metadata.supportsLimitedOuterJoins === true)
+ assert(metadata.getSchemaTerm === "schema")
+ assert(metadata.getProcedureTerm === "procedure")
+ assert(metadata.getCatalogTerm === "catalog")
+ assert(metadata.isCatalogAtStart === true)
+ assert(metadata.getCatalogSeparator === ".")
+ assert(metadata.getConnection === conn)
+
assert(metadata.supportsResultSetHoldability(ResultSet.HOLD_CURSORS_OVER_COMMIT)
=== false)
+
assert(metadata.supportsResultSetHoldability(ResultSet.CLOSE_CURSORS_AT_COMMIT)
=== true)
+ assert(metadata.getResultSetHoldability ===
ResultSet.CLOSE_CURSORS_AT_COMMIT)
+ assert(metadata.getDatabaseMajorVersion ===
VersionUtils.majorVersion(spark.version))
+ assert(metadata.getDatabaseMinorVersion ===
VersionUtils.minorVersion(spark.version))
+ assert(metadata.getJDBCMajorVersion === 4)
+ assert(metadata.getJDBCMinorVersion === 3)
+ assert(metadata.supportsStatementPooling === false)
+ assert(metadata.getRowIdLifetime === RowIdLifetime.ROWID_UNSUPPORTED)
+ assert(metadata.generatedKeyAlwaysReturned === false)
+ assert(metadata.getMaxLogicalLobSize === 0)
+ assert(metadata.supportsRefCursors === false)
+ assert(metadata.supportsSharding === false)
+ }
}
}
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectJdbcDataTypeSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectJdbcDataTypeSuite.scala
new file mode 100644
index 000000000000..619b279310eb
--- /dev/null
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectJdbcDataTypeSuite.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.client.jdbc
+
+import java.sql.Types
+
+import org.apache.spark.sql.connect.client.jdbc.test.JdbcHelper
+import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
+
+class SparkConnectJdbcDataTypeSuite extends ConnectFunSuite with
RemoteSparkSession
+ with JdbcHelper {
+
+ override def jdbcUrl: String = s"jdbc:sc://localhost:$serverPort"
+
+ test("get null type") {
+ withExecuteQuery("SELECT null") { rs =>
+ assert(rs.next())
+ assert(rs.getString(1) === null)
+ assert(rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "NULL")
+ assert(metaData.getColumnLabel(1) === "NULL")
+ assert(metaData.getColumnType(1) === Types.NULL)
+ assert(metaData.getColumnTypeName(1) === "VOID")
+ assert(metaData.getColumnClassName(1) === "null")
+ assert(metaData.isSigned(1) === false)
+ assert(metaData.getPrecision(1) === 0)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 4)
+ }
+ }
+
+ test("get boolean type") {
+ withExecuteQuery("SELECT true") { rs =>
+ assert(rs.next())
+ assert(rs.getBoolean(1) === true)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "true")
+ assert(metaData.getColumnLabel(1) === "true")
+ assert(metaData.getColumnType(1) === Types.BOOLEAN)
+ assert(metaData.getColumnTypeName(1) === "BOOLEAN")
+ assert(metaData.getColumnClassName(1) === "java.lang.Boolean")
+ assert(metaData.isSigned(1) === false)
+ assert(metaData.getPrecision(1) === 1)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 5)
+ }
+ }
+
+ test("get byte type") {
+ withExecuteQuery("SELECT cast(1 as byte)") { rs =>
+ assert(rs.next())
+ assert(rs.getByte(1) === 1.toByte)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "CAST(1 AS TINYINT)")
+ assert(metaData.getColumnLabel(1) === "CAST(1 AS TINYINT)")
+ assert(metaData.getColumnType(1) === Types.TINYINT)
+ assert(metaData.getColumnTypeName(1) === "TINYINT")
+ assert(metaData.getColumnClassName(1) === "java.lang.Byte")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 3)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 4)
+ }
+ }
+
+ test("get short type") {
+ withExecuteQuery("SELECT cast(1 as short)") { rs =>
+ assert(rs.next())
+ assert(rs.getShort(1) === 1.toShort)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "CAST(1 AS SMALLINT)")
+ assert(metaData.getColumnLabel(1) === "CAST(1 AS SMALLINT)")
+ assert(metaData.getColumnType(1) === Types.SMALLINT)
+ assert(metaData.getColumnTypeName(1) === "SMALLINT")
+ assert(metaData.getColumnClassName(1) === "java.lang.Short")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 5)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 6)
+ }
+ }
+
+ test("get int type") {
+ withExecuteQuery("SELECT 1") { rs =>
+ assert(rs.next())
+ assert(rs.getInt(1) === 1)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "1")
+ assert(metaData.getColumnLabel(1) === "1")
+ assert(metaData.getColumnType(1) === Types.INTEGER)
+ assert(metaData.getColumnTypeName(1) === "INT")
+ assert(metaData.getColumnClassName(1) === "java.lang.Integer")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 10)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 11)
+ }
+ }
+
+ test("get bigint type") {
+ withExecuteQuery("SELECT cast(1 as bigint)") { rs =>
+ assert(rs.next())
+ assert(rs.getLong(1) === 1L)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "CAST(1 AS BIGINT)")
+ assert(metaData.getColumnLabel(1) === "CAST(1 AS BIGINT)")
+ assert(metaData.getColumnType(1) === Types.BIGINT)
+ assert(metaData.getColumnTypeName(1) === "BIGINT")
+ assert(metaData.getColumnClassName(1) === "java.lang.Long")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 19)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 20)
+ }
+ }
+
+ test("get float type") {
+ withExecuteQuery("SELECT cast(1.2 as float)") { rs =>
+ assert(rs.next())
+ assert(rs.getFloat(1) === 1.2F)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "CAST(1.2 AS FLOAT)")
+ assert(metaData.getColumnLabel(1) === "CAST(1.2 AS FLOAT)")
+ assert(metaData.getColumnType(1) === Types.FLOAT)
+ assert(metaData.getColumnTypeName(1) === "FLOAT")
+ assert(metaData.getColumnClassName(1) === "java.lang.Float")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 7)
+ assert(metaData.getScale(1) === 7)
+ assert(metaData.getColumnDisplaySize(1) === 14)
+ }
+ }
+
+ test("get double type") {
+ withExecuteQuery("SELECT cast(1.2 as double)") { rs =>
+ assert(rs.next())
+ assert(rs.getDouble(1) === 1.2D)
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "CAST(1.2 AS DOUBLE)")
+ assert(metaData.getColumnLabel(1) === "CAST(1.2 AS DOUBLE)")
+ assert(metaData.getColumnType(1) === Types.DOUBLE)
+ assert(metaData.getColumnTypeName(1) === "DOUBLE")
+ assert(metaData.getColumnClassName(1) === "java.lang.Double")
+ assert(metaData.isSigned(1) === true)
+ assert(metaData.getPrecision(1) === 15)
+ assert(metaData.getScale(1) === 15)
+ assert(metaData.getColumnDisplaySize(1) === 24)
+ }
+ }
+
+ test("get string type") {
+ withExecuteQuery("SELECT 'str'") { rs =>
+ assert(rs.next())
+ assert(rs.getString(1) === "str")
+ assert(!rs.wasNull)
+ assert(!rs.next())
+
+ val metaData = rs.getMetaData
+ assert(metaData.getColumnCount === 1)
+ assert(metaData.getColumnName(1) === "str")
+ assert(metaData.getColumnLabel(1) === "str")
+ assert(metaData.getColumnType(1) === Types.VARCHAR)
+ assert(metaData.getColumnTypeName(1) === "STRING")
+ assert(metaData.getColumnClassName(1) === "java.lang.String")
+ assert(metaData.isSigned(1) === false)
+ assert(metaData.getPrecision(1) === 255)
+ assert(metaData.getScale(1) === 0)
+ assert(metaData.getColumnDisplaySize(1) === 255)
+ }
+ }
+}
diff --git
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/test/JdbcHelper.scala
similarity index 57%
copy from
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
copy to
sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/test/JdbcHelper.scala
index eb4ce76d2c0a..9b3aa373e93c 100644
---
a/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/SparkConnectDriverSuite.scala
+++
b/sql/connect/client/jdbc/src/test/scala/org/apache/spark/sql/connect/client/jdbc/test/JdbcHelper.scala
@@ -15,20 +15,32 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connect.client.jdbc
+package org.apache.spark.sql.connect.client.jdbc.test
-import java.sql.DriverManager
+import java.sql.{Connection, DriverManager, ResultSet, Statement}
-import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
+import scala.util.Using
-class SparkConnectDriverSuite extends AnyFunSuite { // scalastyle:ignore
funsuite
+import org.apache.spark.sql.connect.client.jdbc.SparkConnectDriver
+
+trait JdbcHelper {
+
+ def jdbcUrl: String
// explicitly load the class to make it known to the DriverManager
classOf[SparkConnectDriver].getClassLoader
- val jdbcUrl: String = s"jdbc:sc://localhost:15002"
+ def withConnection[T](f: Connection => T): T = {
+ Using.resource(DriverManager.getConnection(jdbcUrl)) { conn => f(conn) }
+ }
+
+ def withStatement[T](f: Statement => T): T = withConnection { conn =>
+ Using.resource(conn.createStatement()) { stmt => f(stmt) }
+ }
- test("test SparkConnectDriver") {
- assert(DriverManager.getDriver(jdbcUrl).isInstanceOf[SparkConnectDriver])
+ def withExecuteQuery(query: String)(f: ResultSet => Unit): Unit = {
+ withStatement { stmt =>
+ Using.resource { stmt.executeQuery(query) } { rs => f(rs) }
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]