This is an automated email from the ASF dual-hosted git repository.
yikaifei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 46f8e0ca9 [KYUUBI #5017] [KSHC] Support Parquet/Orc provider is splitable
46f8e0ca9 is described below
commit 46f8e0ca94e09b7a06ad540b31a70437566ab845
Author: yikaifei <[email protected]>
AuthorDate: Thu Jul 6 19:21:05 2023 +0800
[KYUUBI #5017] [KSHC] Support Parquet/Orc provider is splitable
### _Why are the changes needed?_
This PR aims to make the Parquet/ORC providers splitable, so that `HiveScan` can split large Parquet/ORC files into multiple partitions instead of reading each file as a single partition.
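For context, a minimal sketch of the rule this change implements (it mirrors the `isSplitable` override in the diff below; the standalone `splitable` helper here is illustrative, not part of the commit):

```scala
import java.util.Locale

// A scan is splitable when the underlying files are Parquet or ORC: either
// declared directly as the table provider, or, for Hive-provider tables,
// via a Parquet/ORC serde in the table's storage descriptor.
def splitable(provider: Option[String], serde: Option[String]): Boolean =
  provider.map(_.toUpperCase(Locale.ROOT)).exists {
    case "PARQUET" | "ORC" => true
    case "HIVE" =>
      val s = serde.getOrElse("").toLowerCase(Locale.ROOT)
      s.contains("parquet") || s.contains("orc")
    case _ => false // other providers keep FileScan's default (not splitable)
  }
```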
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5017 from Yikf/KSHC-support-split.
Closes #5017
9dc3d3d56 [yikaifei] Support Parquet/Orc provider is splitable
Authored-by: yikaifei <[email protected]>
Signed-off-by: yikaifei <[email protected]>
---
.../kyuubi/spark/connector/hive/read/HiveScan.scala | 16 +++++++++++++++-
.../spark/connector/hive/HiveCatalogSuite.scala | 19 +++++++++++++++++++
2 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
index 5895ecf03..518a9cb68 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveScan.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.connector.read.PartitionReaderFactory
@@ -54,6 +54,15 @@ case class HiveScan(
private val partFileToHivePartMap: mutable.Map[PartitionedFile, HivePartition] = mutable.Map()
+ override def isSplitable(path: Path): Boolean = {
+ catalogTable.provider.map(_.toUpperCase(Locale.ROOT)).exists {
+ case "PARQUET" => true
+ case "ORC" => true
+ case "HIVE" => isHiveOrcOrParquet(catalogTable.storage)
+ case _ => super.isSplitable(path)
+ }
+ }
+
override def createReaderFactory(): PartitionReaderFactory = {
val hiveConf = fileIndex.hiveCatalog.hadoopConfiguration()
addCatalogTableConfToConf(hiveConf, catalogTable)
@@ -142,6 +151,11 @@ case class HiveScan(
}
}
+ private def isHiveOrcOrParquet(storage: CatalogStorageFormat): Boolean = {
+ val serde = storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
+ serde.contains("parquet") || serde.contains("orc")
+ }
+
def toAttributes(structType: StructType): Seq[AttributeReference] =
    structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
}
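The `isHiveOrcOrParquet` helper above relies on Hive's built-in serde class names containing "parquet" or "orc" once lower-cased. A minimal standalone sketch, assuming the standard Hive serde classes (`ParquetHiveSerDe`, `OrcSerde`, and `LazySimpleSerDe` as the non-columnar counterexample):

```scala
import java.util.Locale

def isHiveOrcOrParquet(serde: Option[String]): Boolean = {
  val s = serde.getOrElse("").toLowerCase(Locale.ROOT)
  s.contains("parquet") || s.contains("orc")
}

// The standard Hive serdes for Parquet and ORC both match...
assert(isHiveOrcOrParquet(Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")))
assert(isHiveOrcOrParquet(Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")))
// ...while the default text serde does not.
assert(!isHiveOrcOrParquet(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")))
```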
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 5e0be5ce6..4719544de 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper
+import org.apache.kyuubi.spark.connector.hive.read.HiveScan
class HiveCatalogSuite extends KyuubiHiveTest {
@@ -339,4 +340,22 @@ class HiveCatalogSuite extends KyuubiHiveTest {
catalog.dropNamespace(testNs, cascade = false)
}
+
+ test("Support Parquet/Orc provider is splitable") {
+ val parquet_table = Identifier.of(testNs, "parquet_table")
+ val parProps: util.Map[String, String] = new util.HashMap[String, String]()
+ parProps.put(TableCatalog.PROP_PROVIDER, "parquet")
+ val pt = catalog.createTable(parquet_table, schema, Array.empty[Transform], parProps)
+ val parScan = pt.asInstanceOf[HiveTable]
+ .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
+ assert(parScan.isSplitable(new Path("empty")))
+
+ val orc_table = Identifier.of(testNs, "orc_table")
+ val orcProps: util.Map[String, String] = new util.HashMap[String, String]()
+ orcProps.put(TableCatalog.PROP_PROVIDER, "orc")
+ val ot = catalog.createTable(orc_table, schema, Array.empty[Transform], orcProps)
+ val orcScan = ot.asInstanceOf[HiveTable]
+ .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
+ assert(orcScan.isSplitable(new Path("empty")))
+ }
}
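For completeness, a hedged sketch (hypothetical, not part of this commit) of a matching negative case, assuming FileScan's default `isSplitable` returns false and that a `text`-provider table is created and scanned through the same path as the tables above:

```scala
  test("text provider is not splitable") {
    // A provider that is neither Parquet nor ORC should fall back to
    // FileScan's default, i.e. not splitable.
    val text_table = Identifier.of(testNs, "text_table")
    val props: util.Map[String, String] = new util.HashMap[String, String]()
    props.put(TableCatalog.PROP_PROVIDER, "text")
    val tt = catalog.createTable(text_table, schema, Array.empty[Transform], props)
    val textScan = tt.asInstanceOf[HiveTable]
      .newScanBuilder(CaseInsensitiveStringMap.empty()).build().asInstanceOf[HiveScan]
    assert(!textScan.isSplitable(new Path("empty")))
  }
```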