This is an automated email from the ASF dual-hosted git repository.
qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 20f9a78 [CARBONDATA-4026] Fix Thread leakage while Loading
20f9a78 is described below
commit 20f9a78445fd74c1c7ece744f25b165ba74dfb8f
Author: haomarch <[email protected]>
AuthorDate: Mon Oct 12 02:56:04 2020 +0800
[CARBONDATA-4026] Fix Thread leakage while Loading
Why is this PR needed?
Several code paths in Loading/InsertStage never shut down their ExecutorService. This leads to
thread leakage, which degrades the performance of the driver and executor.
What changes were proposed in this PR?
Shut down each ExecutorService as soon as it is no longer in use.
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3976
---
.../core/statusmanager/StageInputCollector.java | 8 ++++-
.../carbon/flink/TestCarbonPartitionWriter.scala | 3 +-
.../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 34 +++++++++++++---------
.../management/CarbonInsertFromStageCommand.scala | 15 ++++++++--
.../carbondata/TestStreamingTableOpName.scala | 1 +
5 files changed, 43 insertions(+), 18 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
index 0a3c35d..ba58852 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInputCollector.java
@@ -66,7 +66,13 @@ public class StageInputCollector {
if (stageInputFiles.size() > 0) {
int numThreads = Math.min(Math.max(stageInputFiles.size(), 1), 10);
ExecutorService executorService =
Executors.newFixedThreadPool(numThreads);
- return createInputSplits(executorService, stageInputFiles);
+ try {
+ return createInputSplits(executorService, stageInputFiles);
+ } finally {
+ if (executorService != null && !executorService.isShutdown()) {
+ executorService.shutdownNow();
+ }
+ }
} else {
return new ArrayList<>(0);
}
diff --git
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index fabf844..3b18931 100644
---
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -18,7 +18,7 @@
package org.apache.carbon.flink
import java.text.SimpleDateFormat
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, TimeUnit}
import java.util.{Base64, Properties}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
@@ -202,6 +202,7 @@ class TestCarbonPartitionWriter extends QueryTest with
BeforeAndAfterAll{
}).get()
}
checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+ executorService.shutdownNow()
}
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index 695ee27..70a9dd8 100644
---
a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import java.util.concurrent.Executors
+import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
import scala.collection.JavaConverters._
@@ -158,20 +158,26 @@ object CarbonMergeFilesRDD {
// remove all tmp folder of index files
val startDelete = System.currentTimeMillis()
val numThreads = Math.min(Math.max(partitionInfo.size(), 1), 10)
- val executorService = Executors.newFixedThreadPool(numThreads)
- val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
- partitionInfo
- .asScala
- .map { partitionPath =>
- executorService.submit(new Runnable {
- override def run(): Unit = {
- ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
- FileFactory.deleteAllCarbonFilesOfDir(
- FileFactory.getCarbonFile(partitionPath + "/" +
tempFolderPath))
- }
- })
+ val executorService: ExecutorService =
Executors.newFixedThreadPool(numThreads)
+ try {
+ val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+ partitionInfo
+ .asScala
+ .map { partitionPath =>
+ executorService.submit(new Runnable {
+ override def run(): Unit = {
+ ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+ FileFactory.deleteAllCarbonFilesOfDir(
+ FileFactory.getCarbonFile(partitionPath + "/" +
tempFolderPath))
+ }
+ })
+ }
+ .map(_.get())
+ } finally {
+ if (executorService != null && !executorService.isShutdown) {
+ executorService.shutdownNow()
}
- .map(_.get())
+ }
LOGGER.info("Time taken to remove partition files for all partitions: " +
(System.currentTimeMillis() - startDelete))
} else if (carbonTable.isHivePartitionTable) {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index bf3d6d0..9fdfe0d 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.management
import java.io.{DataInputStream, File, InputStreamReader, IOException}
import java.util
import java.util.Collections
-import java.util.concurrent.{Callable, Executors, ExecutorService}
+import java.util.concurrent.{Callable, Executors, ExecutorService, TimeUnit}
import scala.collection.JavaConverters._
import scala.util.control.Breaks.{break, breakable}
@@ -164,6 +164,7 @@ case class CarbonInsertFromStageCommand(
} catch {
case ex: Throwable =>
LOGGER.error(s"failed to insert
${table.getDatabaseName}.${table.getTableName}", ex)
+ shutdownExecutorService(executorService)
throw ex
} finally {
lock.unlock()
@@ -190,6 +191,8 @@ case class CarbonInsertFromStageCommand(
case ex: Throwable =>
LOGGER.error(s"failed to insert
${table.getDatabaseName}.${table.getTableName}", ex)
throw ex
+ } finally {
+ shutdownExecutorService(executorService)
}
Seq.empty
}
@@ -227,6 +230,7 @@ case class CarbonInsertFromStageCommand(
throw new RuntimeException(s"Failed to lock table status for " +
s"${table.getDatabaseName}.${table.getTableName}")
}
+ var executorService: ExecutorService = null
try {
val segments = SegmentStatusManager.readTableStatusFile(
CarbonTablePath.getTableStatusFilePath(table.getTablePath)
@@ -243,7 +247,7 @@ case class CarbonInsertFromStageCommand(
LOGGER.info(s"Segment $segmentId is in SUCCESS state, about to
delete " +
s"${stageFileNames.length} stage files")
val numThreads = Math.min(Math.max(stageFileNames.length, 1), 10)
- val executorService = Executors.newFixedThreadPool(numThreads)
+ executorService = Executors.newFixedThreadPool(numThreads)
stageFileNames.map { fileName =>
executorService.submit(new Runnable {
override def run(): Unit = {
@@ -269,6 +273,7 @@ case class CarbonInsertFromStageCommand(
if (lock != null) {
lock.unlock()
}
+ shutdownExecutorService(executorService)
}
LOGGER.info(s"Finish recovery, delete snapshot file: $snapshotFilePath")
FileFactory.getCarbonFile(snapshotFilePath).delete()
@@ -761,6 +766,12 @@ case class CarbonInsertFromStageCommand(
}
}
+ private def shutdownExecutorService(executorService: ExecutorService): Unit
= {
+ if (executorService != null && !executorService.isShutdown) {
+ executorService.shutdownNow()
+ }
+ }
+
override protected def opName: String = "INSERT STAGE"
}
diff --git
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
index b38626f..9814c39 100644
---
a/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
+++
b/integration/spark/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala
@@ -244,6 +244,7 @@ class TestStreamingTableOpName extends QueryTest with
BeforeAndAfterAll {
val msg = intercept[Exception] {
future.get()
}
+ pool.shutdownNow()
assert(msg.getMessage.contains("is not a streaming table"))
} finally {
if (server != null) {