This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ffc378c1544b [SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta
ffc378c1544b is described below
commit ffc378c1544b558c108737cc89fb9c5764ef6e6e
Author: Kent Yao <[email protected]>
AuthorDate: Fri Apr 12 09:51:41 2024 +0800
[SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta
### What changes were proposed in this pull request?
[SPARK-47754](https://issues.apache.org/jira/browse/SPARK-47754) introduced a new developer API called `getArrayDimension`.

This PR expands the scope of `getArrayDimension` and renames it to `updateExtraColumnMeta`. As the names suggest, `getArrayDimension` handles only a single piece of column metadata, the dimension of an array, while `updateExtraColumnMeta` can populate any kind of metadata based on the given ResultSetMetaData and Connection. This is more general and useful, and it reduces the number of similarly shaped developer APIs we would otherwise need to add. Also, the current parameters of `getArrayDimension` might not be sufficient for other dialects.
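For third-party dialect authors, the new hook can be overridden to attach arbitrary per-column metadata. A minimal sketch follows; `MyDatabaseDialect`, the `jdbc:mydb` URL prefix, and the `rawType` metadata key are illustrative assumptions, not part of this change:

```scala
import java.sql.{Connection, ResultSetMetaData}

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.MetadataBuilder

// Hypothetical dialect, for illustration only.
case class MyDatabaseDialect() extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  // The hook receives the full ResultSetMetaData and the live Connection,
  // so it can record any per-column detail, not just array dimensions.
  override def updateExtraColumnMeta(
      conn: Connection,
      rsmd: ResultSetMetaData,
      columnIdx: Int,
      metadata: MetadataBuilder): Unit = {
    // Stash the driver-reported type name; a custom getCatalystType can
    // later read it back from the metadata it is handed.
    metadata.putString("rawType", rsmd.getColumnTypeName(columnIdx))
  }
}

// Make Spark use the dialect for matching JDBC URLs.
JdbcDialects.registerDialect(MyDatabaseDialect())
```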
### Why are the changes needed?
Refactoring an unreleased developer API to make it more sustainable.
### Does this PR introduce _any_ user-facing change?
No, this only changes an unreleased API.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #46006 from yaooqinn/SPARK-47813.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +---
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 21 +++++-----
.../apache/spark/sql/jdbc/PostgresDialect.scala | 46 +++++++++++++---------
3 files changed, 39 insertions(+), 35 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 9d27a71b005b..53b0b8b5d29d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -307,16 +307,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
metadata.putBoolean("logical_time_type", true)
case java.sql.Types.ROWID =>
metadata.putBoolean("rowid", true)
- case java.sql.Types.ARRAY =>
- val tableName = rsmd.getTableName(i + 1)
- dialect.getArrayDimension(conn, tableName, columnName).foreach {
dimension =>
- metadata.putLong("arrayDimension", dimension)
- }
case _ =>
}
metadata.putBoolean("isSigned", isSigned)
metadata.putBoolean("isTimestampNTZ", isTimestampNTZ)
metadata.putLong("scale", fieldScale)
+ dialect.updateExtraColumnMeta(conn, rsmd, i + 1, metadata)
+
val columnType =
dialect.getCatalystType(dataType, typeName, fieldSize,
metadata).getOrElse(
getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned,
isTimestampNTZ))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 5e98bcbce489..5f69d18cad75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, Driver, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util
 import java.util.ServiceLoader
@@ -826,20 +826,19 @@ abstract class JdbcDialect extends Serializable with Logging {
   }
 
   /**
-   * Return the array dimension of the column. The array dimension will be carried in the
-   * metadata of the column and used by `getCatalystType` to determine the dimension of the
-   * ArrayType.
+   * Get extra column metadata for the given column.
    *
    * @param conn The connection currently connection being used.
-   * @param tableName The name of the table which the column belongs to.
-   * @param columnName The name of the column.
-   * @return An Option[Int] which contains the number of array dimension.
-   *         If Some(n), the column is an array with n dimensions.
-   *         If the method is un-implemented, or some error encountered, return None.
-   *         Then, `getCatalystType` will try use 1 dimension as default for arrays.
+   * @param rsmd The metadata of the current result set.
+   * @param columnIdx The index of the column.
+   * @param metadata The metadata builder to store the extra column information.
    */
   @Since("4.0.0")
-  def getArrayDimension(conn: Connection, tableName: String, columnName: String): Option[Int] = None
+  def updateExtraColumnMeta(
+      conn: Connection,
+      rsmd: ResultSetMetaData,
+      columnIdx: Int,
+      metadata: MetadataBuilder): Unit = {}
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b9c39b467e8f..c2c430a7b39d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, SQLException, Timestamp, Types}
+import java.sql.{Connection, Date, ResultSetMetaData, SQLException, Timestamp, Types}
 import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale
@@ -343,26 +343,34 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
     }
   }
 
-  override def getArrayDimension(
+  override def updateExtraColumnMeta(
       conn: Connection,
-      tableName: String,
-      columnName: String): Option[Int] = {
-    val query =
-      s"""
-        |SELECT pg_attribute.attndims
-        |FROM pg_attribute
-        |  JOIN pg_class ON pg_attribute.attrelid = pg_class.oid
-        |  JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
-        |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName'
-        |""".stripMargin
-    try {
-      Using.resource(conn.createStatement()) { stmt =>
-        Using.resource(stmt.executeQuery(query)) { rs =>
-          if (rs.next()) Some(rs.getInt(1)) else None
+      rsmd: ResultSetMetaData,
+      columnIdx: Int,
+      metadata: MetadataBuilder): Unit = {
+    rsmd.getColumnType(columnIdx) match {
+      case Types.ARRAY =>
+        val tableName = rsmd.getTableName(columnIdx)
+        val columnName = rsmd.getColumnName(columnIdx)
+        val query =
+          s"""
+            |SELECT pg_attribute.attndims
+            |FROM pg_attribute
+            |  JOIN pg_class ON pg_attribute.attrelid = pg_class.oid
+            |  JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
+            |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName'
+            |""".stripMargin
+        try {
+          Using.resource(conn.createStatement()) { stmt =>
+            Using.resource(stmt.executeQuery(query)) { rs =>
+              if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1))
+            }
+          }
+        } catch {
+          case e: SQLException =>
+            logWarning(s"Failed to get array dimension for column $columnName", e)
         }
-      }
-    } catch {
-      case _: SQLException => None
+      case _ =>
     }
   }
 }
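As the removed scaladoc noted, `getCatalystType` consumes the recorded `arrayDimension` to decide how deeply to nest the resulting ArrayType, defaulting to one dimension when the value is absent. A hedged sketch of that consumption, where the helper name and the `StringType` element type are placeholders rather than the actual Spark code:

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, MetadataBuilder, StringType}

// Illustrative helper: wrap an element type in as many ArrayType layers as
// the "arrayDimension" metadata asks for, defaulting to one dimension.
def toNestedArrayType(md: MetadataBuilder): DataType = {
  val metadata = md.build()
  val dims =
    if (metadata.contains("arrayDimension")) math.max(metadata.getLong("arrayDimension").toInt, 1)
    else 1
  // StringType stands in for the element type a real dialect would derive.
  (1 to dims).foldLeft(StringType: DataType)((tpe, _) => ArrayType(tpe))
}
```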
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]