This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new e72c0fc [CARBONDATA-3492] Pre-priming cache
e72c0fc is described below
commit e72c0fc08d79e05974a503630fb38f54411afa4c
Author: vikramahuja1001 <[email protected]>
AuthorDate: Thu Sep 12 20:55:32 2019 +0530
[CARBONDATA-3492] Pre-priming cache
Implements the pre-priming cache feature in the index server: after data load, compaction, update and delete operations, the affected segments are loaded into the index server cache through an asynchronous call, so queries do not pay the pruning cost on first access.
This closes #3420
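
Note: pre-priming only takes effect when the distributed index server itself is enabled; both switches are read through CarbonProperties. A minimal sketch of enabling them programmatically (the property keys come from this patch, the surrounding setup is illustrative only):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable the distributed index server and the new pre-priming behaviour.
    // The pre-priming value is parsed with Boolean.parseBoolean, so it
    // defaults to false when the property is not configured.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_ENABLE_INDEX_SERVER, "true")
      .addProperty(CarbonCommonConstants.CARBON_INDEXSEVER_ENABLE_PREPRIMING, "true")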
---
.../core/constants/CarbonCommonConstants.java | 6 +++
.../carbondata/core/datamap/DataMapUtil.java | 2 +-
.../core/datamap/DistributableDataMapFormat.java | 19 ++++++--
.../carbondata/core/util/CarbonProperties.java | 9 ++++
.../carbondata/hadoop/api/CarbonInputFormat.java | 2 +-
.../org/apache/carbondata/events/Events.scala | 9 ++++
.../carbondata/events/IndexServerEvents.scala | 29 ++++++++++++
.../indexserver/DistributedCountRDD.scala | 5 ++
.../indexserver/DistributedRDDUtils.scala | 55 ++++++++++++++++++++++
.../carbondata/indexserver/IndexServer.scala | 41 ++++++++++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 13 +++--
.../spark/rdd/CarbonTableCompactor.scala | 21 ++++++++-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 2 +
.../command/indexserver/PrePrimingListener.scala | 46 ++++++++++++++++++
.../command/management/CarbonLoadDataCommand.scala | 18 +++++--
.../mutation/CarbonProjectForDeleteCommand.scala | 13 +++--
.../mutation/CarbonProjectForUpdateCommand.scala | 10 ++--
.../command/mutation/DeleteExecution.scala | 44 ++++++++++++-----
.../processing/datamap/DataMapWriterListener.java | 3 --
19 files changed, 305 insertions(+), 42 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ac77582..4a66fa8 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2207,6 +2207,12 @@ public final class CarbonCommonConstants {
public static final String CARBON_ENABLE_INDEX_SERVER =
"carbon.enable.index.server";
/**
+ * Property used to enable/disable pre-priming in the index server
+ */
+ public static final String CARBON_INDEXSEVER_ENABLE_PREPRIMING =
+ "carbon.indexserver.enable.prepriming";
+
+ /**
* Property is used to enable/disable fallback for indexserver.
* Used for testing purposes only.
*/
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index bca7409..8e63449 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -250,7 +250,7 @@ public class DataMapUtil {
invalidSegmentNo.addAll(segmentsToBeRefreshed);
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
- partitionsToPrune, false, level, isFallbackJob);
+ partitionsToPrune, false, level, isFallbackJob, false);
if (isCountJob) {
dataMapFormat.setCountStarJob();
dataMapFormat.setIsWriteToFile(false);
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index b430c5d..5af68b9 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -92,6 +92,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
private boolean isCountStarJob = false;
+ // Whether this is an async call to the Index Server (true in the case of pre-priming)
+ private boolean isAsyncCall;
+
DistributableDataMapFormat() {
}
@@ -100,14 +103,14 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
List<Segment> validSegments, List<String> invalidSegments, boolean isJobToClearDataMaps,
String dataMapToClear) throws IOException {
this(table, null, validSegments, invalidSegments, null,
- isJobToClearDataMaps, null, false);
+ isJobToClearDataMaps, null, false, false);
this.dataMapToClear = dataMapToClear;
}
public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
- boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
- throws IOException {
+ boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob,
+ boolean isAsyncCall) throws IOException {
this.table = table;
this.filterResolverIntf = filterResolverIntf;
this.validSegments = validSegments;
@@ -119,6 +122,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.isJobToClearDataMaps = isJobToClearDataMaps;
this.dataMapLevel = dataMapLevel;
this.isFallbackJob = isFallbackJob;
+ this.isAsyncCall = isAsyncCall;
}
@Override
@@ -242,6 +246,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
out.writeUTF(queryId);
out.writeBoolean(isWriteToFile);
out.writeBoolean(isCountStarJob);
+ out.writeBoolean(isAsyncCall);
}
@Override
@@ -288,6 +293,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
this.queryId = in.readUTF();
this.isWriteToFile = in.readBoolean();
this.isCountStarJob = in.readBoolean();
+ this.isAsyncCall = in.readBoolean();
}
private void initReadCommittedScope() throws IOException {
@@ -312,6 +318,13 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
}
/**
+ * @return Whether this is an async call to the IndexServer.
+ */
+ public boolean ifAsyncCall() {
+ return isAsyncCall;
+ }
+
+ /**
* @return Whether the job is to clear cached datamaps or not.
*/
public boolean isJobToClearDataMaps() {
diff --git
a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 80882e8..f77f4d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1676,6 +1676,15 @@ public final class CarbonProperties {
return isServerEnabledByUser;
}
+ /**
+ * Check whether the user has enabled/disabled pre-priming for the index server
+ */
+ public boolean isIndexServerPrePrimingEnabled() {
+ String configuredValue = carbonProperties.getProperty(
+ CarbonCommonConstants.CARBON_INDEXSEVER_ENABLE_PREPRIMING);
+ return Boolean.parseBoolean(configuredValue);
+ }
+
public String getIndexServerIP() {
return carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, "");
}
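
The new getter is deliberately lenient: an unset or unparsable value simply disables pre-priming. Every caller in this patch combines it with the distributed-pruning check; a hypothetical helper capturing that guard (a sketch, not code from the patch):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.CarbonProperties

    // Mirrors the guard used in DistributedRDDUtils and DeleteExecution:
    // pre-prime only when the index server serves this table and
    // pre-priming has been switched on explicitly.
    def shouldPrePrime(table: CarbonTable): Boolean = {
      val props = CarbonProperties.getInstance()
      props.isDistributedPruningEnabled(table.getDatabaseName, table.getTableName) &&
        props.isIndexServerPrePrimingEnabled
    }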
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 42fae67..3aad01c 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -422,7 +422,7 @@ m filterExpression
List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
DistributableDataMapFormat dataMapFormat =
new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
- partitionNames, false, null, false);
+ partitionNames, false, null, false, false);
dataMapFormat.setIsWriteToFile(false);
try {
DataMapJob dataMapJob =
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index e6b9213..ce78271 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -198,3 +198,12 @@ trait BuildDataMapEventsInfo {
val identifier: AbsoluteTableIdentifier
val dataMapNames: scala.collection.mutable.Seq[String]
}
+
+/**
+ * EventInfo for prepriming on IndexServer. This event is used to
+ * fire a call to the index server when the load is complete.
+ */
+trait IndexServerEventInfo {
+ val carbonTable: CarbonTable
+ val sparkSession: SparkSession
+}
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
new file mode 100644
index 0000000..fb86cd3
--- /dev/null
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+// Event for Prepriming in cache
+case class IndexServerLoadEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ segment: List[Segment],
+ invalidsegment: List[String])
+ extends Event with IndexServerEventInfo
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 4a080fa..930586f 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -84,6 +84,11 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
} else {
0L
}
+ if (dataMapFormat.ifAsyncCall()) {
+ // to clear cache of invalid segments during pre-priming in index server
+ DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+ dataMapFormat.getInvalidSegments)
+ }
Iterator((executorIP + "_" + cacheSize.toString,
results.map(_._2.toLong).sum.toString))
}
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 4819779..233ad4d 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -22,13 +22,26 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.DFSClient.Conf
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.spark.Partition
+import org.apache.spark.sql.SparkSession
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
object DistributedRDDUtils {
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
// Segment number to executorNode mapping
val tableToExecutorMapping: ConcurrentHashMap[String,
ConcurrentHashMap[String, String]] =
new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()
@@ -338,4 +351,46 @@ object DistributedRDDUtils {
formatter.format(new Date())
}
+ /**
+ * This function creates an event for prepriming of the index server
+ */
+ def triggerPrepriming(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ invalidSegments: Seq[String],
+ operationContext: OperationContext,
+ conf: Configuration,
+ segmentId: List[String]): Unit = {
+ if (carbonTable.isTransactionalTable) {
+ val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+ carbonTable.getTablePath), conf)
+ val validSegments: Seq[Segment] = segmentId.map {
+ segmentToPrime =>
+ val loadDetailsForCurrentSegment = readCommittedScope
+ .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
+
+ new Segment(segmentToPrime,
+ loadDetailsForCurrentSegment.getSegmentFile,
+ readCommittedScope,
+ loadDetailsForCurrentSegment)
+ }
+ val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
+ carbonTable.getDatabaseName, carbonTable.getTableName)
+ val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled
+ if (indexServerEnabled && prePrimingEnabled) {
+ LOGGER.info(s" Loading segments for the table: ${ carbonTable.getTableName } in the cache")
+ val indexServerLoadEvent: IndexServerLoadEvent =
+ IndexServerLoadEvent(
+ sparkSession,
+ carbonTable,
+ validSegments.toList,
+ invalidSegments.toList
+ )
+ OperationListenerBus.getInstance().fireEvent(indexServerLoadEvent, operationContext)
+ LOGGER.info(s" Segments for the table: ${ carbonTable.getTableName } loaded in the cache")
+ } else {
+ LOGGER.info(s" Unable to load segments for the table: ${ carbonTable.getTableName }" +
+ s" in the cache")
+ }
+ }
+ }
}
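
triggerPrepriming resolves each segment id against the table status file and fires an IndexServerLoadEvent only when both the index server and pre-priming are enabled; otherwise it logs and returns. A hedged sketch of how a caller hands it the freshly written segment, mirroring the CarbonDataRDDFactory change further down (the surrounding variables are assumed to exist in the caller):

    // Assumed to be in scope in the calling command: sparkSession, carbonTable,
    // operationContext, hadoopConf and the CarbonLoadModel of the finished load.
    if (carbonLoadModel.getSegmentId != null) {
      DistributedRDDUtils.triggerPrepriming(
        sparkSession,
        carbonTable,
        Seq(),                              // no invalid segments for a plain load
        operationContext,
        hadoopConf,
        List(carbonLoadModel.getSegmentId)) // the segment that was just written
    }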
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 75af667..926d2bd 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.indexserver
import java.net.InetSocketAddress
import java.security.PrivilegedAction
import java.util.UUID
+import java.util.concurrent.{Executors, ExecutorService}
import scala.collection.JavaConverters._
@@ -84,6 +85,14 @@ object IndexServer extends ServerInterface {
private val numHandlers: Int =
CarbonProperties.getInstance().getNumberOfHandlersForIndexServer
+ private lazy val indexServerExecutorService: Option[ExecutorService] = {
+ if (CarbonProperties.getInstance().isDistributedPruningEnabled("", "")) {
+ Some(Executors.newFixedThreadPool(1))
+ } else {
+ None
+ }
+ }
+
private val isExecutorLRUConfigured: Boolean =
CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE)
!= null
@@ -103,18 +112,34 @@ object IndexServer extends ServerInterface {
})
}
+ private def submitAsyncTask[T](t: => Unit): Unit = {
+ indexServerExecutorService.get.submit(new Runnable {
+ override def run(): Unit = {
+ t
+ }
+ })
+ }
+
def getCount(request: DistributableDataMapFormat): LongWritable = {
doAs {
- if (!request.isFallbackJob) {
- sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
- sparkSession.sparkContext
- .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+ lazy val getCountTask = {
+ if (!request.isFallbackJob) {
+ sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+ sparkSession.sparkContext
+ .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+ }
+ val splits = new DistributedCountRDD(sparkSession, request).collect()
+ if (!request.isFallbackJob) {
+ DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+ }
+ new LongWritable(splits.map(_._2.toLong).sum)
}
- val splits = new DistributedCountRDD(sparkSession, request).collect()
- if (!request.isFallbackJob) {
- DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+ if (request.ifAsyncCall) {
+ submitAsyncTask(getCountTask)
+ new LongWritable(0)
+ } else {
+ getCountTask
}
- new LongWritable(splits.map(_._2.toLong).sum)
}
}
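
With this change getCount has two paths: a synchronous request still runs DistributedCountRDD and returns the real count, while an async (pre-priming) request is handed to the single-thread executor and the RPC returns LongWritable(0) immediately, so the caller never blocks on cache loading. A minimal, self-contained sketch of that fire-and-forget pattern (not the server code itself):

    import java.util.concurrent.Executors

    // One background thread, matching indexServerExecutorService above.
    val prePrimingPool = Executors.newFixedThreadPool(1)

    def submitAsync(work: => Unit): Unit = {
      prePrimingPool.submit(new Runnable {
        override def run(): Unit = work
      })
    }

    // The caller gets a placeholder result immediately; the expensive pruning
    // and caching work happens later on the background thread.
    submitAsync { println("pre-priming segments in the background") }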
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b289045..031f539 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -56,17 +56,18 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -591,6 +592,12 @@ object CarbonDataRDDFactory {
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
+ // code to handle Pre-Priming cache for loading
+
+ if (carbonLoadModel.getSegmentId != null) {
+ DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
+ operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
+ }
try {
// compaction handling
if (carbonTable.isHivePartitionTable) {
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index b0f90c7..0fed2ef 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -41,14 +41,15 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.datastore.row.CarbonRow
import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
-import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.events._
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -340,6 +341,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
} else {
LOGGER.info(s"Compaction request completed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+
+ // Prepriming index for compaction
+ val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA) ||
+ compactionType.equals(CompactionType.IUD_UPDDEL_DELTA)) {
+ validSegments.asScala.map(_.getSegmentNo).toList
+ } else if (compactionType.equals(CompactionType.MAJOR) ||
+ compactionType.equals(CompactionType.MINOR) ||
+ compactionType.equals(CompactionType.CUSTOM)) {
+ scala.List(mergedLoadNumber)
+ } else {
+ scala.List()
+ }
+ DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession,
+ carbonTable,
+ validSegments.asScala.map(_.getSegmentNo).toList,
+ operationContext,
+ FileFactory.getConfiguration,
+ segmentsForPriming)
}
} else {
LOGGER.error(s"Compaction request failed for table " +
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index f2a52d2..3616cf6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTa
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
import org.apache.spark.sql.execution.command.cache._
+import org.apache.spark.sql.execution.command.indexserver.PrePrimingEventListener
import org.apache.spark.sql.execution.command.mv._
import org.apache.spark.sql.execution.command.preaaggregate._
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
@@ -169,6 +170,7 @@ object CarbonEnv {
.addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
.addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
.addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
+ .addListener(classOf[IndexServerLoadEvent], PrePrimingEventListener)
.addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener)
.addListener(classOf[AlterTableRenamePreEvent], RenameTablePreListener)
.addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala
new file mode 100644
index 0000000..4d5e633
--- /dev/null
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.indexserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.datamap.{DistributableDataMapFormat, Segment}
+import org.apache.carbondata.events.{Event, IndexServerLoadEvent,
+ OperationContext, OperationEventListener}
+import org.apache.carbondata.indexserver.IndexServer
+
+// Listener for the pre-priming event. This listener calls the index server using an async call.
+object PrePrimingEventListener extends OperationEventListener {
+ override def onEvent(event: Event,
+ operationContext: OperationContext): Unit = {
+ val prePrimingEvent = event.asInstanceOf[IndexServerLoadEvent]
+ val carbonTable = prePrimingEvent.carbonTable
+ val dataMapFormat = new DistributableDataMapFormat(carbonTable,
+ null,
+ prePrimingEvent.segment.asJava,
+ prePrimingEvent.invalidsegment.asJava,
+ null,
+ false,
+ null,
+ false,
+ true)
+ if (prePrimingEvent.segment.length != 0) {
+ IndexServer.getClient.getCount(dataMapFormat)
+ }
+ }
+}
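
The listener only acts when the event actually carries segments, and it builds the DistributableDataMapFormat with isAsyncCall = true so that the index server treats the getCount call as pre-priming. It is registered in CarbonEnv (next hunk) and reached through the listener bus, roughly as in the sketch below (the segment lists and session objects are assumed to be in scope):

    import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}

    // Registration, as done once per session in CarbonEnv below.
    OperationListenerBus.getInstance()
      .addListener(classOf[IndexServerLoadEvent], PrePrimingEventListener)

    // Firing the event (normally done by DistributedRDDUtils.triggerPrepriming or
    // DeleteExecution.reloadDistributedSegmentCache) routes it to the listener above.
    // sparkSession, carbonTable, validSegments and invalidSegments are assumed in scope.
    val event = IndexServerLoadEvent(sparkSession, carbonTable, validSegments, invalidSegments)
    OperationListenerBus.getInstance().fireEvent(event, new OperationContext)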
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b2f9a1e..280eb12 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -51,23 +51,25 @@ import org.apache.carbondata.common.Strings
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, SortScopeOptions}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.indexserver.DistributedRDDUtils
import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -866,6 +868,16 @@ case class CarbonLoadDataCommand(
case _ =>
}
}
+
+ // Prepriming for Partition table here
+ if (carbonLoadModel.getSegmentId != null) {
+ DistributedRDDUtils.triggerPrepriming(sparkSession,
+ table,
+ Seq(),
+ operationContext,
+ hadoopConf,
+ List(carbonLoadModel.getSegmentId))
+ }
}
try {
val compactedSegments = new util.ArrayList[String]()
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 45c73ac..87e6d01 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -27,15 +27,20 @@ import org.apache.spark.sql.types.LongType
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent,
+ IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.loading.FailureCauses
@@ -128,8 +133,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable, isUpdateOperation = false)
- DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
-
val allDataMapSchemas = DataMapStoreManager.getInstance
.getDataMapSchemasOfTable(carbonTable).asScala
.filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -138,6 +141,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
DataMapStatusManager.truncateDataMap(allDataMapSchemas)
}
+ // prepriming for delete command
+ DeleteExecution.reloadDistributedSegmentCache(carbonTable,
+ deletedSegments, operationContext)(sparkSession)
+
// trigger post event for Delete from table
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
DeleteFromTablePostEvent(sparkSession, carbonTable)
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 686990d..a574569 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -33,14 +33,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.loading.FailureCauses
@@ -166,8 +169,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
executionErrors,
segmentsToBeDeleted)
- DeleteExecution
- .clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)(sparkSession)
+ // prepriming for update command
+ DeleteExecution.reloadDistributedSegmentCache(carbonTable,
+ segmentsToBeDeleted, operationContext)(sparkSession)
} else {
throw new ConcurrentOperationException(carbonTable, "compaction",
"update")
diff --git
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 08b0dd0..625f594 100644
---
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,12 +41,13 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.indexserver.IndexServer
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -328,18 +329,35 @@ object DeleteExecution {
(segmentsTobeDeleted, operatedRowCount)
}
- def clearDistributedSegmentCache(carbonTable: CarbonTable,
- segmentsToBeCleared: Seq[Segment])(sparkSession: SparkSession): Unit = {
- if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
- .getDatabaseName, carbonTable.getTableName)) {
- try {
- IndexServer.getClient
- .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
- .toArray, SparkSQLUtil.getTaskGroupId(sparkSession))
- } catch {
- case _: Exception =>
- LOGGER.warn(s"Clearing of invalid segments for ${
- carbonTable.getTableUniqueName} has failed")
+ /**
+ * This function fires an event for pre-priming in the index server
+ */
+ def reloadDistributedSegmentCache(carbonTable: CarbonTable, deletedSegments: Seq[Segment],
+ operationContext: OperationContext)(sparkSession: SparkSession): Unit = {
+ if (carbonTable.isTransactionalTable) {
+ val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+ carbonTable.getTablePath), FileFactory.getConfiguration)
+ deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
+ val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
+ carbonTable.getDatabaseName, carbonTable.getTableName)
+ val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled()
+ if (indexServerEnabled && prePrimingEnabled) {
+ LOGGER.info(s"Loading segments for table: ${ carbonTable.getTableName } in the cache")
+ val indexServerLoadEvent: IndexServerLoadEvent =
+ IndexServerLoadEvent(
+ sparkSession,
+ carbonTable,
+ deletedSegments.toList,
+ deletedSegments.map(_.getSegmentNo).toList
+ )
+ OperationListenerBus.getInstance().fireEvent(indexServerLoadEvent, operationContext)
+ LOGGER.info(s"Segments for table: ${
+ carbonTable
+ .getTableName
+ } has been loaded in the cache")
+ } else {
+ LOGGER.info(
+ s"Segments for table:" + s" ${ carbonTable.getTableName } not loaded in the cache")
+ }
}
}
}
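
Where the old code only invalidated the deleted segments on the index server, the new reloadDistributedSegmentCache fires the same IndexServerLoadEvent as the load path, so the remaining valid data for those segments is re-cached. A sketch of the call, mirroring the delete and update command hunks above (the surrounding variables are assumed to exist in the caller):

    // Assumed in scope inside the delete/update command (see the hunks above):
    // carbonTable, deletedSegments, operationContext and sparkSession.
    DeleteExecution.reloadDistributedSegmentCache(carbonTable,
      deletedSegments, operationContext)(sparkSession)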
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 54a2b71..0aa69f5 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -62,9 +62,6 @@ public class DataMapWriterListener {
*/
public void registerAllWriter(CarbonTable carbonTable, String segmentId,
String taskNo, SegmentProperties segmentProperties) {
- // clear cache in executor side
- DataMapStoreManager.getInstance()
- .clearDataMaps(carbonTable.getCarbonTableIdentifier());
List<TableDataMap> tableIndices;
try {
tableIndices =
DataMapStoreManager.getInstance().getAllDataMap(carbonTable);