This is an automated email from the ASF dual-hosted git repository.

yikaifei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46f8e0ca9 [KYUUBI #5017] [KSHC] Support Parquet/Orc provider is 
splitable
46f8e0ca9 is described below

commit 46f8e0ca94e09b7a06ad540b31a70437566ab845
Author: yikaifei <[email protected]>
AuthorDate: Thu Jul 6 19:21:05 2023 +0800

    [KYUUBI #5017] [KSHC] Support Parquet/Orc provider is splitable
    
    ### _Why are the changes needed?_
    
    This PR aims to make tables whose provider is Parquet/ORC splitable.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    Closes #5017 from Yikf/KSHC-support-split.
    
    Closes #5017
    
    9dc3d3d56 [yikaifei] Support Parquet/Orc provider is splitable
    
    Authored-by: yikaifei <[email protected]>
    Signed-off-by: yikaifei <[email protected]>
---
 .../kyuubi/spark/connector/hive/read/HiveScan.scala   | 16 +++++++++++++++-
 .../spark/connector/hive/HiveCatalogSuite.scala       | 19 +++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 5895ecf03..518a9cb68 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -54,6 +54,15 @@ case class HiveScan(
 
   private val partFileToHivePartMap: mutable.Map[PartitionedFile, 
HivePartition] = mutable.Map()
 
+  override def isSplitable(path: Path): Boolean = {
+    catalogTable.provider.map(_.toUpperCase(Locale.ROOT)).exists {
+      case "PARQUET" => true
+      case "ORC" => true
+      case "HIVE" => isHiveOrcOrParquet(catalogTable.storage)
+      case _ => super.isSplitable(path)
+    }
+  }
+
   override def createReaderFactory(): PartitionReaderFactory = {
     val hiveConf = fileIndex.hiveCatalog.hadoopConfiguration()
     addCatalogTableConfToConf(hiveConf, catalogTable)
@@ -142,6 +151,11 @@ case class HiveScan(
     }
   }
 
+  private def isHiveOrcOrParquet(storage: CatalogStorageFormat): Boolean = {
+    val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+    serde.contains("parquet") || serde.contains("orc")
+  }
+
   def toAttributes(structType: StructType): Seq[AttributeReference] =
     structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)())
 }
diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 5e0be5ce6..4719544de 100644
--- 
a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, 
StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+import org.apache.kyuubi.spark.connector.hive.read.HiveScan
 
 class HiveCatalogSuite extends KyuubiHiveTest {
 
@@ -339,4 +340,22 @@ class HiveCatalogSuite extends KyuubiHiveTest {
 
     catalog.dropNamespace(testNs, cascade = false)
   }
+
+  test("Support Parquet/Orc provider is splitable") {
+    val parquet_table = Identifier.of(testNs, "parquet_table")
+    val parProps: util.Map[String, String] = new util.HashMap[String, String]()
+    parProps.put(TableCatalog.PROP_PROVIDER, "parquet")
+    val pt = catalog.createTable(parquet_table, schema, 
Array.empty[Transform], parProps)
+    val parScan = pt.asInstanceOf[HiveTable]
+      
.newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
+    assert(parScan.isSplitable(new Path("empty")))
+
+    val orc_table = Identifier.of(testNs, "orc_table")
+    val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
+    orcProps.put(TableCatalog.PROP_PROVIDER, "orc")
+    val ot = catalog.createTable(orc_table, schema, Array.empty[Transform], 
orcProps)
+    val orcScan = ot.asInstanceOf[HiveTable]
+      
.newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
+    assert(orcScan.isSplitable(new Path("empty")))
+  }
 }

Reply via email to