This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e72c0fc  [CARBONDATA-3492] Pre-priming cache
e72c0fc is described below

commit e72c0fc08d79e05974a503630fb38f54411afa4c
Author: vikramahuja1001 <[email protected]>
AuthorDate: Thu Sep 12 20:55:32 2019 +0530

    [CARBONDATA-3492] Pre-priming cache
    
    Implementation of the pre-priming cache feature in the index server
    
    This closes #3420
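    
    Pre-priming is controlled by the new carbon.indexserver.enable.prepriming
    property added in this commit, and it only takes effect when the distributed
    index server itself is enabled. A minimal sketch of switching it on
    programmatically (setting the same keys in carbon.properties works equally
    well; the calls below are illustrative and not part of this commit):
    
        // Scala: enable the index server and pre-priming before running data loads
        import org.apache.carbondata.core.util.CarbonProperties
    
        CarbonProperties.getInstance()
          .addProperty("carbon.enable.index.server", "true")
        CarbonProperties.getInstance()
          .addProperty("carbon.indexserver.enable.prepriming", "true")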
---
 .../core/constants/CarbonCommonConstants.java      |  6 +++
 .../carbondata/core/datamap/DataMapUtil.java       |  2 +-
 .../core/datamap/DistributableDataMapFormat.java   | 19 ++++++--
 .../carbondata/core/util/CarbonProperties.java     |  9 ++++
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  2 +-
 .../org/apache/carbondata/events/Events.scala      |  9 ++++
 .../carbondata/events/IndexServerEvents.scala      | 29 ++++++++++++
 .../indexserver/DistributedCountRDD.scala          |  5 ++
 .../indexserver/DistributedRDDUtils.scala          | 55 ++++++++++++++++++++++
 .../carbondata/indexserver/IndexServer.scala       | 41 ++++++++++++----
 .../spark/rdd/CarbonDataRDDFactory.scala           | 13 +++--
 .../spark/rdd/CarbonTableCompactor.scala           | 21 ++++++++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |  2 +
 .../command/indexserver/PrePrimingListener.scala   | 46 ++++++++++++++++++
 .../command/management/CarbonLoadDataCommand.scala | 18 +++++--
 .../mutation/CarbonProjectForDeleteCommand.scala   | 13 +++--
 .../mutation/CarbonProjectForUpdateCommand.scala   | 10 ++--
 .../command/mutation/DeleteExecution.scala         | 44 ++++++++++++-----
 .../processing/datamap/DataMapWriterListener.java  |  3 --
 19 files changed, 305 insertions(+), 42 deletions(-)
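
The entry point shared by the load, compaction, update and delete flows is the new
DistributedRDDUtils.triggerPrepriming, which fires an IndexServerLoadEvent that
PrePrimingEventListener turns into an asynchronous getCount call on the index server.
A minimal sketch of a caller, modelled on the CarbonDataRDDFactory hunk below
(carbonTable, operationContext, hadoopConf and carbonLoadModel are assumed to already
be in scope):

    // pre-prime the index server cache for the segment that was just loaded
    if (carbonLoadModel.getSegmentId != null) {
      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
        operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
    }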

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ac77582..4a66fa8 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2207,6 +2207,12 @@ public final class CarbonCommonConstants {
   public static final String CARBON_ENABLE_INDEX_SERVER = "carbon.enable.index.server";
 
   /**
+   * Configured property to enable/disable prepriming in index server
+   */
+  public static final String CARBON_INDEXSEVER_ENABLE_PREPRIMING =
+          "carbon.indexserver.enable.prepriming";
+
+  /**
    * Property is used to enable/disable fallback for indexserver.
    * Used for testing purposes only.
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
index bca7409..8e63449 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapUtil.java
@@ -250,7 +250,7 @@ public class DataMapUtil {
     invalidSegmentNo.addAll(segmentsToBeRefreshed);
     DistributableDataMapFormat dataMapFormat =
         new DistributableDataMapFormat(carbonTable, resolver, validSegments, invalidSegmentNo,
-            partitionsToPrune, false, level, isFallbackJob);
+            partitionsToPrune, false, level, isFallbackJob, false);
     if (isCountJob) {
       dataMapFormat.setCountStarJob();
       dataMapFormat.setIsWriteToFile(false);
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index b430c5d..5af68b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -92,6 +92,9 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private boolean isCountStarJob = false;
 
+  // Whether this is an async call to the Index Server (true in the case of pre-priming)
+  private boolean isAsyncCall;
+
   DistributableDataMapFormat() {
 
   }
@@ -100,14 +103,14 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
       List<Segment> validSegments, List<String> invalidSegments, boolean isJobToClearDataMaps,
       String dataMapToClear) throws IOException {
     this(table, null, validSegments, invalidSegments, null,
-        isJobToClearDataMaps, null, false);
+        isJobToClearDataMaps, null, false, false);
     this.dataMapToClear = dataMapToClear;
   }
 
   public DistributableDataMapFormat(CarbonTable table, FilterResolverIntf filterResolverIntf,
       List<Segment> validSegments, List<String> invalidSegments, List<PartitionSpec> partitions,
-      boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob)
-      throws IOException {
+      boolean isJobToClearDataMaps, DataMapLevel dataMapLevel, boolean isFallbackJob,
+      boolean isAsyncCall) throws IOException {
     this.table = table;
     this.filterResolverIntf = filterResolverIntf;
     this.validSegments = validSegments;
@@ -119,6 +122,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     this.isJobToClearDataMaps = isJobToClearDataMaps;
     this.dataMapLevel = dataMapLevel;
     this.isFallbackJob = isFallbackJob;
+    this.isAsyncCall = isAsyncCall;
   }
 
   @Override
@@ -242,6 +246,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     out.writeUTF(queryId);
     out.writeBoolean(isWriteToFile);
     out.writeBoolean(isCountStarJob);
+    out.writeBoolean(isAsyncCall);
   }
 
   @Override
@@ -288,6 +293,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
     this.queryId = in.readUTF();
     this.isWriteToFile = in.readBoolean();
     this.isCountStarJob = in.readBoolean();
+    this.isAsyncCall = in.readBoolean();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -312,6 +318,13 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
   }
 
   /**
+   * @return whether this is an async call to the Index Server.
+   */
+  public boolean ifAsyncCall() {
+    return isAsyncCall;
+  }
+
+  /**
    * @return Whether the job is to clear cached datamaps or not.
    */
   public boolean isJobToClearDataMaps() {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 80882e8..f77f4d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1676,6 +1676,15 @@ public final class CarbonProperties {
     return isServerEnabledByUser;
   }
 
+  /**
+   * Check if user has enabled/disabled the use of pre-priming for index server
+   */
+  public boolean isIndexServerPrePrimingEnabled() {
+    String configuredValue = carbonProperties.getProperty(
+            CarbonCommonConstants.CARBON_INDEXSEVER_ENABLE_PREPRIMING);
+    return Boolean.parseBoolean(configuredValue);
+  }
+
   public String getIndexServerIP() {
     return carbonProperties.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_IP, "");
   }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 42fae67..3aad01c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -422,7 +422,7 @@ m filterExpression
       List<PartitionSpec> partitionNames, List<Segment> validSegments) throws IOException {
     DistributableDataMapFormat dataMapFormat =
         new DistributableDataMapFormat(table, null, validSegments, new ArrayList<String>(),
-            partitionNames, false, null, false);
+            partitionNames, false, null, false, false);
     dataMapFormat.setIsWriteToFile(false);
     try {
       DataMapJob dataMapJob =
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index e6b9213..ce78271 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -198,3 +198,12 @@ trait BuildDataMapEventsInfo {
   val identifier: AbsoluteTableIdentifier
   val dataMapNames: scala.collection.mutable.Seq[String]
 }
+
+/**
+ * EventInfo for pre-priming on the Index Server. This event is used to
+ * fire a call to the index server when the load is complete.
+ */
+trait IndexServerEventInfo {
+  val carbonTable: CarbonTable
+  val sparkSession: SparkSession
+}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
new file mode 100644
index 0000000..fb86cd3
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IndexServerEvents.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+// Event for pre-priming the cache
+case class IndexServerLoadEvent(sparkSession: SparkSession,
+    carbonTable: CarbonTable,
+    segment: List[Segment],
+    invalidsegment: List[String])
+  extends Event with IndexServerEventInfo
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
index 4a080fa..930586f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedCountRDD.scala
@@ -84,6 +84,11 @@ class DistributedCountRDD(@transient ss: SparkSession, dataMapFormat: Distributa
     } else {
       0L
     }
+    if (dataMapFormat.ifAsyncCall()) {
+      // to clear cache of invalid segments during pre-priming in index server
+      DataMapStoreManager.getInstance().clearInvalidSegments(dataMapFormat.getCarbonTable,
+        dataMapFormat.getInvalidSegments)
+    }
     Iterator((executorIP + "_" + cacheSize.toString, results.map(_._2.toLong).sum.toString))
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 4819779..233ad4d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -22,13 +22,26 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.DFSClient.Conf
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.spark.Partition
+import org.apache.spark.sql.SparkSession
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
 
 object DistributedRDDUtils {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
   // Segment number to executorNode mapping
   val tableToExecutorMapping: ConcurrentHashMap[String, ConcurrentHashMap[String, String]] =
     new ConcurrentHashMap[String, ConcurrentHashMap[String, String]]()
@@ -338,4 +351,46 @@ object DistributedRDDUtils {
     formatter.format(new Date())
   }
 
+  /**
+   * This function creates an event for prepriming of the index server
+   */
+  def triggerPrepriming(sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      invalidSegments: Seq[String],
+      operationContext: OperationContext,
+      conf: Configuration,
+      segmentId: List[String]): Unit = {
+    if (carbonTable.isTransactionalTable) {
+      val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+        carbonTable.getTablePath), conf)
+      val validSegments: Seq[Segment] = segmentId.map {
+        segmentToPrime =>
+          val loadDetailsForCurrentSegment = readCommittedScope
+            .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
+
+          new Segment(segmentToPrime,
+            loadDetailsForCurrentSegment.getSegmentFile,
+            readCommittedScope,
+            loadDetailsForCurrentSegment)
+      }
+      val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
+        carbonTable.getDatabaseName, carbonTable.getTableName)
+      val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled
+      if (indexServerEnabled && prePrimingEnabled) {
+        LOGGER.info(s" Loading segments for the table: ${ 
carbonTable.getTableName } in the cache")
+        val indexServerLoadEvent: IndexServerLoadEvent =
+          IndexServerLoadEvent(
+            sparkSession,
+            carbonTable,
+            validSegments.toList,
+            invalidSegments.toList
+          )
+        OperationListenerBus.getInstance().fireEvent(indexServerLoadEvent, operationContext)
+        LOGGER.info(s" Segments for the table: ${ carbonTable.getTableName } 
loaded in the cache")
+      } else {
+        LOGGER.info(s" Unable to load segments for the table: ${ 
carbonTable.getTableName }" +
+                    s" in the cache")
+      }
+    }
+  }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index 75af667..926d2bd 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.indexserver
 import java.net.InetSocketAddress
 import java.security.PrivilegedAction
 import java.util.UUID
+import java.util.concurrent.{Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
 
@@ -84,6 +85,14 @@ object IndexServer extends ServerInterface {
 
   private val numHandlers: Int = CarbonProperties.getInstance().getNumberOfHandlersForIndexServer
 
+  private lazy val indexServerExecutorService: Option[ExecutorService] = {
+    if (CarbonProperties.getInstance().isDistributedPruningEnabled("", "")) {
+      Some(Executors.newFixedThreadPool(1))
+    } else {
+      None
+    }
+  }
+
   private val isExecutorLRUConfigured: Boolean =
     CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE) != null
@@ -103,18 +112,34 @@ object IndexServer extends ServerInterface {
     })
   }
 
+  private def submitAsyncTask[T](t: => Unit): Unit = {
+    indexServerExecutorService.get.submit(new Runnable {
+      override def run(): Unit = {
+        t
+      }
+    })
+  }
+
   def getCount(request: DistributableDataMapFormat): LongWritable = {
     doAs {
-      if (!request.isFallbackJob) {
-        sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
-        sparkSession.sparkContext
-          .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+      lazy val getCountTask = {
+        if (!request.isFallbackJob) {
+          sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
+          sparkSession.sparkContext
+            .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
+        }
+        val splits = new DistributedCountRDD(sparkSession, request).collect()
+        if (!request.isFallbackJob) {
+          DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+        }
+        new LongWritable(splits.map(_._2.toLong).sum)
       }
-      val splits = new DistributedCountRDD(sparkSession, request).collect()
-      if (!request.isFallbackJob) {
-        DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
+      if (request.ifAsyncCall) {
+        submitAsyncTask(getCountTask)
+        new LongWritable(0)
+      } else {
+        getCountTask
       }
-      new LongWritable(splits.map(_._2.toLong).sum)
     }
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b289045..031f539 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -56,17 +56,18 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.indexserver.{DistributedRDDUtils, IndexServer}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -591,6 +592,12 @@ object CarbonDataRDDFactory {
                      s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
       }
 
+      // code to handle Pre-Priming cache for loading
+
+      if (carbonLoadModel.getSegmentId != null) {
+        DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
+          operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
+      }
       try {
         // compaction handling
         if (carbonTable.isHivePartitionTable) {
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index b0f90c7..0fed2ef 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -41,14 +41,15 @@ import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{StructField, StructType}
-import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.CarbonProjection
+import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -340,6 +341,24 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       } else {
         LOGGER.info(s"Compaction request completed for table " +
                     s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+
+        // Prepriming index for compaction
+        val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA) ||
+            compactionType.equals(CompactionType.IUD_UPDDEL_DELTA)) {
+            validSegments.asScala.map(_.getSegmentNo).toList
+        } else if (compactionType.equals(CompactionType.MAJOR) ||
+                   compactionType.equals(CompactionType.MINOR) ||
+                   compactionType.equals(CompactionType.CUSTOM)) {
+            scala.List(mergedLoadNumber)
+        } else {
+          scala.List()
+        }
+        DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession,
+          carbonTable,
+          validSegments.asScala.map(_.getSegmentNo).toList,
+          operationContext,
+          FileFactory.getConfiguration,
+          segmentsForPriming)
       }
     } else {
       LOGGER.error(s"Compaction request failed for table " +
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index f2a52d2..3616cf6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTa
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.events.{MergeBloomIndexEventListener, MergeIndexEventListener}
 import org.apache.spark.sql.execution.command.cache._
+import org.apache.spark.sql.execution.command.indexserver.PrePrimingEventListener
 import org.apache.spark.sql.execution.command.mv._
 import org.apache.spark.sql.execution.command.preaaggregate._
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction
@@ -169,6 +170,7 @@ object CarbonEnv {
       .addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
       .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
       .addListener(classOf[DeleteFromTablePreEvent], DeletePreAggregatePreListener)
+      .addListener(classOf[IndexServerLoadEvent], PrePrimingEventListener)
       .addListener(classOf[AlterTableDropColumnPreEvent], PreAggregateDropColumnPreListener)
       .addListener(classOf[AlterTableRenamePreEvent], RenameTablePreListener)
       .addListener(classOf[AlterTableColRenameAndDataTypeChangePreEvent],
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala
new file mode 100644
index 0000000..4d5e633
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/indexserver/PrePrimingListener.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.indexserver
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.datamap.{DistributableDataMapFormat, Segment}
+import org.apache.carbondata.events.{Event, IndexServerLoadEvent,
+  OperationContext, OperationEventListener}
+import org.apache.carbondata.indexserver.IndexServer
+
+// Listener for the PrePriming event. This listener calls the index server using an async call
+object PrePrimingEventListener extends OperationEventListener {
+  override def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    val prePrimingEvent = event.asInstanceOf[IndexServerLoadEvent]
+    val carbonTable = prePrimingEvent.carbonTable
+    val dataMapFormat = new DistributableDataMapFormat(carbonTable,
+      null,
+      prePrimingEvent.segment.asJava,
+      prePrimingEvent.invalidsegment.asJava,
+      null,
+      false,
+      null,
+      false,
+      true)
+    if (prePrimingEvent.segment.length != 0) {
+      IndexServer.getClient.getCount(dataMapFormat)
+    }
+  }
+}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b2f9a1e..280eb12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -51,23 +51,25 @@ import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants, SortScopeOptions}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, IndexServerLoadEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
+import org.apache.carbondata.indexserver.DistributedRDDUtils
 import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -866,6 +868,16 @@ case class CarbonLoadDataCommand(
           case _ =>
         }
       }
+
+      // Prepriming for Partition table here
+      if (carbonLoadModel.getSegmentId != null) {
+        DistributedRDDUtils.triggerPrepriming(sparkSession,
+          table,
+          Seq(),
+          operationContext,
+          hadoopConf,
+          List(carbonLoadModel.getSegmentId))
+      }
     }
     try {
       val compactedSegments = new util.ArrayList[String]()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 45c73ac..87e6d01 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -27,15 +27,20 @@ import org.apache.spark.sql.types.LongType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent,
+  IndexServerLoadEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 
@@ -128,8 +133,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
       HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
         isUpdateOperation = false)
 
-      DeleteExecution.clearDistributedSegmentCache(carbonTable, deletedSegments)(sparkSession)
-
       val allDataMapSchemas = DataMapStoreManager.getInstance
         .getDataMapSchemasOfTable(carbonTable).asScala
         .filter(dataMapSchema => null != dataMapSchema.getRelationIdentifier &&
@@ -138,6 +141,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
         DataMapStatusManager.truncateDataMap(allDataMapSchemas)
       }
 
+      // prepriming for delete command
+      DeleteExecution.reloadDistributedSegmentCache(carbonTable,
+        deletedSegments, operationContext)(sparkSession)
+
       // trigger post event for Delete from table
       val deleteFromTablePostEvent: DeleteFromTablePostEvent =
         DeleteFromTablePostEvent(sparkSession, carbonTable)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 686990d..a574569 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -33,14 +33,17 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 
@@ -166,8 +169,9 @@ private[sql] case class CarbonProjectForUpdateCommand(
             executionErrors,
             segmentsToBeDeleted)
 
-          DeleteExecution
-            .clearDistributedSegmentCache(carbonTable, segmentsToBeDeleted)(sparkSession)
+          // prepriming for update command
+          DeleteExecution.reloadDistributedSegmentCache(carbonTable,
+            segmentsToBeDeleted, operationContext)(sparkSession)
 
         } else {
           throw new ConcurrentOperationException(carbonTable, "compaction", "update")
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 08b0dd0..625f594 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -41,12 +41,13 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
+import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
+import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -328,18 +329,35 @@ object DeleteExecution {
     (segmentsTobeDeleted, operatedRowCount)
   }
 
-  def clearDistributedSegmentCache(carbonTable: CarbonTable,
-      segmentsToBeCleared: Seq[Segment])(sparkSession: SparkSession): Unit = {
-    if (CarbonProperties.getInstance().isDistributedPruningEnabled(carbonTable
-      .getDatabaseName, carbonTable.getTableName)) {
-      try {
-        IndexServer.getClient
-          .invalidateSegmentCache(carbonTable, segmentsToBeCleared.map(_.getSegmentNo)
-          .toArray, SparkSQLUtil.getTaskGroupId(sparkSession))
-      } catch {
-        case _: Exception =>
-          LOGGER.warn(s"Clearing of invalid segments for ${
-            carbonTable.getTableUniqueName} has failed")
+  /**
+   * This function fires an event for pre-priming in the index server
+   */
+  def reloadDistributedSegmentCache(carbonTable: CarbonTable, deletedSegments: Seq[Segment],
+      operationContext: OperationContext)(sparkSession: SparkSession): Unit = {
+    if (carbonTable.isTransactionalTable) {
+      val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+        carbonTable.getTablePath), FileFactory.getConfiguration)
+      deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
+      val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
+        carbonTable.getDatabaseName, carbonTable.getTableName)
+      val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled()
+      if (indexServerEnabled && prePrimingEnabled) {
+        LOGGER.info(s"Loading segments for table: ${ carbonTable.getTableName 
} in the cache")
+        val indexServerLoadEvent: IndexServerLoadEvent =
+          IndexServerLoadEvent(
+            sparkSession,
+            carbonTable,
+            deletedSegments.toList,
+            deletedSegments.map(_.getSegmentNo).toList
+          )
+        OperationListenerBus.getInstance().fireEvent(indexServerLoadEvent, operationContext)
+        LOGGER.info(s"Segments for table: ${
+          carbonTable
+            .getTableName
+        } has been loaded in the cache")
+      } else {
+        LOGGER.info(
+          s"Segments for table:" + s" ${ carbonTable.getTableName } not loaded 
in the cache")
       }
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 54a2b71..0aa69f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -62,9 +62,6 @@ public class DataMapWriterListener {
    */
   public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String taskNo, SegmentProperties segmentProperties) {
-    // clear cache in executor side
-    DataMapStoreManager.getInstance()
-        .clearDataMaps(carbonTable.getCarbonTableIdentifier());
     List<TableDataMap> tableIndices;
     try {
       tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
