This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 802890a75 [KYUUBI #2542] [Subtask] Kyuubi Spark TPC-DS Connector -
Make useAnsiStringType configurable
802890a75 is described below
commit 802890a759929a01ae74ee1b03dccb7f2750f8ab
Author: sychen <[email protected]>
AuthorDate: Tue May 10 13:24:03 2022 +0800
[KYUUBI #2542] [Subtask] Kyuubi Spark TPC-DS Connector - Make
useAnsiStringType configurable
### _Why are the changes needed?_
close #2542
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #2610 from cxzl25/KYUUBI-2542.
Closes #2542
cefc1949 [sychen] useAnsiStringType
Authored-by: sychen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../spark/connector/tpcds/TPCDSCatalog.scala | 8 ++-
.../kyuubi/spark/connector/tpcds/TPCDSTable.scala | 6 +--
.../spark/connector/tpcds/LocalSparkSession.scala | 39 +++++++++++++++
.../spark/connector/tpcds/TPCDSCatalogSuite.scala | 5 ++
.../spark/connector/tpcds/TPCDSTableSuite.scala | 58 ++++++++++++++++++++++
5 files changed, 111 insertions(+), 5 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
index 13b3f55b6..9026cfe95 100644
---
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
+++
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
@@ -38,9 +38,13 @@ class TPCDSCatalog extends TableCatalog {
val databases: Array[String] = scales.map("sf" + _)
+ var options: CaseInsensitiveStringMap = _
+
override def name: String = "tpcds"
- override def initialize(name: String, options: CaseInsensitiveStringMap):
Unit = {}
+ override def initialize(name: String, options: CaseInsensitiveStringMap):
Unit = {
+ this.options = options
+ }
override def listTables(namespace: Array[String]): Array[Identifier] =
namespace match {
case Array(db) if databases contains db =>
tables.map(Identifier.of(namespace, _))
@@ -49,7 +53,7 @@ class TPCDSCatalog extends TableCatalog {
override def loadTable(ident: Identifier): SparkTable = (ident.namespace,
ident.name) match {
case (Array(db), table) if databases contains db =>
- new TPCDSTable(table.toLowerCase, scales(databases indexOf db))
+ new TPCDSTable(table.toLowerCase, scales(databases indexOf db), options)
case (_, _) => throw new NoSuchTableException(ident)
}
diff --git
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
index ef3b3b391..12dd69155 100644
---
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
+++
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
@@ -31,11 +31,11 @@ import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-class TPCDSTable(tbl: String, scale: Int) extends SparkTable with SupportsRead
{
+class TPCDSTable(tbl: String, scale: Int, options: CaseInsensitiveStringMap)
+ extends SparkTable with SupportsRead {
// When true, use CHAR VARCHAR; otherwise use STRING
- // TODO: make it configurable
- val useAnsiStringType: Boolean = false
+ val useAnsiStringType: Boolean = options.getBoolean("useAnsiStringType",
false)
val tablePartitionColumns: Map[String, Array[String]] = Map(
"catalog_sales" -> Array("cs_sold_date_sk"),
diff --git
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
new file mode 100644
index 000000000..261093001
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpcds
+
+import org.apache.spark.sql.SparkSession
+
+object LocalSparkSession {
+ def stop(spark: SparkSession): Unit = {
+ if (spark != null) {
+ spark.stop()
+ }
+ // To avoid RPC rebinding to the same port, since it doesn't unbind
immediately on shutdown
+ System.clearProperty("spark.driver.port")
+ }
+
+ /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+ def withSparkSession[T](sc: SparkSession)(f: SparkSession => T): T = {
+ try {
+ f(sc)
+ } finally {
+ stop(sc)
+ }
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
index 9a60e0782..85664d6fa 100644
---
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
+++
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
@@ -58,4 +58,9 @@ class TPCDSCatalogSuite extends KyuubiFunSuite {
assert(spark.table("tpcds.sf1.web_sales").count === 719384)
assert(spark.table("tpcds.sf1.web_site").count === 30)
}
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.stop()
+ }
}
diff --git
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
new file mode 100644
index 000000000..4fc84c1db
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpcds
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.KyuubiFunSuite
+import
org.apache.kyuubi.spark.connector.tpcds.LocalSparkSession.withSparkSession
+
+class TPCDSTableSuite extends KyuubiFunSuite {
+
+ test("useAnsiStringType (true,false)") {
+ Seq(true, false).foreach(key => {
+ val sparkConf = new SparkConf().setMaster("local[*]")
+ .set("spark.ui.enabled", "false")
+ .set("spark.sql.catalogImplementation", "in-memory")
+ .set("spark.sql.catalog.tpcds", classOf[TPCDSCatalog].getName)
+ .set("spark.sql.catalog.tpcds.useAnsiStringType", key.toString)
+ withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) {
spark =>
+ val rows = spark.sql("desc tpcds.sf1.call_center").collect()
+ rows.foreach(row => {
+ val dataType = row.getString(1)
+ row.getString(0) match {
+ case "cc_call_center_id" =>
+ if (key) {
+ assert(dataType == "char(16)")
+ } else {
+ assert(dataType == "string")
+ }
+ case "cc_name" =>
+ if (key) {
+ assert(dataType == "varchar(50)")
+ } else {
+ assert(dataType == "string")
+ }
+ case _ =>
+ }
+ })
+ }
+ })
+ }
+}