ajantha-bhat commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378801215
 
 

 ##########
 File path: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
 ##########
 @@ -953,28 +983,46 @@ object CarbonDataRDDFactory {
 
   /**
    * Execute load process to load from input dataframe
+   *
+   * @param sqlContext sql context
+   * @param dataFrame optional dataframe for insert
+   * @param scanResultRDD optional internal row rdd for direct insert
+   * @param carbonLoadModel load model
 +   * @return Return an array that contains all of the elements in NewDataFrameLoaderRDD.
    */
   private def loadDataFrame(
       sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
+      scanResultRDD: Option[RDD[InternalRow]],
       carbonLoadModel: CarbonLoadModel
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     try {
-      val rdd = dataFrame.get.rdd
-
+      val rdd = if (dataFrame.isDefined) {
+        dataFrame.get.rdd
+      } else {
+        scanResultRDD.get
+      }
       val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
         DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
       }.distinct.length
       val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
         nodeNumOfData,
         sqlContext.sparkContext)
 -      val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd, nodes.toArray
-        .distinct)
-
+      val newRdd =
+        if (dataFrame.isDefined) {
+          new DataLoadCoalescedRDD[Row](
+            sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray.distinct)
+        } else {
+          new DataLoadCoalescedRDD[InternalRow](
+            sqlContext.sparkSession,
+            scanResultRDD.get,
+            nodes.toArray.distinct)
+        }
       new NewDataFrameLoaderRDD(
         sqlContext.sparkSession,
         new DataLoadResultImpl(),
         carbonLoadModel,
+        dataFrame.isDefined,
 
 Review comment:
   Yes, done as suggested.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to