Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1876#discussion_r164655879
--- Diff:
integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
---
@@ -414,6 +416,75 @@ class StandardPartitionTableLoadingTestCase extends
QueryTest with BeforeAndAfte
sql("select * from casesensitivepartition where empno=17"))
}
test("Partition LOAD with small files") {
  // Verifies that loading many tiny CSV files into a partitioned table
  // merges them, so the segment directory ends up with far fewer files
  // than the number of inputs.
  sql("DROP TABLE IF EXISTS smallpartitionfiles")
  sql(
    """
      | CREATE TABLE smallpartitionfiles(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
      | STORED BY 'org.apache.carbondata.format'
    """.stripMargin)
  val inputPath = new File("target/small_files").getCanonicalPath
  val folder = new File(inputPath)
  if (folder.exists()) {
    FileUtils.deleteDirectory(folder)
  }
  // mkdirs (not mkdir) so the test still works if "target" does not exist yet.
  folder.mkdirs()
  // Generate 101 single-row CSV files (file0.csv .. file100.csv) spread over 5 cities.
  for (i <- 0 to 100) {
    val writer = new FileWriter(s"$folder/file$i.csv")
    try {
      writer.write("id,name,city,age\n")
      writer.write(s"$i,name_$i,city_${i % 5},${i % 100}")
    } finally {
      // Close even when a write throws, so file handles are not leaked.
      writer.close()
    }
  }
  try {
    sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
  } finally {
    // Remove the temp input folder even if the LOAD fails.
    FileUtils.deleteDirectory(folder)
  }
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
  val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
  val segmentDir = carbonTablePath.getSegmentDir("0", "0")
  // 101 input files must have been merged into well under 50 data files.
  assert(new File(segmentDir).listFiles().length < 50)
}
+
+ test("verify partition read with small files") {
+ try {
+
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)
+ sql("DROP TABLE IF EXISTS smallpartitionfilesread")
+ sql(
+ """
+ | CREATE TABLE smallpartitionfilesread(id INT, name STRING, age
INT) PARTITIONED BY
+ | (city STRING)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val inputPath = new File("target/small_files").getCanonicalPath
+ val folder = new File(inputPath)
+ if (folder.exists()) {
+ FileUtils.deleteDirectory(folder)
+ }
+ folder.mkdir()
+ for (i <- 0 until 100) {
+ val file = s"$folder/file$i.csv"
+ val writer = new FileWriter(file)
+ writer.write("id,name,city,age\n")
+ writer.write(s"$i,name_$i,city_${ i },${ i % 100 }")
+ writer.close()
+ }
+ sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE
smallpartitionfilesread")
+ FileUtils.deleteDirectory(folder)
+ val dataFrame = sql("select * from smallpartitionfilesread")
+ val scanRdd = dataFrame.queryExecution.sparkPlan.collect {
+ case b: BatchedDataSourceScanExec if
b.rdd.isInstanceOf[CarbonScanRDD] => b.rdd
+ .asInstanceOf[CarbonScanRDD]
+ }.head
+ assert(scanRdd.getPartitions.length < 10)
+ assertResult(100)(dataFrame.collect().length)
--- End diff --
Suggest using `dataFrame.count` instead of `dataFrame.collect().length`, so the row count is computed by the engine without collecting all rows to the driver.
---