This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ffc378c1544b [SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta ffc378c1544b is described below commit ffc378c1544b558c108737cc89fb9c5764ef6e6e Author: Kent Yao <y...@apache.org> AuthorDate: Fri Apr 12 09:51:41 2024 +0800 [SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta ### What changes were proposed in this pull request? [SPARK-47754](https://issues.apache.org/jira/browse/SPARK-47754) introduced a new developer API called `getArrayDimension`. This PR expands the scope of `getArrayDimension` and renames it to `updateExtraColumnMeta`. As their names suggest, `getArrayDimension` handles only one piece of column metadata — the dimension of an array — while `updateExtraColumnMeta` can retrieve any type of metadata based on the given ResultSetMetadata and Connection. This is much more general and useful and reduces the number of potentially new Developer APIs in the same shape. Also, the current parameters for getArrayDimension might not be enough for other dialects. ### Why are the changes needed? Refactoring unreleased Developer APIs to make them more sustainable ### Does this PR introduce _any_ user-facing change? no, unreleased API change ### How was this patch tested? existing ut ### Was this patch authored or co-authored using generative AI tooling? no Closes #46006 from yaooqinn/SPARK-47813. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Kent Yao <y...@apache.org> --- .../sql/execution/datasources/jdbc/JdbcUtils.scala | 7 +--- .../org/apache/spark/sql/jdbc/JdbcDialects.scala | 21 +++++----- .../apache/spark/sql/jdbc/PostgresDialect.scala | 46 +++++++++++++--------- 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala index 9d27a71b005b..53b0b8b5d29d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala @@ -307,16 +307,13 @@ object JdbcUtils extends Logging with SQLConfHelper { metadata.putBoolean("logical_time_type", true) case java.sql.Types.ROWID => metadata.putBoolean("rowid", true) - case java.sql.Types.ARRAY => - val tableName = rsmd.getTableName(i + 1) - dialect.getArrayDimension(conn, tableName, columnName).foreach { dimension => - metadata.putLong("arrayDimension", dimension) - } case _ => } metadata.putBoolean("isSigned", isSigned) metadata.putBoolean("isTimestampNTZ", isTimestampNTZ) metadata.putLong("scale", fieldScale) + dialect.updateExtraColumnMeta(conn, rsmd, i + 1, metadata) + val columnType = dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse( getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala index 5e98bcbce489..5f69d18cad75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.{Connection, Date, Driver, Statement, Timestamp} 
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp} import java.time.{Instant, LocalDate, LocalDateTime} import java.util import java.util.ServiceLoader @@ -826,20 +826,19 @@ abstract class JdbcDialect extends Serializable with Logging { } /** - * Return the array dimension of the column. The array dimension will be carried in the - * metadata of the column and used by `getCatalystType` to determine the dimension of the - * ArrayType. + * Get extra column metadata for the given column. * * @param conn The connection currently connection being used. - * @param tableName The name of the table which the column belongs to. - * @param columnName The name of the column. - * @return An Option[Int] which contains the number of array dimension. - * If Some(n), the column is an array with n dimensions. - * If the method is un-implemented, or some error encountered, return None. - * Then, `getCatalystType` will try use 1 dimension as default for arrays. + * @param rsmd The metadata of the current result set. + * @param columnIdx The index of the column. + * @param metadata The metadata builder to store the extra column information. 
*/ @Since("4.0.0") - def getArrayDimension(conn: Connection, tableName: String, columnName: String): Option[Int] = None + def updateExtraColumnMeta( + conn: Connection, + rsmd: ResultSetMetaData, + columnIdx: Int, + metadata: MetadataBuilder): Unit = {} } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala index b9c39b467e8f..c2c430a7b39d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.jdbc -import java.sql.{Connection, Date, SQLException, Timestamp, Types} +import java.sql.{Connection, Date, ResultSetMetaData, SQLException, Timestamp, Types} import java.time.{LocalDateTime, ZoneOffset} import java.util import java.util.Locale @@ -343,26 +343,34 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper { } } - override def getArrayDimension( + override def updateExtraColumnMeta( conn: Connection, - tableName: String, - columnName: String): Option[Int] = { - val query = - s""" - |SELECT pg_attribute.attndims - |FROM pg_attribute - | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid - | JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid - |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' - |""".stripMargin - try { - Using.resource(conn.createStatement()) { stmt => - Using.resource(stmt.executeQuery(query)) { rs => - if (rs.next()) Some(rs.getInt(1)) else None + rsmd: ResultSetMetaData, + columnIdx: Int, + metadata: MetadataBuilder): Unit = { + rsmd.getColumnType(columnIdx) match { + case Types.ARRAY => + val tableName = rsmd.getTableName(columnIdx) + val columnName = rsmd.getColumnName(columnIdx) + val query = + s""" + |SELECT pg_attribute.attndims + |FROM pg_attribute + | JOIN pg_class ON pg_attribute.attrelid = pg_class.oid + | JOIN 
pg_namespace ON pg_class.relnamespace = pg_namespace.oid + |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName' + |""".stripMargin + try { + Using.resource(conn.createStatement()) { stmt => + Using.resource(stmt.executeQuery(query)) { rs => + if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1)) + } + } + } catch { + case e: SQLException => + logWarning(s"Failed to get array dimension for column $columnName", e) } - } - } catch { - case _: SQLException => None + case _ => } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org