Repository: carbondata
Updated Branches:
  refs/heads/master 410df072d -> 4c9af60d4


[CARBONDATA-1913][global_sort] Global Sort data dataload fails for big data with RPC timeout exception

When the global sort option is used for big data, the load sometimes fails with an RPC timeout after 120s. This happens because the driver cannot unpersist the RDD cache within 120s. RDD unpersist is a blocking call, and Spark is sometimes unable to finish unpersisting the RDD within the default spark.rpc.askTimeout or spark.network.timeout.

Clean the cache only if the RDD was persisted, and make the unpersist call non-blocking. A non-blocking call does not wait for the cache clean-up task to finish. It has no functional impact, because Spark automatically monitors cache usage on each node and drops old data partitions in a least-recently-used (LRU) fashion.

This closes #1685


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4c9af60d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4c9af60d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4c9af60d

Branch: refs/heads/master
Commit: 4c9af60d4af81687ce25bd76cc075c868a5e431c
Parents: 410df07
Author: mohammadshahidkhan <[email protected]>
Authored: Tue Dec 19 19:18:06 2017 +0530
Committer: kumarvishal <[email protected]>
Committed: Wed Dec 20 18:36:52 2017 +0530

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4c9af60d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 2537a0c..781b484 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -136,8 +136,12 @@ object DataLoadProcessBuilderOnSpark {
         DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
           writeStepRowCounter))
 
-    // clean cache
-    convertRDD.unpersist()
+    // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
+    // not have any functional impact as spark automatically monitors the cache usage on each node
+    // and drops out old data partitions in a least-recently used (LRU) fashion.
+    if (numPartitions > 1) {
+      convertRDD.unpersist(false)
+    }
 
     // Log the number of rows in each step
     LOGGER.info("Total rows processed in step Input Processor: " + inputStepRowCounter.value)
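The same clean-up pattern can be sketched outside CarbonData as a small standalone Spark program. It assumes spark-core (2.x or 3.x) is on the classpath. The names `CacheCleanup` and `unpersistIfCached` are hypothetical and do not come from the commit. The commit decides whether to clean up by testing `numPartitions > 1`. This sketch swaps in a different check: it reads the RDD's own storage level via `getStorageLevel`, then calls `unpersist(blocking = false)` so that the driver does not wait for executors to confirm block removal.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical helper illustrating "clean cache only if persisted, non-blocking".
object CacheCleanup {
  /** Returns true if the RDD was persisted and an unpersist request was issued. */
  def unpersistIfCached(rdd: RDD[_]): Boolean = {
    if (rdd.getStorageLevel != StorageLevel.NONE) {
      // blocking = false: the driver sends the removal request and returns immediately
      // instead of waiting for every executor to acknowledge that its blocks were dropped.
      rdd.unpersist(blocking = false)
      true
    } else {
      false // never persisted, so there is nothing to clean up
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unpersist-sketch"))
    try {
      // Cache an intermediate RDD that two actions reuse, similar in spirit to convertRDD
      val converted = sc.parallelize(1 to 1000, numSlices = 4)
        .map(_ * 2)
        .persist(StorageLevel.MEMORY_AND_DISK)
      println(s"count = ${converted.count()}, sum = ${converted.sum()}")
      println(s"unpersist requested: ${unpersistIfCached(converted)}")
    } finally {
      sc.stop()
    }
  }
}
```

Passing `blocking = false` explicitly keeps the behaviour the same across Spark versions, because the default value of that parameter is not the same in every release.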
