This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 07ecbc4 [SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)
07ecbc4 is described below
commit 07ecbc4049aa7f8daa11e6a924c37c1db2f53c73
Author: Huaxin Gao <[email protected]>
AuthorDate: Fri Oct 8 11:30:12 2021 -0700
[SPARK-36913][SQL] Implement createIndex and IndexExists in DS V2 JDBC (MySQL dialect)
### What changes were proposed in this pull request?
Implement `createIndex`/`indexExists` in DS V2 JDBC.
### Why are the changes needed?
This is a subtask of the DS V2 index support. I am implementing index support
for DS V2 JDBC so we can have a POC and end-to-end testing. This PR
implements `createIndex` and `indexExists`; the next PR will implement
`listIndexes` and `dropIndex`. I intentionally kept the PR small so it is easier
to review.
Indexes are not supported by the H2 database, and CREATE/DROP INDEX is not
standard SQL syntax, so this PR only implements `createIndex` and `indexExists` in
the `MySQL` dialect.
### Does this PR introduce _any_ user-facing change?
Yes, `createIndex`/`indexExists` are now available in DS V2 JDBC; a usage sketch follows.
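As a minimal, illustrative sketch (not part of this commit) that mirrors the new integration test: the `mysql` catalog name, the `new_table` table with a `col1` column, and the `conf` `SQLConf` handle are assumptions taken from that test.

```scala
import java.util

import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}

// Load the JDBC table through the catalog API and cast it to SupportsIndex
// (assumes a "mysql" JDBC catalog and a "new_table" table are already set up).
val table = Catalogs.load("mysql", conf).asInstanceOf[TableCatalog]
  .loadTable(Identifier.of(Array.empty[String], "new_table"))
  .asInstanceOf[SupportsIndex]

// Create a plain index on col1 with MySQL index options, then check it exists.
val properties = new util.Properties()
properties.put("KEY_BLOCK_SIZE", "10")
table.createIndex("i1", "", Array(FieldReference("col1")),
  Array.empty[util.Map[NamedReference, util.Properties]], properties)
assert(table.indexExists("i1"))
```

Creating an index whose name already exists is classified by the MySQL dialect (error code 1061) and surfaces as an `IndexAlreadyExistsException`.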
### How was this patch tested?
Added a new test.
Closes #34164 from huaxingao/createIndexJDBC.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
---
.../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala | 33 ++++++++++
.../org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala | 9 +++
.../sql/connector/catalog/index/SupportsIndex.java | 4 +-
.../catalyst/analysis/AlreadyExistException.scala | 4 +-
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 58 +++++++++++++++++
.../execution/datasources/v2/jdbc/JDBCTable.scala | 36 ++++++++++-
.../datasources/v2/jdbc/JDBCTableCatalog.scala | 55 ++++++----------
.../org/apache/spark/sql/jdbc/JdbcDialects.scala | 41 +++++++++++-
.../org/apache/spark/sql/jdbc/MySQLDialect.scala | 74 +++++++++++++++++++++-
9 files changed, 268 insertions(+), 46 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
index db626df..3cb8787 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/MySQLIntegrationSuite.scala
@@ -18,11 +18,16 @@
package org.apache.spark.sql.jdbc.v2
import java.sql.{Connection, SQLFeatureNotSupportedException}
+import java.util
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkConf
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.index.SupportsIndex
+import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
import org.apache.spark.sql.jdbc.{DatabaseOnDocker, DockerJDBCIntegrationSuite}
import org.apache.spark.sql.types._
@@ -115,4 +120,32 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite with V2JDBCTest {
    val expectedSchema = new StructType().add("ID", IntegerType, true, defaultMetadata)
assert(t.schema === expectedSchema)
}
+
+ override def testIndex(tbl: String): Unit = {
+ val loaded = Catalogs.load("mysql", conf)
+ val jdbcTable = loaded.asInstanceOf[TableCatalog]
+ .loadTable(Identifier.of(Array.empty[String], "new_table"))
+ .asInstanceOf[SupportsIndex]
+ assert(jdbcTable.indexExists("i1") == false)
+ assert(jdbcTable.indexExists("i2") == false)
+
+ val properties = new util.Properties();
+ properties.put("KEY_BLOCK_SIZE", "10")
+ properties.put("COMMENT", "'this is a comment'")
+ jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+
+ jdbcTable.createIndex("i2", "",
+      Array(FieldReference("col2"), FieldReference("col3"), FieldReference("col5")),
+      Array.empty[util.Map[NamedReference, util.Properties]], new util.Properties)
+
+ assert(jdbcTable.indexExists("i1") == true)
+ assert(jdbcTable.indexExists("i2") == true)
+
+ val m = intercept[IndexAlreadyExistsException] {
+ jdbcTable.createIndex("i1", "", Array(FieldReference("col1")),
+ Array.empty[util.Map[NamedReference, util.Properties]], properties)
+ }.getMessage
+ assert(m.contains("Failed to create index: i1 in new_table"))
+ }
}
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
index 1afe26a..da57ed7 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCTest.scala
@@ -180,5 +180,14 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
testCreateTableWithProperty(s"$catalogName.new_table")
}
}
+
+ def testIndex(tbl: String): Unit = {}
+
+ test("SPARK-36913: Test INDEX") {
+ withTable(s"$catalogName.new_table") {
+      sql(s"CREATE TABLE $catalogName.new_table(col1 INT, col2 INT, col3 INT, col4 INT, col5 INT)")
+ testIndex(s"$catalogName.new_table")
+ }
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
index a8d55fb..24961e4 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/index/SupportsIndex.java
@@ -40,14 +40,14 @@ public interface SupportsIndex extends Table {
* @param indexName the name of the index to be created
* @param indexType the IndexType of the index to be created
* @param columns the columns on which index to be created
-   * @param columnProperties the properties of the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
* @param properties the properties of the index to be created
* @throws IndexAlreadyExistsException If the index already exists (optional)
*/
void createIndex(String indexName,
String indexType,
NamedReference[] columns,
- Map<NamedReference, Properties>[] columnProperties,
+ Map<NamedReference, Properties>[] columnsProperties,
Properties properties)
throws IndexAlreadyExistsException;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
index ce48cfa..fb17725 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala
@@ -79,5 +79,5 @@ class PartitionsAlreadyExistException(message: String) extends AnalysisException
class FunctionAlreadyExistsException(db: String, func: String)
  extends AnalysisException(s"Function '$func' already exists in database '$db'")
-class IndexAlreadyExistsException(indexName: String, table: Identifier)
-  extends AnalysisException(s"Index '$indexName' already exists in table ${table.quoted}")
+class IndexAlreadyExistsException(message: String, cause: Option[Throwable] = None)
+  extends AnalysisException(message, cause = cause)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index d49f4b0..168d16a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException}
import java.time.{Instant, LocalDate}
+import java.util
import java.util.Locale
import java.util.concurrent.TimeUnit
@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
@@ -1009,6 +1011,35 @@ object JdbcUtils extends Logging with SQLConfHelper {
    executeStatement(conn, options, s"DROP SCHEMA ${dialect.quoteIdentifier(namespace)}")
}
+ /**
+ * Create an index.
+ */
+ def createIndex(
+ conn: Connection,
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties,
+ options: JDBCOptions): Unit = {
+ val dialect = JdbcDialects.get(options.url)
+ executeStatement(conn, options,
+      dialect.createIndex(indexName, indexType, tableName, columns, columnsProperties, properties))
+ }
+
+ /**
+ * Check if an index exists
+ */
+ def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Boolean = {
+ val dialect = JdbcDialects.get(options.url)
+ dialect.indexExists(conn, indexName, tableName, options)
+ }
+
  private def executeStatement(conn: Connection, options: JDBCOptions, sql: String): Unit = {
val statement = conn.createStatement
try {
@@ -1018,4 +1049,31 @@ object JdbcUtils extends Logging with SQLConfHelper {
statement.close()
}
}
+
+  def executeQuery(conn: Connection, options: JDBCOptions, sql: String): ResultSet = {
+ val statement = conn.createStatement
+ try {
+ statement.setQueryTimeout(options.queryTimeout)
+ statement.executeQuery(sql)
+ } finally {
+ statement.close()
+ }
+ }
+
+  def classifyException[T](message: String, dialect: JdbcDialect)(f: => T): T = {
+ try {
+ f
+ } catch {
+ case e: Throwable => throw dialect.classifyException(message, e)
+ }
+ }
+
+ def withConnection[T](options: JDBCOptions)(f: Connection => T): T = {
+ val conn = createConnectionFactory(options)()
+ try {
+ f(conn)
+ } finally {
+ conn.close()
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
index d88ec2f..1db938e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTable.scala
@@ -23,13 +23,16 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.TableCapability._
+import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
+import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite}
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcOptionsInWrite, JdbcUtils}
+import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)
- extends Table with SupportsRead with SupportsWrite {
+ extends Table with SupportsRead with SupportsWrite with SupportsIndex {
override def name(): String = ident.toString
@@ -48,4 +51,33 @@ case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOpt
      jdbcOptions.parameters.originalMap ++ info.options.asCaseSensitiveMap().asScala)
JDBCWriteBuilder(schema, mergedOptions)
}
+
+ override def createIndex(
+ indexName: String,
+ indexType: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): Unit = {
+ JdbcUtils.withConnection(jdbcOptions) { conn =>
+      JdbcUtils.classifyException(s"Failed to create index: $indexName in $name",
+ JdbcDialects.get(jdbcOptions.url)) {
+ JdbcUtils.createIndex(
+          conn, indexName, indexType, name, columns, columnsProperties, properties, jdbcOptions)
+ }
+ }
+ }
+
+ override def indexExists(indexName: String): Boolean = {
+ JdbcUtils.withConnection(jdbcOptions) { conn =>
+ JdbcUtils.indexExists(conn, indexName, name, jdbcOptions)
+ }
+ }
+
+ override def dropIndex(indexName: String): Boolean = {
+ throw new UnsupportedOperationException("dropIndex is not supported yet")
+ }
+
+ override def listIndexes(): Array[TableIndex] = {
+ throw new UnsupportedOperationException("listIndexes is not supported yet")
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index a90ab56..5667064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources.v2.jdbc
-import java.sql.{Connection, SQLException}
+import java.sql.SQLException
import java.util
import scala.collection.JavaConverters._
@@ -57,7 +57,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
override def listTables(namespace: Array[String]): Array[Identifier] = {
checkNamespace(namespace)
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
val schemaPattern = if (namespace.length == 1) namespace.head else null
val rs = conn.getMetaData
.getTables(null, schemaPattern, "%", Array("TABLE"));
@@ -72,14 +72,14 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
checkNamespace(ident.namespace())
val writeOptions = new JdbcOptionsInWrite(
      options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
- classifyException(s"Failed table existence check: $ident") {
- withConnection(JdbcUtils.tableExists(_, writeOptions))
+    JdbcUtils.classifyException(s"Failed table existence check: $ident", dialect) {
+ JdbcUtils.withConnection(options)(JdbcUtils.tableExists(_, writeOptions))
}
}
override def dropTable(ident: Identifier): Boolean = {
checkNamespace(ident.namespace())
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
try {
JdbcUtils.dropTable(conn, getTableName(ident), options)
true
@@ -91,8 +91,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
checkNamespace(oldIdent.namespace())
- withConnection { conn =>
- classifyException(s"Failed table renaming from $oldIdent to $newIdent") {
+ JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed table renaming from $oldIdent to $newIdent", dialect) {
        JdbcUtils.renameTable(conn, getTableName(oldIdent), getTableName(newIdent), options)
}
}
@@ -151,8 +151,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
val writeOptions = new JdbcOptionsInWrite(tableOptions)
val caseSensitive = SQLConf.get.caseSensitiveAnalysis
- withConnection { conn =>
- classifyException(s"Failed table creation: $ident") {
+ JdbcUtils.withConnection(options) { conn =>
+ JdbcUtils.classifyException(s"Failed table creation: $ident", dialect) {
        JdbcUtils.createTable(conn, getTableName(ident), schema, caseSensitive, writeOptions)
}
}
@@ -162,8 +162,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
checkNamespace(ident.namespace())
- withConnection { conn =>
- classifyException(s"Failed table altering: $ident") {
+ JdbcUtils.withConnection(options) { conn =>
+ JdbcUtils.classifyException(s"Failed table altering: $ident", dialect) {
JdbcUtils.alterTable(conn, getTableName(ident), changes, options)
}
loadTable(ident)
@@ -172,7 +172,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
  override def namespaceExists(namespace: Array[String]): Boolean = namespace match {
case Array(db) =>
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
val rs = conn.getMetaData.getSchemas(null, db)
while (rs.next()) {
if (rs.getString(1) == db) return true;
@@ -183,7 +183,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
override def listNamespaces(): Array[Array[String]] = {
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
val schemaBuilder = ArrayBuilder.make[Array[String]]
val rs = conn.getMetaData.getSchemas()
while (rs.next()) {
@@ -234,8 +234,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
}
}
- withConnection { conn =>
- classifyException(s"Failed create name space: $db") {
+ JdbcUtils.withConnection(options) { conn =>
+      JdbcUtils.classifyException(s"Failed create name space: $db", dialect) {
JdbcUtils.createNamespace(conn, options, db, comment)
}
}
@@ -253,7 +253,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
changes.foreach {
case set: NamespaceChange.SetProperty =>
if (set.property() == SupportsNamespaces.PROP_COMMENT) {
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
JdbcUtils.createNamespaceComment(conn, options, db, set.value)
}
} else {
@@ -262,7 +262,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
case unset: NamespaceChange.RemoveProperty =>
if (unset.property() == SupportsNamespaces.PROP_COMMENT) {
- withConnection { conn =>
+ JdbcUtils.withConnection(options) { conn =>
JdbcUtils.removeNamespaceComment(conn, options, db)
}
} else {
@@ -283,8 +283,8 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
if (listTables(Array(db)).nonEmpty) {
throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
}
- withConnection { conn =>
- classifyException(s"Failed drop name space: $db") {
+ JdbcUtils.withConnection(options) { conn =>
+ JdbcUtils.classifyException(s"Failed drop name space: $db", dialect) {
JdbcUtils.dropNamespace(conn, options, db)
true
}
@@ -301,24 +301,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
}
- private def withConnection[T](f: Connection => T): T = {
- val conn = JdbcUtils.createConnectionFactory(options)()
- try {
- f(conn)
- } finally {
- conn.close()
- }
- }
-
private def getTableName(ident: Identifier): String = {
    (ident.namespace() :+ ident.name()).map(dialect.quoteIdentifier).mkString(".")
}
-
- private def classifyException[T](message: String)(f: => T): T = {
- try {
- f
- } catch {
- case e: Throwable => throw dialect.classifyException(message, e)
- }
- }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index aa95711..d1c4f8d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.jdbc
import java.sql.{Connection, Date, Timestamp}
import java.time.{Instant, LocalDate}
+import java.util
import scala.collection.mutable.ArrayBuilder
@@ -30,8 +31,9 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.connector.catalog.TableChange._
+import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -288,6 +290,43 @@ abstract class JdbcDialect extends Serializable with Logging{
}
/**
+ * Creates an index.
+ *
+ * @param indexName the name of the index to be created
+ * @param indexType the type of the index to be created
+ * @param tableName the table on which index to be created
+ * @param columns the columns on which index to be created
+   * @param columnsProperties the properties of the columns on which index to be created
+ * @param properties the properties of the index to be created
+ */
+ def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ throw new UnsupportedOperationException("createIndex is not supported")
+ }
+
+ /**
+ * Checks whether an index exists
+ *
+ * @param indexName the name of the index
+ * @param tableName the table name on which index to be checked
+ * @param options JDBCOptions of the table
+   * @return true if the index with `indexName` exists in the table with `tableName`,
+ * false otherwise
+ */
+ def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Boolean = {
+ throw new UnsupportedOperationException("indexExists is not supported")
+ }
+
+ /**
   * Gets a dialect exception, classifies it and wraps it by `AnalysisException`.
* @param message The error message to be placed to the returned exception.
* @param e The dialect specific exception.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
index ed10770..5c16ef6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala
@@ -17,14 +17,21 @@
package org.apache.spark.sql.jdbc
-import java.sql.Types
+import java.sql.{Connection, SQLException, Types}
+import java.util
import java.util.Locale
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.IndexAlreadyExistsException
+import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder}
-private case object MySQLDialect extends JdbcDialect {
+private case object MySQLDialect extends JdbcDialect with SQLConfHelper {
override def canHandle(url : String): Boolean =
url.toLowerCase(Locale.ROOT).startsWith("jdbc:mysql")
@@ -102,4 +109,65 @@ private case object MySQLDialect extends JdbcDialect {
case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT))
case _ => JdbcUtils.getCommonJDBCType(dt)
}
+
+ // CREATE INDEX syntax
+ // https://dev.mysql.com/doc/refman/8.0/en/create-index.html
+ override def createIndex(
+ indexName: String,
+ indexType: String,
+ tableName: String,
+ columns: Array[NamedReference],
+ columnsProperties: Array[util.Map[NamedReference, util.Properties]],
+ properties: util.Properties): String = {
+ val columnList = columns.map(col => quoteIdentifier(col.fieldNames.head))
+ var indexProperties: String = ""
+ val scalaProps = properties.asScala
+ if (!properties.isEmpty) {
+ scalaProps.foreach { case (k, v) =>
+ indexProperties = indexProperties + " " + s"$k $v"
+ }
+ }
+
+ // columnsProperties doesn't apply to MySQL so it is ignored
+ s"CREATE $indexType INDEX ${quoteIdentifier(indexName)} ON" +
+      s" ${quoteIdentifier(tableName)}" + s" (${columnList.mkString(", ")}) $indexProperties"
+ }
+
+ // SHOW INDEX syntax
+ // https://dev.mysql.com/doc/refman/8.0/en/show-index.html
+ override def indexExists(
+ conn: Connection,
+ indexName: String,
+ tableName: String,
+ options: JDBCOptions): Boolean = {
+ val sql = s"SHOW INDEXES FROM ${quoteIdentifier(tableName)}"
+ try {
+ val rs = JdbcUtils.executeQuery(conn, options, sql)
+ while (rs.next()) {
+ val retrievedIndexName = rs.getString("key_name")
+ if (conf.resolver(retrievedIndexName, indexName)) {
+ return true
+ }
+ }
+ false
+ } catch {
+ case _: Exception =>
+ logWarning("Cannot retrieved index info.")
+ false
+ }
+ }
+
+  override def classifyException(message: String, e: Throwable): AnalysisException = {
+ if (e.isInstanceOf[SQLException]) {
+ // Error codes are from
+      // https://mariadb.com/kb/en/mariadb-error-codes/#shared-mariadbmysql-error-codes
+ e.asInstanceOf[SQLException].getErrorCode match {
+ // ER_DUP_KEYNAME
+ case 1061 =>
+ throw new IndexAlreadyExistsException(message, cause = Some(e))
+ case _ =>
+ }
+ }
+ super.classifyException(message, e)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]