This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 686c20c  [CARBONDATA-3526]Fix cache issue during update and query
686c20c is described below

commit 686c20c0f51ad9f3d48c8ffde83e8bd4f58d254e
Author: akashrn5 <[email protected]>
AuthorDate: Thu Sep 12 14:30:48 2019 +0530

    [CARBONDATA-3526]Fix cache issue during update and query
    
    Problem:
    When multiple updates happen on a table, the cache is loaded
    during the update operation. On the second update, horizontal
    compaction happens inside the segment, so the files already
    loaded into the cache become invalid. If clean files is then
    run, the horizontally compacted files are physically deleted,
    but the cache still contains the old files. So when a select
    query is fired, it fails with a file-not-found exception.
    
    Solution:
    Once horizontal compaction is finished, new compacted files are
    generated and the segments inside the cache become invalid, so
    clear the cache of the invalid segments after horizontal
    compaction. During the drop cache command, clear the cache of
    the segmentMap as well.
    
    This closes #3385
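
    For illustration, a minimal Scala sketch of the cache-invalidation
    pattern this fix applies after horizontal compaction (the helper
    clearStaleSegmentCache is hypothetical; carbonTable and segLists
    follow the names used in HorizontalCompaction.scala below):

        import scala.collection.JavaConverters._

        import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
        import org.apache.carbondata.core.metadata.schema.table.CarbonTable

        // Hypothetical helper: after horizontal compaction rewrites the
        // delta files inside a segment, the index cache entries for the
        // pre-compaction files are stale, so drop them by segment number.
        def clearStaleSegmentCache(carbonTable: CarbonTable,
            segLists: java.util.List[Segment]): Unit = {
          DataMapStoreManager.getInstance()
            .clearInvalidSegments(carbonTable, segLists.asScala.map(_.getSegmentNo).asJava)
        }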
---
 .../sql/execution/command/cache/CarbonDropCacheCommand.scala     | 8 +++-----
 .../sql/execution/command/mutation/HorizontalCompaction.scala    | 9 ++++++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
index 1554f6a..7b8e10f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/cache/CarbonDropCacheCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.CacheProvider
-import org.apache.carbondata.core.datamap.DataMapUtil
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, DataMapUtil}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{DropTableCacheEvent, OperationContext, OperationListenerBus}
@@ -55,13 +55,11 @@ case class CarbonDropCacheCommand(tableIdentifier: TableIdentifier, internalCall
         carbonTable.getTableName)) {
         DataMapUtil.executeClearDataMapJob(carbonTable, DataMapUtil.DISTRIBUTED_JOB_NAME)
       } else {
-        val allIndexFiles = CacheUtil.getAllIndexFiles(carbonTable)(sparkSession)
         // Extract dictionary keys for the table and create cache keys from those
         val dictKeys: List[String] = CacheUtil.getAllDictCacheKeys(carbonTable)
-
         // Remove elements from cache
-        val keysToRemove = allIndexFiles ++ dictKeys
-        cache.removeAll(keysToRemove.asJava)
+        cache.removeAll(dictKeys.asJava)
+        DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
       }
     }
     LOGGER.info("Drop cache request served for table " + carbonTable.getTableUniqueName)
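
For reference, a minimal self-contained sketch of the drop-cache path after
this change (dropTableCache is a hypothetical wrapper; obtaining the cache
via CacheProvider is an assumption based on CarbonDropCacheCommand.scala):

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.cache.CacheProvider
    import org.apache.carbondata.core.datamap.DataMapStoreManager
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Hypothetical helper mirroring the new logic: remove the dictionary
    // keys from the LRU cache, then delegate index cleanup (including the
    // internal segmentMap) to DataMapStoreManager.clearDataMaps.
    def dropTableCache(carbonTable: CarbonTable, dictKeys: List[String]): Unit = {
      val cache = CacheProvider.getInstance().getCarbonCache
      cache.removeAll(dictKeys.asJava)
      DataMapStoreManager.getInstance()
        .clearDataMaps(carbonTable.getAbsoluteTableIdentifier)
    }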
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index fb20e4f..62a3486 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -106,6 +106,13 @@ object HorizontalCompaction {
       segmentUpdateStatusManager,
       deleteTimeStamp,
       segLists)
+
+    // If index and data files are already present from an old update operation, then the
+    // cache will be loaded for those files during the current update; but once horizontal
+    // compaction finishes, new compacted files are generated, so the segments inside the
+    // cache are now invalid. Hence, clear the cache of the invalid segments after
+    // horizontal compaction.
+    DataMapStoreManager.getInstance()
+      .clearInvalidSegments(carbonTable, segLists.asScala.map(_.getSegmentNo).asJava)
   }
 
   /**
