Repository: carbondata
Updated Branches:
  refs/heads/master 410df072d -> 4c9af60d4


[CARBONDATA-1913][global_sort] Global Sort data dataload fails for big data 
with RPC timeout exception

When gloabl sort option is used for big data then for some times it fails with 
RPC timeout after 120s.This is happening because the driver is not able to 
unpersist rdd cache with in 120s.The issue is happening due to rdd unpersist 
blocking call. Sometimes spark is not ableto unppersist the rdd in default 
spark.rpc.askTimeout or spark.network.timeout time.Clean cache only if 
persisted and keeping unpersist non-blocking. Non blocking will not wait for 
cache clean up task to finish and non-blocking call will not have any 
functional impact as spark automatically monitors the cache usage on each node 
and drops out old data partitions in a least-recently used (LRU) fashion.

This closes #1685


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c9af60d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c9af60d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c9af60d

Branch: refs/heads/master
Commit: 4c9af60d4af81687ce25bd76cc075c868a5e431c
Parents: 410df07
Author: mohammadshahidkhan <[email protected]>
Authored: Tue Dec 19 19:18:06 2017 +0530
Committer: kumarvishal <[email protected]>
Committed: Wed Dec 20 18:36:52 2017 +0530

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala           | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9af60d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 2537a0c..781b484 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -136,8 +136,12 @@ object DataLoadProcessBuilderOnSpark {
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, 
modelBroadcast,
         writeStepRowCounter))
 
-    // clean cache
-    convertRDD.unpersist()
+    // clean cache only if persisted and keeping unpersist non-blocking as 
non-blocking call will
+    // not have any functional impact as spark automatically monitors the 
cache usage on each node
+    // and drops out old data partiotions in a least-recently used (LRU) 
fashion.
+    if (numPartitions > 1) {
+      convertRDD.unpersist(false)
+    }
 
     // Log the number of rows in each step
     LOGGER.info("Total rows processed in step Input Processor: " + 
inputStepRowCounter.value)

Reply via email to