This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new df7339c [CARBONDATA-3392] Make LRU mandatory for index server
df7339c is described below
commit df7339ce005be48dfb440e4cd02f640d6555e887
Author: kunal642 <[email protected]>
AuthorDate: Wed May 15 16:40:28 2019 +0530
[CARBONDATA-3392] Make LRU mandatory for index server
Background:
Currently LRU is optional for the user to configure, but this will raise
some concerns in case of index server because the invalid segments have to be
constantly removed from the cache in case of update/delete/compaction scenarios.
Therefore, if the clear segment job fails, then the job would not fail, but
there has to be a mechanism to prevent that segment from being in cache forever.
To prevent the above-mentioned scenario, the LRU cache size for the executor is a
mandatory property for the index server application.
This closes #3222
---
.../carbondata/core/datamap/DataMapUtil.java | 10 +++++-
.../carbondata/core/util/BlockletDataMapUtil.java | 2 +-
.../hadoop/api/CarbonTableInputFormat.java | 39 +++++++++++++---------
.../carbondata/indexserver/DataMapJobs.scala | 18 ----------
.../indexserver/DistributedPruneRDD.scala | 12 +++++--
.../carbondata/indexserver/IndexServer.scala | 19 +++++------
.../spark/rdd/CarbonDataRDDFactory.scala | 10 ++++--
.../sql/execution/command/cache/CacheUtil.scala | 15 +++++++--
.../command/cache/CarbonShowCacheCommand.scala | 23 ++++++++-----
9 files changed, 86 insertions(+), 62 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index e20f19a..2371a10 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -115,7 +115,15 @@ public class DataMapUtil {
DistributableDataMapFormat dataMapFormat = new
DistributableDataMapFormat(carbonTable,
validAndInvalidSegmentsInfo.getValidSegments(), invalidSegment, true,
dataMapToClear);
- dataMapJob.execute(dataMapFormat);
+ try {
+ dataMapJob.execute(dataMapFormat);
+ } catch (Exception e) {
+ if
(dataMapJob.getClass().getName().equalsIgnoreCase(DISTRIBUTED_JOB_NAME)) {
+ LOGGER.warn("Failed to clear distributed cache.", e);
+ } else {
+ throw e;
+ }
+ }
}
public static void executeClearDataMapJob(CarbonTable carbonTable, String
jobClassName)
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index c90c3dc..68aad72 100644
---
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++
b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -228,7 +228,7 @@ public class BlockletDataMapUtil {
List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
new ArrayList<>();
String mergeFilePath =
identifier.getIndexFilePath() + CarbonCommonConstants.FILE_SEPARATOR +
identifier
- .getMergeIndexFileName();
+ .getIndexFileName();
segmentIndexFileStore.readMergeFile(mergeFilePath);
List<String> indexFiles =
segmentIndexFileStore.getCarbonMergeFileToIndexFilesMap().get(mergeFilePath);
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index dd86dcb..274c7ef 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -557,22 +557,31 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
}
if (isIUDTable || isUpdateFlow) {
Map<String, Long> blockletToRowCountMap = new HashMap<>();
- if
(CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
- table.getTableName())) {
- List<InputSplit> extendedBlocklets =
CarbonTableInputFormat.convertToCarbonInputSplit(
- getDistributedSplit(table, null, partitions, filteredSegment,
- allSegments.getInvalidSegments(), toBeCleanedSegments));
- for (InputSplit extendedBlocklet : extendedBlocklets) {
- CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
- String filePath = blocklet.getFilePath();
- String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
- blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
- (long) blocklet.getDetailInfo().getRowCount());
+ if (CarbonProperties.getInstance()
+ .isDistributedPruningEnabled(table.getDatabaseName(),
table.getTableName())) {
+ try {
+ List<InputSplit> extendedBlocklets =
CarbonTableInputFormat.convertToCarbonInputSplit(
+ getDistributedSplit(table, null, partitions, filteredSegment,
+ allSegments.getInvalidSegments(), toBeCleanedSegments));
+ for (InputSplit extendedBlocklet : extendedBlocklets) {
+ CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
+ blockletToRowCountMap.put(blocklet.getSegmentId() + "," +
blocklet.getFilePath(),
+ (long) blocklet.getDetailInfo().getRowCount());
+ }
+ } catch (Exception e) {
+ // Check if fallback is disabled then directly throw exception
otherwise try driver
+ // pruning.
+ if (CarbonProperties.getInstance().isFallBackDisabled()) {
+ throw e;
+ }
+ TableDataMap defaultDataMap =
DataMapStoreManager.getInstance().getDefaultDataMap(table);
+ blockletToRowCountMap
+ .putAll(defaultDataMap.getBlockRowCount(filteredSegment,
partitions, defaultDataMap));
}
} else {
TableDataMap defaultDataMap =
DataMapStoreManager.getInstance().getDefaultDataMap(table);
- blockletToRowCountMap.putAll(
- defaultDataMap.getBlockRowCount(filteredSegment, partitions,
defaultDataMap));
+ blockletToRowCountMap
+ .putAll(defaultDataMap.getBlockRowCount(filteredSegment,
partitions, defaultDataMap));
}
// key is the (segmentId","+blockletPath) and key is the row count of
that blocklet
for (Map.Entry<String, Long> eachBlocklet :
blockletToRowCountMap.entrySet()) {
@@ -603,8 +612,8 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
}
} else {
long totalRowCount = 0L;
- if
(CarbonProperties.getInstance().isDistributedPruningEnabled(table.getDatabaseName(),
- table.getTableName())) {
+ if (CarbonProperties.getInstance()
+ .isDistributedPruningEnabled(table.getDatabaseName(),
table.getTableName())) {
List<InputSplit> extendedBlocklets =
CarbonTableInputFormat.convertToCarbonInputSplit(
getDistributedSplit(table, null, partitions, filteredSegment,
allSegments.getInvalidSegments(), new ArrayList<String>()));
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index b03beca..57bdf34 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -60,21 +60,3 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
}
}
-
-class DistributedClearCacheJob extends AbstractDataMapJob {
-
- val LOGGER: Logger =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- override def execute(dataMapFormat: DistributableDataMapFormat):
util.List[ExtendedBlocklet] = {
- if (LOGGER.isDebugEnabled) {
- val messageSize = SizeEstimator.estimate(dataMapFormat)
- LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
- }
- val (response, time) = logTime {
- IndexServer.getClient.invalidateCache(dataMapFormat)
- new util.ArrayList[ExtendedBlocklet]()
- }
- LOGGER.info(s"Time taken to get response from server: $time ms")
- response
- }
-}
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index fd59e2b..d2dab2d 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.datamap.DistributableDataMapFormat
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.rdd.CarbonRDD
import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -60,7 +61,11 @@ private[indexserver] class DistributedPruneRDD(@transient
private val ss: SparkS
}
override protected def getPreferredLocations(split: Partition): Seq[String]
= {
- split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
+ if (split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations !=
null) {
+ split.asInstanceOf[DataMapRDDPartition].inputSplit.getLocations.toSeq
+ } else {
+ Seq()
+ }
}
override def internalCompute(split: Partition,
@@ -112,7 +117,10 @@ private[indexserver] class DistributedPruneRDD(@transient
private val ss: SparkS
override protected def internalGetPartitions: Array[Partition] = {
val job = Job.getInstance(FileFactory.getConfiguration)
val splits = dataMapFormat.getSplits(job).asScala
- if (dataMapFormat.isFallbackJob || splits.isEmpty) {
+ val isDistributedPruningEnabled = CarbonProperties.getInstance()
+
.isDistributedPruningEnabled(dataMapFormat.getCarbonTable.getDatabaseName,
+ dataMapFormat.getCarbonTable.getTableName)
+ if (!isDistributedPruningEnabled || dataMapFormat.isFallbackJob ||
splits.isEmpty) {
splits.zipWithIndex.map {
f => new DataMapRDDPartition(id, f._2, f._1)
}.toArray
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 194f4cb..e738fb3 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -45,11 +45,6 @@ trait ServerInterface {
def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
/**
- * Invalidate the cache for the provided table.
- */
- def invalidateCache(request: DistributableDataMapFormat): Unit
-
- /**
* Get the cache size for the specified table.
*/
def showCache(tableName: String) : Array[String]
@@ -85,6 +80,10 @@ object IndexServer extends ServerInterface {
private val numHandlers: Int =
CarbonProperties.getInstance().getNumberOfHandlersForIndexServer
+ private val isExecutorLRUConfigured: Boolean =
+ CarbonProperties.getInstance
+ .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE)
!= null
+
/**
* Getting sparkSession from ActiveSession because in case of embedded mode
the session would
* have already been created whereas in case of distributed mode the session
would be
@@ -103,15 +102,10 @@ object IndexServer extends ServerInterface {
def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
= doAs {
val splits = new DistributedPruneRDD(sparkSession, request).collect()
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
- splits.map(_._2)
- }
-
- override def invalidateCache(request: DistributableDataMapFormat): Unit =
doAs {
- val splits = new DistributedPruneRDD(sparkSession, request).collect()
- DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
if (request.isJobToClearDataMaps) {
DistributedRDDUtils.invalidateCache(request.getCarbonTable.getTableUniqueName)
}
+ splits.map(_._2)
}
override def invalidateSegmentCache(databaseName: String, tableName: String,
@@ -131,6 +125,9 @@ object IndexServer extends ServerInterface {
throw new RuntimeException(
s"Please set ${ CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER }" +
s" as true to use index server")
+ } else if (!isExecutorLRUConfigured) {
+ throw new RuntimeException(s"Executor LRU cache size is not set. Please
set using " +
+ s"${
CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE }")
} else {
createCarbonSession()
LOGGER.info("Starting Index Cache Server")
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 682e76c..f4422a8 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -257,8 +257,14 @@ object CarbonDataRDDFactory {
// Remove compacted segments from executor cache.
if (CarbonProperties.getInstance().isDistributedPruningEnabled(
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)) {
-
IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
- carbonLoadModel.getTableName, compactedSegments.asScala.toArray)
+ try {
+
IndexServer.getClient.invalidateSegmentCache(carbonLoadModel.getDatabaseName,
+ carbonLoadModel.getTableName,
compactedSegments.asScala.toArray)
+ } catch {
+ case ex: Exception =>
+ LOGGER.warn(s"Clear cache job has failed for ${carbonLoadModel
+ .getDatabaseName}.${carbonLoadModel.getTableName}", ex)
+ }
}
// giving the user his error for telling in the beeline if his
triggered table
// compaction is failed.
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
index 158bd1f..1707e78 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CacheUtil.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.execution.command.cache
-import org.apache.hadoop.mapred.JobConf
import scala.collection.JavaConverters._
+import org.apache.log4j.Logger
+
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.cache.CacheType
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
@@ -35,6 +37,8 @@ import
org.apache.carbondata.processing.merger.CarbonDataMergerUtil
object CacheUtil {
+ val LOGGER: Logger =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
/**
* Given a carbonTable, returns the list of all carbonindex files
*
@@ -51,8 +55,13 @@ object CacheUtil {
carbonTable.getTableName)) {
val invalidSegmentIds =
validAndInvalidSegmentsInfo.getInvalidSegments.asScala
.map(_.getSegmentNo).toArray
-
IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName,
carbonTable
- .getTableName, invalidSegmentIds)
+ try {
+
IndexServer.getClient.invalidateSegmentCache(carbonTable.getDatabaseName,
carbonTable
+ .getTableName, invalidSegmentIds)
+ } catch {
+ case e: Exception =>
+ LOGGER.warn("Failed to clear cache from executors. ", e)
+ }
}
validAndInvalidSegmentsInfo.getValidSegments.asScala.flatMap {
segment =>
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
index 6f6ddea..1c3af69 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonShowCacheCommand.scala
@@ -218,16 +218,21 @@ case class CarbonShowCacheCommand(tableIdentifier:
Option[TableIdentifier],
case None => ""
}
val (result, time) = CarbonScalaUtil.logTime {
- IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
- .groupBy(_.head).map { t =>
- var sum = 0L
- var length = 0
- t._2.foreach {
- arr =>
- sum += arr(2).toLong
- length += arr(1).toInt
+ try {
+ IndexServer.getClient.showCache(tableUniqueName).map(_.split(":"))
+ .groupBy(_.head).map { t =>
+ var sum = 0L
+ var length = 0
+ t._2.foreach {
+ arr =>
+ sum += arr(2).toLong
+ length += arr(1).toInt
+ }
+ (t._1, length, sum)
}
- (t._1, length, sum)
+ } catch {
+ case e: Exception =>
+ throw new RuntimeException("Failed to get Cache Information. ", e)
}
}
LOGGER.info(s"Time taken to get cache results from Index Server is $time
ms")