This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4369472  [CARBONDATA-4042]Launch same number of tasks as select query for insert into select and ctas cases when target table is of no_sort
4369472 is described below

commit 4369472d69fd58a659f2ec62722d212256264fc1
Author: Venu Reddy <[email protected]>
AuthorDate: Thu Oct 8 12:46:33 2020 +0530

    [CARBONDATA-4042]Launch same number of tasks as select query for insert into
    select and ctas cases when target table is of no_sort
    
    Why is this PR needed?
    At present, when we do an insert into table select from or a create table as
    select from, we launch one single task per node. Whereas when we do a simple
    select * from table query, the tasks launched are equal to the number of
    carbondata files (CARBON_TASK_DISTRIBUTION default is CARBON_TASK_DISTRIBUTION_BLOCK).

    This slows down the load performance of insert into select and ctas cases.
    Refer to the [community discussion regarding task launch]
    (http://apache-carbondata-dev-mailing-list-archive.1130556.n5.nabble.com/Discussion-Query-Regarding-Task-launch-mechanism-for-data-load-operations-tt98711.html)
    
    What changes were proposed in this PR?
    1. Launch the same number of tasks as in the select query for insert into select
    and ctas cases when the target table is of no-sort (see the sketch below).
    2. Replaced all checkAnswerWithoutSort with checkAnswer in TestSIWithSecondaryIndex
    to fix random UT failure issues in that file.
    
    This closes #3972
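
    A minimal sketch of change 1, under simplified, assumed names: the classes below
    (OneTaskPerPartitionRDD, PartitionWrap) are illustrative stand-ins for the committed
    DataLoadWrapperRDD and DataLoadPartitionWrap, not the exact CarbonData code. The idea
    is to stop coalescing the source RDD down to one partition per node and instead wrap
    each partition of the select query's RDD one-to-one, so the load stage launches
    exactly as many tasks as the select query and each task reads its own partition
    locally.

        // Illustrative sketch only; names are hypothetical simplifications of the
        // committed DataLoadWrapperRDD / DataLoadPartitionWrap.
        import scala.reflect.ClassTag

        import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
        import org.apache.spark.rdd.RDD

        // One element per task: a handle to the parent RDD and the partition this
        // task should read, so the loader can pull rows locally inside the task.
        case class PartitionWrap[T](rdd: RDD[T], partition: Partition)

        class OneTaskPerPartitionRDD[T: ClassTag](@transient private var prev: RDD[T])
          extends RDD[PartitionWrap[T]](prev.context, Nil) {

          // Reuse the parent's partitions unchanged: no coalescing to one task per node.
          override protected def getPartitions: Array[Partition] = prev.partitions

          // Narrow dependency, so no shuffle is introduced on this load path.
          override def getDependencies: Seq[Dependency[_]] = Seq(new OneToOneDependency(prev))

          override def clearDependencies(): Unit = {
            super.clearDependencies()
            prev = null
          }

          // Emit exactly one wrapper element; the downstream loader RDD iterates the
          // wrapped partition itself, giving the load the select query's parallelism.
          override def compute(split: Partition, context: TaskContext): Iterator[PartitionWrap[T]] =
            Iterator.single(PartitionWrap(firstParent[T], split))
        }

    Because the wrapper keeps the parent's partitioning and only adds a narrow
    dependency, this path is used only when the target table is no_sort, where the
    input does not have to be gathered and sorted per node before writing.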
---
 .../secondaryindex/TestSIWithSecondaryIndex.scala  | 10 ++--
 .../spark/rdd/CarbonDataRDDFactory.scala           | 48 ++++++++++++++++++-
 .../spark/rdd/NewCarbonDataLoadRDD.scala           |  2 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala    | 30 ++++++++++++
 .../index/bloom/BloomCoarseGrainIndexSuite.scala   | 54 +++++++++++++++-------
 5 files changed, 120 insertions(+), 24 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index ebc98dd..b8f0e76 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -89,7 +89,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     sql("insert into table1 select 'xx', '2', 'china' union all select 'xx', 
'1', 'india'")
     sql("create index table1_index on table table1(id, country) as 
'carbondata' properties" +
         "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
-    checkAnswerWithoutSort(sql("select id, country from table1_index"),
+    checkAnswer(sql("select id, country from table1_index"),
       Seq(Row("1", "india"), Row("2", "china")))
     // check for valid sort_scope
     checkExistence(sql("describe formatted table1_index"), true, "Sort Scope 
global_sort")
@@ -112,7 +112,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     sql("create index table11_index on table table11(id, country) as 'carbondata' properties" +
         "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
     sql("insert into table11 select 'xx', '2', 'china' union all select 'xx', 
'1', 'india'")
-    checkAnswerWithoutSort(sql("select id, country from table11_index"),
+    checkAnswer(sql("select id, country from table11_index"),
       Seq(Row("1", "india"), Row("2", "china")))
     // check for valid sort_scope
     checkExistence(sql("describe formatted table11_index"), true, "Sort Scope 
global_sort")
@@ -128,7 +128,7 @@ class TestSIWithSecondaryIndex extends QueryTest with 
BeforeAndAfterAll {
         "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
     sql("insert into partition_carbon_table select 'xx', '2', 'china', '2020' 
" +
         "union all select 'xx', '1', 'india', '2021'")
-    checkAnswerWithoutSort(sql("select id, country from 
partition_carbon_table_index"),
+    checkAnswer(sql("select id, country from partition_carbon_table_index"),
       Seq(Row("1", "india"), Row("2", "china")))
     // check for valid sort_scope
     checkExistence(sql("describe formatted partition_carbon_table_index"),
@@ -138,7 +138,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     sql("create index partition_carbon_table_index on table partition_carbon_table(" +
         "id, country) as 'carbondata' properties" +
         "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
-    checkAnswerWithoutSort(sql("select id, country from 
partition_carbon_table_index"),
+    checkAnswer(sql("select id, country from partition_carbon_table_index"),
       Seq(Row("1", "india"), Row("2", "china")))
     // check for valid sort_scope
     checkExistence(sql("describe formatted partition_carbon_table_index"),
@@ -155,7 +155,7 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop index if exists complextable_index_1 on complextable")
     sql("create index complextable_index_1 on table complextable(country, 
name) " +
         "as 'carbondata' properties('sort_scope'='global_sort', 
'Global_sort_partitions'='3')")
-    checkAnswerWithoutSort(sql("select country,name from 
complextable_index_1"),
+    checkAnswer(sql("select country,name from complextable_index_1"),
       Seq(Row("china", "b"), Row("china", "v"), Row("india", "v"), Row("pak", 
"v"), Row("us", "b")))
     // check for valid sort_scope
     checkExistence(sql("describe formatted complextable_index_1"), true, "Sort 
Scope global_sort")
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cb58953..50163b1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, TaskContext}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, RDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, DataLoadWrapperRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
@@ -391,6 +391,12 @@ object CarbonDataRDDFactory {
                 carbonLoadModel,
                 hadoopConf,
                 segmentMetaDataAccumulator)
+            } else if (sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+              loadDataFrameForNoSort(sqlContext,
+                None,
+                Some(convertedRdd),
+                carbonLoadModel,
+                segmentMetaDataAccumulator)
             } else {
               loadDataFrame(sqlContext,
                 None,
@@ -1122,6 +1128,46 @@ object CarbonDataRDDFactory {
   }
 
   /**
+   * Execute load process to load from input DataFrame
+   *
+   * @param sqlContext sql context
+   * @param dataFrame optional DataFrame for insert
+   * @param scanResultRDD optional internal row rdd for direct insert
+   * @param carbonLoadModel load model
+   * @param segmentMetaDataAccumulator segment metadata accumulator
+   * @return Return an array of tuple of uniqueLoadStatusId and tuple of LoadMetadataDetails and
+   *         ExecutionErrors
+   */
+  def loadDataFrameForNoSort(
+      sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
+      carbonLoadModel: CarbonLoadModel,
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
+  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    try {
+      val newRdd = if (dataFrame.isDefined) {
+        new DataLoadWrapperRDD[Row](sqlContext.sparkSession, dataFrame.get.rdd)
+      } else {
+        // For internal row, no need of converter and re-arrange step,
+        carbonLoadModel.setLoadWithoutConverterWithoutReArrangeStep(true)
+        new DataLoadWrapperRDD[InternalRow](sqlContext.sparkSession, scanResultRDD.get)
+      }
+      new NewDataFrameLoaderRDD(
+        sqlContext.sparkSession,
+        new DataLoadResultImpl(),
+        carbonLoadModel,
+        newRdd,
+        segmentMetaDataAccumulator
+      ).collect()
+    } catch {
+      case ex: Exception =>
+        LOGGER.error("load data frame failed", ex)
+        throw ex
+    }
+  }
+
+  /**
    * Execute load process to load from input file path specified in `carbonLoadModel`
    */
   private def loadDataFile(
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ff78cd..9448ede 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -258,7 +258,7 @@ class NewDataFrameLoaderRDD[K, V](
     @transient private val ss: SparkSession,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    prev: DataLoadCoalescedRDD[_],
+    prev: CarbonRDD[_],
    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
     ) extends CarbonRDD[(K, V)](ss, prev) {
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 2f22b0b..5cc9f2b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -70,3 +70,33 @@ class DataLoadCoalescedRDD[T: ClassTag](
     partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
   }
 }
+
+class DataLoadWrapperRDD[T: ClassTag](@transient private val sparkSession: SparkSession,
+    @transient private var prev: RDD[T])
+  extends CarbonRDD[DataLoadPartitionWrap[T]](sparkSession, Nil) {
+
+  override def internalGetPartitions: Array[Partition] = {
+    prev.partitions
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new OneToOneDependency[T](prev))
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+    new Iterator[DataLoadPartitionWrap[T]] {
+      var first = false
+      override def hasNext: Boolean = !first
+      override def next: DataLoadPartitionWrap[T] = {
+        first = true
+        DataLoadPartitionWrap(firstParent[T], split)
+      }
+    }
+  }
+}
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
index 47d8dd9..1b58ed3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/index/bloom/BloomCoarseGrainIndexSuite.scala
@@ -17,12 +17,14 @@
 
 package org.apache.carbondata.index.bloom
 
-import java.io.{File, PrintWriter}
+import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
 import java.util.UUID
 
+import scala.collection.mutable.ListBuffer
 import scala.util.Random
 
-import org.apache.spark.sql.{DataFrame, Row}
+import au.com.bytecode.opencsv.CSVWriter
+import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SparkSession}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -680,13 +682,16 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
          """.stripMargin)
 
     // load data into table (segment1)
-    sql(
-      s"""
-         | INSERT INTO TABLE $bloomSampleTable VALUES
-         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',0,'S01','S02'),
-         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',4,'S11','S12'),
-         | (102,'name2','city2',12,'s12','s22','s32','s42','s52','s62','s72','s82',5,'S21','S22')
-           """.stripMargin)
+    val csv = s"$resourcesPath/bloomCoarseGrainIndexSuiteLoad.csv"
+    val rows1 = new ListBuffer[Array[String]]
+    // scalastyle:off lineLength
+    rows1 += Array("100", "name0", "city0", "10", "s10", "s20", "s30", "s40", 
"s50", "s60", "s70", "s80", "0", "S01", "S02")
+    rows1 += Array("101", "name1", "city1", "11", "s11", "s21", "s31", "s41", 
"s51", "s61", "s71", "s81", "4", "S11", "S12")
+    rows1 += Array("102", "name2", "city2", "12", "s12", "s22", "s32", "s42", 
"s52", "s62", "s72", "s82", "5", "S21", "S22")
+    // scalastyle:on lineLength
+    createCSV(rows1, csv)
+    sql(s"""LOAD DATA LOCAL INPATH '$csv' INTO TABLE $bloomSampleTable 
OPTIONS('header'='false')"""
+      .stripMargin)
 
     // check data after columns added
     var res = sql(
@@ -707,14 +712,16 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
       """.stripMargin)
 
     // load data into table (segment2)
-    sql(
-      s"""
-         | INSERT INTO TABLE $bloomSampleTable VALUES
-         | (100,'name0','city0',10,'s10','s20','s30','s40','s50','s60','s70','s80',1,'S01','S02'),
-         | (101,'name1','city1',11,'s11','s21','s31','s41','s51','s61','s71','s81',2,'S11','S12'),
-         | (102,'name2','city1',12,'s12','s22','s32','s42','s52','s62','s72','s82',3,'S21','S22')
-           """.stripMargin)
-
+    val rows2 = new ListBuffer[Array[String]]
+    // scalastyle:off lineLength
+    rows2 += Array("100", "name0", "city0", "10", "s10", "s20", "s30", "s40", 
"s50", "s60", "s70", "s80", "1", "S01", "S02")
+    rows2 += Array("101", "name1", "city1", "11", "s11", "s21", "s31", "s41", 
"s51", "s61", "s71", "s81", "2", "S11", "S12")
+    rows2 += Array("102", "name2", "city2", "12", "s12", "s22", "s32", "s42", 
"s52", "s62", "s72", "s82", "3", "S21", "S22")
+    // scalastyle:on lineLength
+    createCSV(rows2, csv)
+    sql(s"""LOAD DATA LOCAL INPATH '$csv' INTO TABLE $bloomSampleTable 
OPTIONS('header'='false')"""
+      .stripMargin)
+    deleteFile(csv)
     var explainString = sql(
       s"""
          | explain SELECT id, name, num1, dictString
@@ -987,6 +994,19 @@ class BloomCoarseGrainIndexSuite extends QueryTest with BeforeAndAfterAll with B
         CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
   }
 
+  def createCSV(rows: ListBuffer[Array[String]], csvPath: String): Unit = {
+    val out = new BufferedWriter(new FileWriter(csvPath))
+    val writer: CSVWriter = new CSVWriter(out, ',', CSVWriter.NO_QUOTE_CHARACTER)
+    try {
+      for (row <- rows) {
+        writer.writeNext(row)
+      }
+    } finally {
+      out.close()
+      writer.close()
+    }
+  }
+
  private def createFile(fileName: String, line: Int = 10000, start: Int = 0) = {
     if (!new File(fileName).exists()) {
       val write = new PrintWriter(new File(fileName))
