Repository: carbondata
Updated Branches:
  refs/heads/master 4612e0031 -> a26be1b18


[CARBONDATA-2714][Merge Index] Fixed block dataMap cache refresh issue after creation of merge index file

Things handled as part of this PR:

Fixed the block dataMap cache refresh issue after creation of a merge index file.

Problem:
The block dataMap cache is not refreshed after a merge index file is created,
so queries still look for the individual index files and fail.

Analysis:
Creating a merge index file involves modifying the segment file. If a query is
executed before the merge index file is created, the cache is loaded. Once the
merge index file is created, the individual index file entries are removed from
the segment file and a merge index file entry is added. In this process the
cache is not refreshed, so the cached identifiers still have mergeIndexFileName
as null.
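
The stale state shows up on the cached identifiers themselves. The Java sketch
below mirrors the check that the new Scala test in this commit performs via
reflection; the StaleCacheCheck wrapper class is illustrative only, while the
CarbonData calls are the ones visible in the diff:

import java.lang.reflect.Method;
import java.util.Set;

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

final class StaleCacheCheck {
  // Returns true when every cached identifier for the segment still has a null
  // mergeIndexFileName, i.e. the cache was loaded before the merge index file existed.
  @SuppressWarnings("unchecked")
  static boolean mergeIndexFileNameIsNull(CarbonTable table, String segmentId) throws Exception {
    Object factory =
        DataMapStoreManager.getInstance().getDefaultDataMap(table).getDataMapFactory();
    // the identifier lookup is not public, so reflection is used, as in the test
    Method method = BlockletDataMapFactory.class
        .getDeclaredMethod("getTableBlockIndexUniqueIdentifiers", Segment.class);
    method.setAccessible(true);
    Set<TableBlockIndexUniqueIdentifier> identifiers =
        (Set<TableBlockIndexUniqueIdentifier>) method.invoke(factory, new Segment(segmentId));
    for (TableBlockIndexUniqueIdentifier identifier : identifiers) {
      if (identifier.getMergeIndexFileName() != null) {
        return false;
      }
    }
    return true;
  }
}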

Fix:
After updating the table status file, clear the dataMap cache for every
segmentId on which the dataMap is being created.
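
Condensed, the fix is a targeted cache invalidation per modified segment. The
following is a minimal sketch of the new SegmentFileStore.clearBlockDataMapCache
shown in the diff below (the DataMapCacheRefresh wrapper class is illustrative;
logging is omitted):

import java.util.ArrayList;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

final class DataMapCacheRefresh {
  // Evict the cached block dataMap entries for one segment so that the next
  // query reloads them from the rewritten segment file, which now references
  // the merge index file instead of the individual index files.
  static void clearBlockDataMapCache(CarbonTable carbonTable, String segmentId) {
    TableDataMap defaultDataMap =
        DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
    List<Segment> segments = new ArrayList<>();
    segments.add(new Segment(segmentId));
    // clear(...) drops only the entries for the given segments, not the whole table
    defaultDataMap.clear(segments);
  }
}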

This closes #2515


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a26be1b1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a26be1b1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a26be1b1

Branch: refs/heads/master
Commit: a26be1b181f952d860050e65b6cf1ad85d0bfea5
Parents: 4612e00
Author: manishgupta88 <tomanishgupt...@gmail.com>
Authored: Tue Jul 17 14:24:54 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Jul 18 10:30:33 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 29 +++++++++++-
 .../core/writer/CarbonIndexFileMergeWriter.java |  2 +-
 .../CarbonIndexFileMergeTestCase.scala          | 47 +++++++++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../sql/events/MergeIndexEventListener.scala    | 17 ++++++-
 5 files changed, 91 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 3d08a2d..9681e37 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -36,7 +36,9 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -280,9 +282,10 @@ public class SegmentFileStore {
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile,
-      String tableId, SegmentFileStore segmentFileStore) throws IOException {
+  public static boolean updateSegmentFile(CarbonTable carbonTable, String segmentId,
+      String segmentFile, String tableId, SegmentFileStore segmentFileStore) throws IOException {
     boolean status = false;
+    String tablePath = carbonTable.getTablePath();
     String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
     if (!FileFactory.isFileExist(tableStatusPath)) {
       return status;
@@ -316,6 +319,8 @@ public class SegmentFileStore {
 
         SegmentStatusManager
             .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        // clear dataMap cache for the segmentId for which the table status file is getting updated
+        clearBlockDataMapCache(carbonTable, segmentId);
         status = true;
       } else {
         LOGGER.error(
@@ -333,6 +338,26 @@ public class SegmentFileStore {
     return status;
   }
 
+  /**
+   * After updating the table status file, clear the dataMap cache for every segmentId on which
+   * the dataMap is being created. Flows like merge index file creation modify the segment file,
+   * and once the segment file is modified the cache for that segment needs to be cleared;
+   * otherwise the stale cache will be used.
+   *
+   * @param carbonTable
+   * @param segmentId
+   */
+  public static void clearBlockDataMapCache(CarbonTable carbonTable, String segmentId) {
+    TableDataMap defaultDataMap = DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable);
+    Segment segment = new Segment(segmentId);
+    List<Segment> segments = new ArrayList<>();
+    segments.add(segment);
+    LOGGER.info(
+        "clearing cache while updating segment file entry in table status file for segmentId: "
+            + segmentId);
+    defaultDataMap.clear(segments);
+  }
+
   private static CarbonFile[] getSegmentFiles(String segmentPath) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     if (carbonFile.exists()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index b080f52..1634091 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -151,7 +151,7 @@ public class CarbonIndexFileMergeWriter {
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-    SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName,
+    SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName,
         table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
 
     for (CarbonFile file : indexFiles) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 8ee2275..173c14f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,17 +17,24 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
+import java.util
+
+import scala.collection.JavaConverters._
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
+import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
@@ -44,6 +51,7 @@ class CarbonIndexFileMergeTestCase
     CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
     sql("DROP TABLE IF EXISTS nonindexmerge")
     sql("DROP TABLE IF EXISTS indexmerge")
+    sql("DROP TABLE IF EXISTS merge_index_cache")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
         CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
@@ -444,6 +452,43 @@ class CarbonIndexFileMergeTestCase
     sql("DROP TABLE IF EXISTS streamingTable")
   }
 
+  test("verify driver cache gets updated after creating merge Index file") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, 
"false")
+    sql("DROP TABLE IF EXISTS merge_index_cache")
+    sql(
+      """
+        | CREATE TABLE merge_index_cache(id INT, name STRING, city STRING, age 
INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE merge_index_cache 
OPTIONS('header'='false')")
+    sql("""Select count(*) from merge_index_cache""").collect()
+    // merge Index fileName should be null as merge Index file is not created
+    assert(mergeFileNameIsNull("0", "default", "merge_index_cache"))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE merge_index_cache COMPACT 'SEGMENT_INDEX'")
+    sql("""Select count(*) from merge_index_cache""").collect()
+    // once merge file is created cache should be refreshed in the same 
session and identifiers
+    // should contain mergeIndex file name
+    assert(!mergeFileNameIsNull("0", "default", "merge_index_cache"))
+  }
+
+  private def mergeFileNameIsNull(segmentId: String, dbName: String, 
tableName: String): Boolean = {
+    val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), 
tableName)(sqlContext.sparkSession)
+    val dataMapFactory = 
DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable)
+      .getDataMapFactory
+    val method = classOf[BlockletDataMapFactory]
+      .getDeclaredMethod("getTableBlockIndexUniqueIdentifiers", 
classOf[Segment])
+    method.setAccessible(true)
+    val segment = new Segment(segmentId)
+    val identifiers = method.invoke(dataMapFactory, segment)
+      .asInstanceOf[util.Set[TableBlockIndexUniqueIdentifier]].asScala
+    assert(identifiers.size == 1)
+    identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
+  }
+
   private def getIndexFileCount(tableName: String, segment: String): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 40d5c0d..804193c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -519,7 +519,7 @@ object CarbonDataRDDFactory {
           String.valueOf(carbonLoadModel.getFactTimeStamp))
 
       SegmentFileStore.updateSegmentFile(
-        carbonTable.getTablePath,
+        carbonTable,
         carbonLoadModel.getSegmentId,
         segmentFileName,
         carbonTable.getCarbonTableIdentifier.getTableId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a26be1b1/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index a58e405..dd64423 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
@@ -67,6 +68,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               segmentFileNameMap,
               carbonTable.getTablePath,
               carbonTable, false)
+            // clear Block dataMap Cache
+            clearBlockDataMapCache(carbonTable, Seq(loadModel.getSegmentId))
           }
         }
       case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
@@ -120,6 +123,8 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                 carbonMainTable.getTablePath,
                 carbonMainTable,
                 true)
+              // clear Block dataMap Cache
+              clearBlockDataMapCache(carbonMainTable, validSegmentIds)
               val requestMessage = "Compaction request completed for table "
               s"${ carbonMainTable.getDatabaseName }.${ 
carbonMainTable.getTableName }"
               LOGGER.audit(requestMessage)
@@ -168,13 +173,23 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
     // So, it is enough to do merge index only for 0.2 as it is the only valid segment in this list
     val validMergedSegIds = validSegments
       .filter { seg => mergedSegmentIds.contains(seg.getSegmentNo) }.map(_.getSegmentNo)
-    if (null != validMergedSegIds && !mergedSegmentIds.isEmpty) {
+    if (null != validMergedSegIds && !validMergedSegIds.isEmpty) {
       CommonUtil.mergeIndexFiles(sparkContext,
           validMergedSegIds,
           segmentFileNameMap,
           carbonTable.getTablePath,
           carbonTable,
           false)
+      // clear Block dataMap Cache
+      clearBlockDataMapCache(carbonTable, validMergedSegIds)
     }
   }
+
+  private def clearBlockDataMapCache(carbonTable: CarbonTable, segmentIds: Seq[String]): Unit = {
+    // clear driver Block dataMap cache for each segment
+    segmentIds.foreach { segmentId =>
+      SegmentFileStore.clearBlockDataMapCache(carbonTable, segmentId)
+    }
+  }
+
 }

Reply via email to