QiangCai commented on a change in pull request #3538: [CARBONDATA-3637] Optimize insert into flow
URL: https://github.com/apache/carbondata/pull/3538#discussion_r378724111
##########
File path:
integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -953,28 +983,46 @@ object CarbonDataRDDFactory {
/**
* Execute load process to load from input dataframe
+ *
+ * @param sqlContext sql context
+ * @param dataFrame optional dataframe for insert
+ * @param scanResultRDD optional internal row rdd for direct insert
+ * @param carbonLoadModel load model
+ * @return Return an array that contains all of the elements in
NewDataFrameLoaderRDD.
*/
private def loadDataFrame(
sqlContext: SQLContext,
dataFrame: Option[DataFrame],
+ scanResultRDD: Option[RDD[InternalRow]],
carbonLoadModel: CarbonLoadModel
): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
try {
- val rdd = dataFrame.get.rdd
-
+ val rdd = if (dataFrame.isDefined) {
+ dataFrame.get.rdd
+ } else {
+ scanResultRDD.get
+ }
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.length
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
nodeNumOfData,
sqlContext.sparkContext)
- val newRdd = new DataLoadCoalescedRDD[Row](sqlContext.sparkSession, rdd,
nodes.toArray
- .distinct)
-
+ val newRdd =
+ if (dataFrame.isDefined) {
+ new DataLoadCoalescedRDD[Row](
+ sqlContext.sparkSession, dataFrame.get.rdd, nodes.toArray.distinct)
+ } else {
+ new DataLoadCoalescedRDD[InternalRow](
+ sqlContext.sparkSession,
+ scanResultRDD.get,
+ nodes.toArray.distinct)
+ }
new NewDataFrameLoaderRDD(
sqlContext.sparkSession,
new DataLoadResultImpl(),
carbonLoadModel,
+ dataFrame.isDefined,
Review comment:
This parameter is not needed in the loadDataFrame method;
it would be better to set carbonLoadModel.setLoadWithoutConverterWithoutReArrangeStep instead of passing it here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services