This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1cca062 [CARBONDATA-3919] Improve concurrent query performance
1cca062 is described below
commit 1cca06233b957aa6bf7874991248f5bce0670131
Author: ajantha-bhat <[email protected]>
AuthorDate: Wed Jul 22 16:43:15 2020 +0530
[CARBONDATA-3919] Improve concurrent query performance
Why is this PR needed?
1. When 500 queries were executed concurrently, the
checkIfRefreshIsNeeded method was synchronized, so only one thread was
working at a time.
But synchronization is actually required only when the schema is modified and
tables need to be dropped — not
for the whole function.
2. TokenCache.obtainTokensForNamenodes was causing a performance bottleneck
for concurrent
queries, so it was removed.
What changes were proposed in this PR?
1. Synchronize only the remove-table part. Observed that the total time for
500 concurrent queries improved from
10 seconds to 3 seconds in the cluster.
2. Avoid calling the TokenCache.obtainTokensForNamenodes API.
This closes #3858
---
.../carbondata/hadoop/api/CarbonInputFormat.java | 4 ----
.../apache/spark/sql/hive/CarbonFileMetastore.scala | 20 ++++++++++++++++----
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 130e0d9..557fbfa 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.log4j.Logger;
/**
@@ -472,9 +471,6 @@ public abstract class CarbonInputFormat<T> extends
FileInputFormat<Void, T> {
QueryStatisticsRecorder recorder =
CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
- // get tokens for all the required FileSystem for table path
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { new Path(carbonTable.getTablePath()) },
job.getConfiguration());
List<ExtendedBlocklet> prunedBlocklets =
getPrunedBlocklets(job, carbonTable, expression, segmentIds,
invalidSegments,
segmentsToBeRefreshed);
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c9f78b5..b16579e 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -67,7 +67,7 @@ private object CarbonFileMetastore {
final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
- localTimeStamp: Long): Boolean = synchronized {
+ localTimeStamp: Long): Boolean = {
val schemaFilePath =
CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath)
val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath)
if (schemaCarbonFile.exists()) {
@@ -81,9 +81,21 @@ private object CarbonFileMetastore {
case None => true
}
if (isSchemaModified) {
- CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
- .getCarbonTableIdentifier.getTableUniqueName)
- IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+ if (CarbonMetadata.getInstance()
+ .getCarbonTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableUniqueName) != null) {
+ synchronized {
+ if (CarbonMetadata.getInstance()
+ .getCarbonTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableUniqueName) != null) {
+ CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier.getTableUniqueName)
+
IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+ }
+ }
+ }
true
} else {
localTimeStamp != newTime