This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4369472 [CARBONDATA-4042] Launch same number of tasks as select query for insert into select and ctas cases when target table is of no_sort
4369472 is described below
commit 4369472d69fd58a659f2ec62722d212256264fc1
Author: Venu Reddy <[email protected]>
AuthorDate: Thu Oct 8 12:46:33 2020 +0530
[CARBONDATA-4042] Launch same number of tasks as select query for insert into select and ctas cases when target table is of no_sort
Why is this PR needed?
At present, when we do an insert into table select from or a create table as select from, we launch a single task per node. Whereas when we do a simple select * from table query, the tasks launched are equal to the number of carbondata files (CARBON_TASK_DISTRIBUTION default is CARBON_TASK_DISTRIBUTION_BLOCK). This slows down the load performance of the insert into select and CTAS cases.
Refer [Community discussion regarding task launch](http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html)
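For context, a minimal sketch of the select-side setting referenced above; it assumes the CARBON_TASK_DISTRIBUTION constants live in CarbonCommonConstants and are applied through CarbonProperties, and it is not part of this change:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    object TaskDistributionDefault {
      def main(args: Array[String]): Unit = {
        // With block-based distribution (the default), a select query splits its
        // work per carbondata block, so the task count follows the file/block count.
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK)
      }
    }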
What changes were proposed in this PR?
1. Launch the same number of tasks as the select query for insert into select and CTAS cases when the target table is no-sort (see the sketch below).
2. Replaced all checkAnswerWithoutSort calls with checkAnswer in TestSIWithSecondaryIndex to fix random UT failures in that file.
This closes #3972
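To make the approach concrete before the diff, here is a simplified, illustrative sketch (the class and method names below are placeholders, not the committed code): for a no-sort target table, the select output RDD is wrapped partition-for-partition instead of being coalesced to one partition per node, so the load stage launches as many tasks as the select stage.

    package org.apache.spark.rdd

    import scala.reflect.ClassTag

    import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}

    // Illustrative wrapper: keep a one-to-one mapping to the parent's partitions,
    // so the downstream loader RDD runs one task per select-side partition.
    class OneTaskPerPartitionRDD[T: ClassTag](@transient private var prev: RDD[T])
      extends RDD[(RDD[T], Partition)](prev.sparkContext, Nil) {

      // Reuse the upstream partitions unchanged (no coalescing to one per node).
      override protected def getPartitions: Array[Partition] = prev.partitions

      // Narrow, one-to-one dependency: no shuffle is introduced.
      override protected def getDependencies: Seq[Dependency[_]] =
        Seq(new OneToOneDependency[T](prev))

      override def compute(split: Partition, context: TaskContext): Iterator[(RDD[T], Partition)] = {
        // Hand the parent RDD plus this partition to the load task, which pulls the rows lazily.
        Iterator((firstParent[T], split))
      }

      override protected def clearDependencies(): Unit = {
        super.clearDependencies()
        prev = null
      }
    }

In the committed change below, the wrapper is DataLoadWrapperRDD (extending CarbonRDD and emitting DataLoadPartitionWrap elements), and CarbonDataRDDFactory routes no-sort loads through loadDataFrameForNoSort, but the partition-preserving idea is the same.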
---
.../secondaryindex/TestSIWithSecondaryIndex.scala | 10 ++--
.../spark/rdd/CarbonDataRDDFactory.scala | 48 ++++++++++++++++++-
.../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +-
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 30 ++++++++++++
.../index/bloom/BloomCoarseGrainIndexSuite.scala | 54 +++++++++++++++-------
5 files changed, 120 insertions(+), 24 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index ebc98dd..b8f0e76 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -89,7 +89,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
sql("insert into table1 select 'xx', '2', 'china' union all select 'xx', '1', 'india'")
sql("create index table1_index on table table1(id, country) as 'carbondata' properties" +
"('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
- checkAnswerWithoutSort(sql("select id, country from table1_index"),
+ checkAnswer(sql("select id, country from table1_index"),
Seq(Row("1", "india"), Row("2", "china")))
// check for valid sort_scope
checkExistence(sql("describe formatted table1_index"), true, "Sort Scope global_sort")
@@ -112,7 +112,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
sql("create index table11_index on table table11(id, country) as 'carbondata' properties" +
"('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
sql("insert into table11 select 'xx', '2', 'china' union all select 'xx', '1', 'india'")
- checkAnswerWithoutSort(sql("select id, country from table11_index"),
+ checkAnswer(sql("select id, country from table11_index"),
Seq(Row("1", "india"), Row("2", "china")))
// check for valid sort_scope
checkExistence(sql("describe formatted table11_index"), true, "Sort Scope global_sort")
@@ -128,7 +128,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
"('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
sql("insert into partition_carbon_table select 'xx', '2', 'china', '2020' " +
"union all select 'xx', '1', 'india', '2021'")
- checkAnswerWithoutSort(sql("select id, country from partition_carbon_table_index"),
+ checkAnswer(sql("select id, country from partition_carbon_table_index"),
Seq(Row("1", "india"), Row("2", "china")))
// check for valid sort_scope
checkExistence(sql("describe formatted partition_carbon_table_index"),
@@ -138,7 +138,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
sql("create index partition_carbon_table_index on table partition_carbon_table(" +
"id, country) as 'carbondata' properties" +
"('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
- checkAnswerWithoutSort(sql("select id, country from partition_carbon_table_index"),
+ checkAnswer(sql("select id, country from partition_carbon_table_index"),
Seq(Row("1", "india"), Row("2", "china")))
// check for valid sort_scope
checkExistence(sql("describe formatted partition_carbon_table_index"),
@@ -155,7 +155,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
sql("drop index if exists complextable_index_1 on complextable")
sql("create index complextable_index_1 on table complextable(country, name) " +
"as 'carbondata' properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
- checkAnswerWithoutSort(sql("select country,name from complextable_index_1"),
+ checkAnswer(sql("select country,name from complextable_index_1"),
Seq(Row("china", "b"), Row("china", "v"), Row("india", "v"), Row("pak", "v"), Row("us", "b")))
// check for valid sort_scope
checkExistence(sql("describe formatted complextable_index_1"), true, "Sort Scope global_sort")
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cb58953..50163b1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, RDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadWrapperRDD, RDD}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -391,6 +391,12 @@ object CarbonDataRDDFactory {
carbonLoadModel,
hadoopConf,
segmentMetaDataAccumulator)
+ } else if (sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+ loadDataFrameForNoSort(sqlContext,
+ None,
+ Some(convertedRdd),
+ carbonLoadModel,
+ segmentMetaDataAccumulator)
} else {
loadDataFrame(sqlContext,
None,
@@ -1122,6 +1128,46 @@ object CarbonDataRDDFactory {
}
/**
+ * Execute load process to load from input DataFrame
+ *
+ * @param sqlContext sql context
+ * @param dataFrame optional DataFrame for insert
+ * @param scanResultRDD optional internal row rdd for direct insert
+ * @param carbonLoadModel load model
+ * @param segmentMetaDataAccumulator segment metadata accumulator
+ * @return Return an array of tuple of uniqueLoadStatusId and tuple of LoadMetadataDetails and
+ * ExecutionErrors
+ */
+ def loadDataFrameForNoSort(
+ sqlContext: SQLContext,
+ dataFrame: Option[DataFrame],
+ scanResultRDD: Option[RDD[InternalRow]],
+ carbonLoadModel: CarbonLoadModel,
+ segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
+ ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+ try {
+ val newRdd = if (dataFrame.isDefined) {
+ new DataLoadWrapperRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd)
+ } else {
+ // For internal row, no need of converter and re-arrange step,
+ carbonLoadModel.setLoadWithoutConverterWithoutReArrangeStep(true)
+ new DataLoadWrapperRDD[InternalRow](sqlContext.sparkSession, scanResultRDD.get)
+ }
+ new NewDataFrameLoaderRDD(
+ sqlContext.sparkSession,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ newRdd,
+ segmentMetaDataAccumulator
+ ).collect()
+ } catch {
+ case ex: Exception =>
+ LOGGER.error("load data frame failed", ex)
+ throw ex
+ }
+ }
+
+ /**
* Execute load process to load from input file path specified in `carbonLoadModel`
*/
private def loadDataFile(
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ff78cd..9448ede 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -258,7 +258,7 @@ class NewDataFrameLoaderRDD[K, V](
@transient private val ss: SparkSession,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- prev: DataLoadCoalescedRDD[_],
+ prev: CarbonRDD[_],
segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
) extends CarbonRDD[(K, V)](ss, prev) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 2f22b0b..5cc9f2b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -70,3 +70,33 @@ class DataLoadCoalescedRDD[T: ClassTag](
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}
+
+class DataLoadWrapperRDD[T: ClassTag](@transient private val sparkSession: SparkSession,
+ @transient private var prev: RDD[T])
+ extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {
+
+ override def internalGetPartitions: Array[Partition] = {
+ prev.partitions
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(new OneToOneDependency[T](prev))
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
+
+ override def internalCompute(split: Partition,
+ context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+ new Iterator[DataLoadPartitionWrap[T]] {
+ var first = false
+ override def hasNext: Boolean = !first
+ override def next: DataLoadPartitionWrap[T] = {
+ first = true
+ DataLoadPartitionWrap(firstParent[T], split)
+ }
+ }
+ }
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
index 47d8dd9..1b58ed3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
@@ -17,12 +17,14 @@
package org.apache.carbondata.index.bloom
-import java.io.{File, PrintWriter}
+import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
import java.util.UUID
+import scala.collection.mutable.ListBuffer
import scala.util.Random
-import org.apache.spark.sql.{DataFrame, Row}
+import au.com.bytecode.opencsv.CSVWriter
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
@@ -680,13 +682,16 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
""".stripMargin)
// load data into table (segment1)
- sql(
- s"""
- | INSERT INTO TABLE $bloomSampleTable VALUES
- | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',0,'S01','S02'),
- | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',4,'S11','S12'),
- | (102,'name2','city2',12,'s12','s22','s32','s42','s52','s62','s72','s82',5,'S21','S22')
- """.stripMargin)
+ val csv = s"$resourcesPath/bloomCoarseGrainIndexSuiteLoad.csv"
+ val rows1 = new ListBuffer[Array[String]]
+ // scalastyle:off lineLength
+ rows1 += Array("100", "name0", "city0", "10", "s10", "s20", "s30", "s40", "s50", "s60", "s70", "s80", "0", "S01", "S02")
+ rows1 += Array("101", "name1", "city1", "11", "s11", "s21", "s31", "s41", "s51", "s61", "s71", "s81", "4", "S11", "S12")
+ rows1 += Array("102", "name2", "city2", "12", "s12", "s22", "s32", "s42", "s52", "s62", "s72", "s82", "5", "S21", "S22")
+ // scalastyle:on lineLength
+ createCSV(rows1, csv)
+ sql(s"""LOAD DATA LOCAL INPATH '$csv' INTO TABLE $bloomSampleTable OPTIONS('header'='false')"""
+ .stripMargin)
// check data after columns added
var res = sql(
@@ -707,14 +712,16 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
""".stripMargin)
// load data into table (segment2)
- sql(
- s"""
- | INSERT INTO TABLE $bloomSampleTable VALUES
- | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',1,'S01','S02'),
- | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',2,'S11','S12'),
- | (102,'name2','city1',12,'s12','s22','s32','s42','s52','s62','s72','s82',3,'S21','S22')
- """.stripMargin)
-
+ val rows2 = new ListBuffer[Array[String]]
+ // scalastyle:off lineLength
+ rows2 += Array("100", "name0", "city0", "10", "s10", "s20", "s30", "s40", "s50", "s60", "s70", "s80", "1", "S01", "S02")
+ rows2 += Array("101", "name1", "city1", "11", "s11", "s21", "s31", "s41", "s51", "s61", "s71", "s81", "2", "S11", "S12")
+ rows2 += Array("102", "name2", "city2", "12", "s12", "s22", "s32", "s42", "s52", "s62", "s72", "s82", "3", "S21", "S22")
+ // scalastyle:on lineLength
+ createCSV(rows2, csv)
+ sql(s"""LOAD DATA LOCAL INPATH '$csv' INTO TABLE $bloomSampleTable OPTIONS('header'='false')"""
+ .stripMargin)
+ deleteFile(csv)
var explainString = sql(
s"""
| explain SELECT id, name, num1, dictString
@@ -987,6 +994,19 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
}
+ def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
+ val out = new BufferedWriter(new FileWriter(csvPath))
+ val writer: CSVWriter = new CSVWriter(out, ',', CSVWriter.NO_QUOTE_CHARACTER)
+ try {
+ for (row <- rows) {
+ writer.writeNext(row)
+ }
+ } finally {
+ out.close()
+ writer.close()
+ }
+ }
+
private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
if (!new File(fileName).exists()) {
val write = new PrintWriter(new File(fileName))