This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6278becfbed [SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)
6278becfbed is described below
commit 6278becfbed412bad3d00f2b7989fd19a3a0ff07
Author: panbingkun <[email protected]>
AuthorDate: Mon Jul 18 23:34:28 2022 -0700
[SPARK-39759][SQL] Implement listIndexes in JDBC (H2 dialect)
### What changes were proposed in this pull request?
This PR implements listIndexes in DS V2 JDBC for the H2 dialect.
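For context, a minimal sketch of how the new capability surfaces through the DS V2 API. This is illustrative only; it assumes an `h2` catalog registered via `spark.sql.catalog.h2`, the way the test suite below configures it:

    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
    import org.apache.spark.sql.connector.catalog.index.SupportsIndex

    // Assumes spark.sql.catalog.h2 points at
    // org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog,
    // as JDBCV2Suite does.
    val catalog = spark.sessionState.catalogManager.catalog("h2")
      .asInstanceOf[TableCatalog]
    val table = catalog.loadTable(Identifier.of(Array("test"), "people"))
      .asInstanceOf[SupportsIndex]
    table.listIndexes().foreach(idx => println(idx.indexName()))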
### Why are the changes needed?
This is a subtask of the V2 Index support (https://issues.apache.org/jira/browse/SPARK-36525).
**It makes it easier to test the index interface locally.**
> This PR implements listIndexes in the H2 dialect.
### Does this PR introduce _any_ user-facing change?
Yes, listIndexes is now supported in DS V2 JDBC for the H2 dialect.
### How was this patch tested?
Updated an existing UT.
Closes #37172 from panbingkun/h2dialect_listindex_dev.
Authored-by: panbingkun <[email protected]>
Signed-off-by: huaxingao <[email protected]>
---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 4 +-
.../execution/datasources/v2/jdbc/JDBCTable.scala | 2 +-
.../org/apache/spark/sql/jdbc/H2Dialect.scala | 66 +++++++++++++++++++++-
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 2 +-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 4 +-
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 9 +++
6 files changed, 78 insertions(+), 9 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 4401ee4564e..60ecd2ff60b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -1072,10 +1072,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
*/
def listIndexes(
conn: Connection,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Array[TableIndex] = {
val dialect = JdbcDialects.get(options.url)
- dialect.listIndexes(conn, tableName, options)
+ dialect.listIndexes(conn, tableIdent, options)
}
private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index be8e1c68b7c..0a184116a0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -83,7 +83,7 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
override def listIndexes(): Array[TableIndex] = {
JdbcUtils.withConnection(jdbcOptions) { conn =>
- JdbcUtils.listIndexes(conn, name, jdbcOptions)
+ JdbcUtils.listIndexes(conn, ident, jdbcOptions)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index d41929225a8..4200ba91fb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -25,12 +25,14 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
+import org.apache.commons.lang3.StringUtils
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
-import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.connector.expressions.NamedReference
+import org.apache.spark.sql.connector.catalog.index.TableIndex
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DecimalType, ShortType, StringType}
@@ -110,6 +112,64 @@ private[sql] object H2Dialect extends JdbcDialect {
JdbcUtils.checkIfIndexExists(conn, sql, options)
}
+ // See
+ // https://www.h2database.com/html/systemtables.html?#information_schema_indexes
+ // https://www.h2database.com/html/systemtables.html?#information_schema_index_columns
+ override def listIndexes(
+ conn: Connection,
+ tableIdent: Identifier,
+ options: JDBCOptions): Array[TableIndex] = {
+ val sql = {
+ s"""
+ | SELECT
+ | i.INDEX_CATALOG AS INDEX_CATALOG,
+ | i.INDEX_SCHEMA AS INDEX_SCHEMA,
+ | i.INDEX_NAME AS INDEX_NAME,
+ | i.INDEX_TYPE_NAME AS INDEX_TYPE_NAME,
+ | i.REMARKS as REMARKS,
+ | ic.COLUMN_NAME AS COLUMN_NAME
+ | FROM INFORMATION_SCHEMA.INDEXES i, INFORMATION_SCHEMA.INDEX_COLUMNS ic
+ | WHERE i.TABLE_CATALOG = ic.TABLE_CATALOG
+ | AND i.TABLE_SCHEMA = ic.TABLE_SCHEMA
+ | AND i.TABLE_NAME = ic.TABLE_NAME
+ | AND i.INDEX_CATALOG = ic.INDEX_CATALOG
+ | AND i.INDEX_SCHEMA = ic.INDEX_SCHEMA
+ | AND i.INDEX_NAME = ic.INDEX_NAME
+ | AND i.TABLE_NAME = '${tableIdent.name()}'
+ | AND i.INDEX_SCHEMA = '${tableIdent.namespace().last}'
+ |""".stripMargin
+ }
+ var indexMap: Map[String, TableIndex] = Map()
+ try {
+ JdbcUtils.executeQuery(conn, options, sql) { rs =>
+ while (rs.next()) {
+ val indexName = rs.getString("INDEX_NAME")
+ val colName = rs.getString("COLUMN_NAME")
+ val indexType = rs.getString("INDEX_TYPE_NAME")
+ val indexComment = rs.getString("REMARKS")
+ if (indexMap.contains(indexName)) {
+ val index = indexMap(indexName)
+ val newIndex = new TableIndex(indexName, indexType,
+ index.columns() :+ FieldReference(colName),
+ index.columnProperties, index.properties)
+ indexMap += (indexName -> newIndex)
+ } else {
+ val properties = new util.Properties()
+ if (StringUtils.isNotEmpty(indexComment)) properties.put("COMMENT", indexComment)
+ val index = new TableIndex(indexName, indexType, Array(FieldReference(colName)),
+ new util.HashMap[NamedReference, util.Properties](), properties)
+ indexMap += (indexName -> index)
+ }
+ }
+ }
+ } catch {
+ case _: Exception =>
+ logWarning("Cannot retrieve index info.")
+ }
+
+ indexMap.values.toArray
+ }
+
private def tableNameWithSchema(ident: Identifier): String = {
(ident.namespace() :+ ident.name()).map(quoteIdentifier).mkString(".")
}
@@ -161,7 +221,7 @@ private[sql] object H2Dialect extends JdbcDialect {
funcName: String, isDistinct: Boolean, inputs: Array[String]): String =
if (isDistinct && distinctUnsupportedAggregateFunctions.contains(funcName)) {
throw new UnsupportedOperationException(s"${this.getClass.getSimpleName} does not " +
- s"support aggregate function: $funcName with DISTINCT");
+ s"support aggregate function: $funcName with DISTINCT")
} else {
super.visitAggregateFunction(funcName, isDistinct, inputs)
}
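A note on the H2 implementation above: the INFORMATION_SCHEMA join returns one row per (index, column) pair, so a multi-column index appears as several rows, and the loop folds those rows into a single TableIndex per index name by appending each additional column. A standalone sketch of that folding step, using hypothetical row data that is not part of the patch:

    // One row per (index, column), as the INFORMATION_SCHEMA query yields.
    case class IndexRow(indexName: String, indexType: String, column: String)

    val rows = Seq(
      IndexRow("people_index", "INDEX", "id"),
      IndexRow("multi_idx", "INDEX", "name"),
      IndexRow("multi_idx", "INDEX", "id"))

    // Fold rows sharing an index name into (type, columns), keeping row order.
    val folded = rows.foldLeft(Map.empty[String, (String, Vector[String])]) {
      case (acc, IndexRow(name, tpe, col)) =>
        val (_, cols) = acc.getOrElse(name, (tpe, Vector.empty[String]))
        acc + (name -> (tpe, cols :+ col))
    }
    // folded("multi_idx") == ("INDEX", Vector("name", "id"))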
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index 491e0231a23..230276e7100 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -522,7 +522,7 @@ abstract class JdbcDialect extends Serializable with Logging {
*/
def listIndexes(
conn: Connection,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Array[TableIndex] = {
throw new UnsupportedOperationException("listIndexes is not supported")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index 7dc76eed49f..d88f3566eaa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -210,9 +210,9 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
// https://dev.mysql.com/doc/refman/8.0/en/show-index.html
override def listIndexes(
conn: Connection,
- tableName: String,
+ tableIdent: Identifier,
options: JDBCOptions): Array[TableIndex] = {
- val sql = s"SHOW INDEXES FROM $tableName"
+ val sql = s"SHOW INDEXES FROM ${tableIdent.name()}"
var indexMap: Map[String, TableIndex] = Map()
try {
JdbcUtils.executeQuery(conn, options, sql) { rs =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index ddcf28652e9..ac1c59ae01e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -2259,11 +2259,20 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
.asInstanceOf[SupportsIndex]
assert(jdbcTable != null)
assert(jdbcTable.indexExists("people_index") == false)
+ val indexes1 = jdbcTable.listIndexes()
+ assert(indexes1.isEmpty)
sql(s"CREATE INDEX people_index ON TABLE h2.test.people (id)")
assert(jdbcTable.indexExists("people_index"))
+ val indexes2 = jdbcTable.listIndexes()
+ assert(!indexes2.isEmpty)
+ assert(indexes2.size == 1)
+ val tableIndex = indexes2.head
+ assert(tableIndex.indexName() == "people_index")
sql(s"DROP INDEX people_index ON TABLE h2.test.people")
assert(jdbcTable.indexExists("people_index") == false)
+ val indexes3 = jdbcTable.listIndexes()
+ assert(indexes3.isEmpty)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]