This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 802890a75 [KYUUBI #2542] [Subtask] Kyuubi Spark TPC-DS Connector - 
Make useAnsiStringType configurable
802890a75 is described below

commit 802890a759929a01ae74ee1b03dccb7f2750f8ab
Author: sychen <[email protected]>
AuthorDate: Tue May 10 13:24:03 2022 +0800

    [KYUUBI #2542] [Subtask] Kyuubi Spark TPC-DS Connector - Make 
useAnsiStringType configurable
    
    ### _Why are the changes needed?_
    close #2542
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #2610 from cxzl25/KYUUBI-2542.
    
    Closes #2542
    
    cefc1949 [sychen] useAnsiStringType
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../spark/connector/tpcds/TPCDSCatalog.scala       |  8 ++-
 .../kyuubi/spark/connector/tpcds/TPCDSTable.scala  |  6 +--
 .../spark/connector/tpcds/LocalSparkSession.scala  | 39 +++++++++++++++
 .../spark/connector/tpcds/TPCDSCatalogSuite.scala  |  5 ++
 .../spark/connector/tpcds/TPCDSTableSuite.scala    | 58 ++++++++++++++++++++++
 5 files changed, 111 insertions(+), 5 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
index 13b3f55b6..9026cfe95 100644
--- 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalog.scala
@@ -38,9 +38,13 @@ class TPCDSCatalog extends TableCatalog {
 
   val databases: Array[String] = scales.map("sf" + _)
 
+  var options: CaseInsensitiveStringMap = _
+
   override def name: String = "tpcds"
 
-  override def initialize(name: String, options: CaseInsensitiveStringMap): 
Unit = {}
+  override def initialize(name: String, options: CaseInsensitiveStringMap): 
Unit = {
+    this.options = options
+  }
 
   override def listTables(namespace: Array[String]): Array[Identifier] = 
namespace match {
     case Array(db) if databases contains db => 
tables.map(Identifier.of(namespace, _))
@@ -49,7 +53,7 @@ class TPCDSCatalog extends TableCatalog {
 
   override def loadTable(ident: Identifier): SparkTable = (ident.namespace, 
ident.name) match {
     case (Array(db), table) if databases contains db =>
-      new TPCDSTable(table.toLowerCase, scales(databases indexOf db))
+      new TPCDSTable(table.toLowerCase, scales(databases indexOf db), options)
     case (_, _) => throw new NoSuchTableException(ident)
   }
 
diff --git 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
index ef3b3b391..12dd69155 100644
--- 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/main/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTable.scala
@@ -31,11 +31,11 @@ import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
-class TPCDSTable(tbl: String, scale: Int) extends SparkTable with SupportsRead 
{
+class TPCDSTable(tbl: String, scale: Int, options: CaseInsensitiveStringMap)
+  extends SparkTable with SupportsRead {
 
   // When true, use CHAR VARCHAR; otherwise use STRING
-  // TODO: make it configurable
-  val useAnsiStringType: Boolean = false
+  val useAnsiStringType: Boolean = options.getBoolean("useAnsiStringType", 
false)
 
   val tablePartitionColumns: Map[String, Array[String]] = Map(
     "catalog_sales" -> Array("cs_sold_date_sk"),
diff --git 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
new file mode 100644
index 000000000..261093001
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/LocalSparkSession.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpcds
+
+import org.apache.spark.sql.SparkSession
+
+object LocalSparkSession {
+  def stop(spark: SparkSession): Unit = {
+    if (spark != null) {
+      spark.stop()
+    }
+    // To avoid RPC rebinding to the same port, since it doesn't unbind 
immediately on shutdown
+    System.clearProperty("spark.driver.port")
+  }
+
+  /** Runs `f` by passing in `sc` and ensures that `sc` is stopped. */
+  def withSparkSession[T](sc: SparkSession)(f: SparkSession => T): T = {
+    try {
+      f(sc)
+    } finally {
+      stop(sc)
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
index 9a60e0782..85664d6fa 100644
--- 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSCatalogSuite.scala
@@ -58,4 +58,9 @@ class TPCDSCatalogSuite extends KyuubiFunSuite {
     assert(spark.table("tpcds.sf1.web_sales").count === 719384)
     assert(spark.table("tpcds.sf1.web_site").count === 30)
   }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.stop()
+  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
new file mode 100644
index 000000000..4fc84c1db
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-connector-tpcds/src/test/scala/org/apache/kyuubi/spark/connector/tpcds/TPCDSTableSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.spark.connector.tpcds
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+import org.apache.kyuubi.KyuubiFunSuite
+import 
org.apache.kyuubi.spark.connector.tpcds.LocalSparkSession.withSparkSession
+
+class TPCDSTableSuite extends KyuubiFunSuite {
+
+  test("useAnsiStringType (true,false)") {
+    Seq(true, false).foreach(key => {
+      val sparkConf = new SparkConf().setMaster("local[*]")
+        .set("spark.ui.enabled", "false")
+        .set("spark.sql.catalogImplementation", "in-memory")
+        .set("spark.sql.catalog.tpcds", classOf[TPCDSCatalog].getName)
+        .set("spark.sql.catalog.tpcds.useAnsiStringType", key.toString)
+      withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { 
spark =>
+        val rows = spark.sql("desc tpcds.sf1.call_center").collect()
+        rows.foreach(row => {
+          val dataType = row.getString(1)
+          row.getString(0) match {
+            case "cc_call_center_id" =>
+              if (key) {
+                assert(dataType == "char(16)")
+              } else {
+                assert(dataType == "string")
+              }
+            case "cc_name" =>
+              if (key) {
+                assert(dataType == "varchar(50)")
+              } else {
+                assert(dataType == "string")
+              }
+            case _ =>
+          }
+        })
+      }
+    })
+  }
+}

Reply via email to