This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ffc378c1544b [SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta
ffc378c1544b is described below

commit ffc378c1544b558c108737cc89fb9c5764ef6e6e
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Apr 12 09:51:41 2024 +0800

    [SPARK-47813][SQL] Replace getArrayDimension with updateExtraColumnMeta
    
    ### What changes were proposed in this pull request?
    
    [SPARK-47754](https://issues.apache.org/jira/browse/SPARK-47754)
    introduced a new developer API called `getArrayDimension`.
    
    This PR expands the scope of `getArrayDimension` and renames it to
    `updateExtraColumnMeta`. As the names suggest, `getArrayDimension`
    handles only a single piece of column metadata, the dimension of an
    array, while `updateExtraColumnMeta` can populate any kind of metadata
    based on the given ResultSetMetaData and Connection. This is much more
    general and useful, and it reduces the number of potential new
    Developer APIs of the same shape. In addition, the current parameters
    of `getArrayDimension` might not be sufficient for other dialects.
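
    For illustration only (the sketch below is not part of this patch), a
    third-party dialect could override the new hook along these lines.
    `ExampleDialect`, its `jdbc:example` URL prefix, and the
    `vendorTypeName` metadata key are all hypothetical:

    ```scala
    import java.sql.{Connection, ResultSetMetaData, Types}

    import org.apache.spark.sql.jdbc.JdbcDialect
    import org.apache.spark.sql.types.MetadataBuilder

    // Hypothetical dialect: records the vendor-specific type name of
    // OTHER-typed columns so that a later getCatalystType call can map them.
    class ExampleDialect extends JdbcDialect {
      override def canHandle(url: String): Boolean = url.startsWith("jdbc:example")

      override def updateExtraColumnMeta(
          conn: Connection,
          rsmd: ResultSetMetaData,
          columnIdx: Int,
          metadata: MetadataBuilder): Unit = {
        rsmd.getColumnType(columnIdx) match {
          case Types.OTHER =>
            // The key name is illustrative; each dialect picks its own keys.
            metadata.putString("vendorTypeName", rsmd.getColumnTypeName(columnIdx))
          case _ => // no extra metadata needed for other JDBC types
        }
      }
    }
    ```

    Because the hook receives the whole ResultSetMetaData plus the live
    Connection, per-column lookups such as the Postgres array-dimension
    query in this patch fit in without adding further Developer APIs.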
    
    ### Why are the changes needed?
    
    Refactoring an unreleased Developer API to make it more sustainable.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this only changes an unreleased API.
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46006 from yaooqinn/SPARK-47813.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala |  7 +---
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   | 21 +++++-----
 .../apache/spark/sql/jdbc/PostgresDialect.scala    | 46 +++++++++++++---------
 3 files changed, 39 insertions(+), 35 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 9d27a71b005b..53b0b8b5d29d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -307,16 +307,13 @@ object JdbcUtils extends Logging with SQLConfHelper {
           metadata.putBoolean("logical_time_type", true)
         case java.sql.Types.ROWID =>
           metadata.putBoolean("rowid", true)
-        case java.sql.Types.ARRAY =>
-          val tableName = rsmd.getTableName(i + 1)
-          dialect.getArrayDimension(conn, tableName, columnName).foreach { dimension =>
-            metadata.putLong("arrayDimension", dimension)
-          }
         case _ =>
       }
       metadata.putBoolean("isSigned", isSigned)
       metadata.putBoolean("isTimestampNTZ", isTimestampNTZ)
       metadata.putLong("scale", fieldScale)
+      dialect.updateExtraColumnMeta(conn, rsmd, i + 1, metadata)
+
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
           getCatalystType(dataType, typeName, fieldSize, fieldScale, isSigned, isTimestampNTZ))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 5e98bcbce489..5f69d18cad75 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, Driver, Statement, Timestamp}
+import java.sql.{Connection, Date, Driver, ResultSetMetaData, Statement, Timestamp}
 import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util
 import java.util.ServiceLoader
@@ -826,20 +826,19 @@ abstract class JdbcDialect extends Serializable with Logging {
   }
 
   /**
-   * Return the array dimension of the column. The array dimension will be carried in the
-   * metadata of the column and used by `getCatalystType` to determine the dimension of the
-   * ArrayType.
+   * Get extra column metadata for the given column.
    *
   * @param conn The connection currently being used.
-   * @param tableName The name of the table which the column belongs to.
-   * @param columnName The name of the column.
-   * @return An Option[Int] which contains the number of array dimension.
-   *         If Some(n), the column is an array with n dimensions.
-   *         If the method is un-implemented, or some error encountered, return None.
-   *         Then, `getCatalystType` will try use 1 dimension as default for arrays.
+   * @param rsmd The metadata of the current result set.
+   * @param columnIdx The index of the column.
+   * @param metadata The metadata builder to store the extra column information.
    */
   @Since("4.0.0")
-  def getArrayDimension(conn: Connection, tableName: String, columnName: String): Option[Int] = None
+  def updateExtraColumnMeta(
+      conn: Connection,
+      rsmd: ResultSetMetaData,
+      columnIdx: Int,
+      metadata: MetadataBuilder): Unit = {}
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index b9c39b467e8f..c2c430a7b39d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Connection, Date, SQLException, Timestamp, Types}
+import java.sql.{Connection, Date, ResultSetMetaData, SQLException, Timestamp, Types}
 import java.time.{LocalDateTime, ZoneOffset}
 import java.util
 import java.util.Locale
@@ -343,26 +343,34 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
     }
   }
 
-  override def getArrayDimension(
+  override def updateExtraColumnMeta(
       conn: Connection,
-      tableName: String,
-      columnName: String): Option[Int] = {
-    val query =
-      s"""
-         |SELECT pg_attribute.attndims
-         |FROM pg_attribute
-         |  JOIN pg_class ON pg_attribute.attrelid = pg_class.oid
-         |  JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
-         |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName'
-         |""".stripMargin
-    try {
-      Using.resource(conn.createStatement()) { stmt =>
-        Using.resource(stmt.executeQuery(query)) { rs =>
-          if (rs.next()) Some(rs.getInt(1)) else None
+      rsmd: ResultSetMetaData,
+      columnIdx: Int,
+      metadata: MetadataBuilder): Unit = {
+    rsmd.getColumnType(columnIdx) match {
+      case Types.ARRAY =>
+        val tableName = rsmd.getTableName(columnIdx)
+        val columnName = rsmd.getColumnName(columnIdx)
+        val query =
+          s"""
+             |SELECT pg_attribute.attndims
+             |FROM pg_attribute
+             |  JOIN pg_class ON pg_attribute.attrelid = pg_class.oid
+             |  JOIN pg_namespace ON pg_class.relnamespace = pg_namespace.oid
+             |WHERE pg_class.relname = '$tableName' and pg_attribute.attname = '$columnName'
+             |""".stripMargin
+        try {
+          Using.resource(conn.createStatement()) { stmt =>
+            Using.resource(stmt.executeQuery(query)) { rs =>
+              if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1))
+            }
+          }
+        } catch {
+          case e: SQLException =>
+            logWarning(s"Failed to get array dimension for column 
$columnName", e)
         }
-      }
-    } catch {
-      case _: SQLException => None
+      case _ =>
     }
   }
 }

