This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f0724e  [CARBONDATA-3482] Fixed NPE in Concurrent query
8f0724e is described below

commit 8f0724e4256e960608aa6a0d66593acd2ceaa84e
Author: kunal642 <[email protected]>
AuthorDate: Mon Jul 29 14:31:31 2019 +0530

    [CARBONDATA-3482] Fixed NPE in Concurrent query
    
    Problem: In case of concurrent queries, if Q1 is loading the cache and Q2
    is removing from the cache, then Q2 may remove the segmentPropertiesIndex
    that Q1 has allocated and is about to access. This causes a
    NullPointerException.
    
    Solution: Instead of storing the index in BlockDataMap, keep a reference
    to the segmentPropertiesWrapper and use it directly.
    
    This closes #3351
---
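
For illustration, a minimal, self-contained sketch of the race this commit
fixes. All names and types below are hypothetical simplifications, not the
actual CarbonData classes: the wrapper is modelled as a String and the holder
as a plain index-to-wrapper map.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SegmentPropertiesRaceSketch {
      private static final Map<Integer, String> INDEX_TO_WRAPPER = new ConcurrentHashMap<>();

      public static void main(String[] args) {
        // Q1 (cache load): allocates an entry but remembers only its index.
        INDEX_TO_WRAPPER.put(0, "segmentPropertiesWrapper-0");
        int idx = 0;

        // Q2 (concurrent cache clean): removes that entry before Q1 reads it.
        INDEX_TO_WRAPPER.remove(idx);

        // Q1 (query): the index lookup now returns null, so any method call
        // on the result would throw NullPointerException.
        System.out.println(INDEX_TO_WRAPPER.get(idx)); // prints null

        // After the fix: Q1 takes the reference while the entry is still
        // present, so a later remove() cannot invalidate what Q1 holds.
        INDEX_TO_WRAPPER.put(1, "segmentPropertiesWrapper-1");
        String wrapper = INDEX_TO_WRAPPER.get(1); // direct reference
        INDEX_TO_WRAPPER.remove(1);               // Q2 cleans up concurrently
        System.out.println(wrapper);              // still safe: prints the value
      }
    }

This is also why the new invalidate(String, SegmentPropertiesWrapper, boolean)
signature takes the wrapper itself: the caller already holds a live reference,
so no index lookup can race with a concurrent removal.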
 .../block/SegmentPropertiesAndSchemaHolder.java    | 21 ++++++-------
 .../core/indexstore/BlockletDataMapIndexStore.java |  2 +-
 .../indexstore/blockletindex/BlockDataMap.java     | 36 ++++++++--------------
 .../indexstore/blockletindex/BlockletDataMap.java  |  7 +----
 ...ryWithColumnMetCacheAndCacheLevelProperty.scala | 27 ++--------------
 5 files changed, 26 insertions(+), 67 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index f2f2d8c..056a0e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -98,7 +98,7 @@ public class SegmentPropertiesAndSchemaHolder {
    * @param columnCardinality
    * @param segmentId
    */
-  public int addSegmentProperties(CarbonTable carbonTable,
+  public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable,
       List<ColumnSchema> columnsInTable, int[] columnCardinality, String segmentId) {
     SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
         new SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable,
@@ -137,7 +137,7 @@ public class SegmentPropertiesAndSchemaHolder {
             .addMinMaxColumns(carbonTable);
       }
     }
-    return segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex();
+    return getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex());
   }
 
   /**
@@ -222,17 +222,14 @@ public class SegmentPropertiesAndSchemaHolder {
    * Method to remove the given segment ID
    *
    * @param segmentId
-   * @param segmentPropertiesIndex
    * @param clearSegmentWrapperFromMap flag to specify whether to clear segmentPropertiesWrapper
    *                                   from Map if all the segment's using it have become stale
    */
-  public void invalidate(String segmentId, int segmentPropertiesIndex,
+  public void invalidate(String segmentId, SegmentPropertiesWrapper segmentPropertiesWrapper,
       boolean clearSegmentWrapperFromMap) {
-    SegmentPropertiesWrapper segmentPropertiesWrapper =
-        indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex);
-    if (null != segmentPropertiesWrapper) {
-      SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
-          segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
+    SegmentIdAndSegmentPropertiesIndexWrapper segmentIdAndSegmentPropertiesIndexWrapper =
+        segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
+    if (segmentIdAndSegmentPropertiesIndexWrapper != null) {
       synchronized (getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
         segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
         // if after removal of given SegmentId, the segmentIdSet becomes empty that means this
@@ -240,14 +237,16 @@ public class SegmentPropertiesAndSchemaHolder {
         // removed from all the holders
         if (clearSegmentWrapperFromMap && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
             .isEmpty()) {
-          indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
+          indexToSegmentPropertiesWrapperMapping
+              .remove(segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
           segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
         } else if (!clearSegmentWrapperFromMap
             && segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
           // min max columns can very when cache is modified. So even though entry is not required
           // to be deleted from map clear the column cache so that it can filled again
           segmentPropertiesWrapper.clear();
-          LOGGER.info("cleared min max for segmentProperties at index: " + segmentPropertiesIndex);
+          LOGGER.info("cleared min max for segmentProperties at index: "
+              + segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
         }
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ce1e8ac..32ee9cb 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -230,7 +230,7 @@ public class BlockletDataMapIndexStore
         // as segmentId will be same for all the dataMaps and segmentProperties cache is
         // maintained at segment level so it need to be called only once for clearing
         SegmentPropertiesAndSchemaHolder.getInstance()
-            .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(),
+            .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesWrapper(),
                 tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache());
       }
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 24ad43a..f168761 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -99,7 +99,8 @@ public class BlockDataMap extends CoarseGrainDataMap
   /**
    * index of segmentProperties in the segmentProperties holder
    */
-  protected int segmentPropertiesIndex;
+  protected transient SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
+      segmentPropertiesWrapper;
   /**
    * flag to check for store from 1.1 or any prior version
    */
@@ -204,10 +205,10 @@ public class BlockDataMap extends CoarseGrainDataMap
       DataFileFooter fileFooter) throws IOException {
     List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
     int[] columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality();
-    segmentPropertiesIndex = SegmentPropertiesAndSchemaHolder.getInstance()
+    segmentPropertiesWrapper = SegmentPropertiesAndSchemaHolder.getInstance()
         .addSegmentProperties(blockletDataMapInfo.getCarbonTable(),
             columnInTable, columnCardinality, blockletDataMapInfo.getSegmentId());
-    return getSegmentProperties();
+    return segmentPropertiesWrapper.getSegmentProperties();
   }
 
   /**
@@ -485,8 +486,7 @@ public class BlockDataMap extends CoarseGrainDataMap
       return getTableTaskInfo(SUMMARY_INDEX_PATH);
     }
     // create the segment directory path
-    String tablePath = SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getTableIdentifier().getTablePath();
+    String tablePath = segmentPropertiesWrapper.getTableIdentifier().getTablePath();
     String segmentId = getTableTaskInfo(SUMMARY_SEGMENTID);
     return CarbonTablePath.getSegmentPath(tablePath, segmentId);
   }
@@ -620,8 +620,7 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   protected List<CarbonColumn> getMinMaxCacheColumns() {
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getMinMaxCacheColumns();
+    return segmentPropertiesWrapper.getMinMaxCacheColumns();
   }
 
   /**
@@ -1019,18 +1018,15 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   protected SegmentProperties getSegmentProperties() {
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentProperties(segmentPropertiesIndex);
+    return segmentPropertiesWrapper.getSegmentProperties();
   }
 
   public int[] getColumnCardinality() {
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnCardinality();
+    return segmentPropertiesWrapper.getColumnCardinality();
   }
 
   public List<ColumnSchema> getColumnSchema() {
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnsInTable();
+    return segmentPropertiesWrapper.getColumnsInTable();
   }
 
   protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe)
@@ -1045,14 +1041,10 @@ public class BlockDataMap extends CoarseGrainDataMap
   }
 
   protected CarbonRowSchema[] getFileFooterEntrySchema() {
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockFileFooterEntrySchema();
+    return segmentPropertiesWrapper.getBlockFileFooterEntrySchema();
   }
 
   protected CarbonRowSchema[] getTaskSummarySchema() {
-    SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
-        SegmentPropertiesAndSchemaHolder.getInstance()
-            .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
       return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true, isFilePathStored);
     } catch (MemoryException e) {
@@ -1080,12 +1072,8 @@ public class BlockDataMap extends CoarseGrainDataMap
     }
   }
 
-  public void setSegmentPropertiesIndex(int segmentPropertiesIndex) {
-    this.segmentPropertiesIndex = segmentPropertiesIndex;
-  }
-
-  public int getSegmentPropertiesIndex() {
-    return segmentPropertiesIndex;
+  public SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper getSegmentPropertiesWrapper() {
+    return segmentPropertiesWrapper;
   }
 
   @Override public int getNumberOfEntries() {
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 23d39ce..11f24f5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -27,7 +27,6 @@ import java.util.List;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -83,9 +82,6 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     if (isLegacyStore) {
       return super.getTaskSummarySchema();
     }
-    SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper segmentPropertiesWrapper =
-        SegmentPropertiesAndSchemaHolder.getInstance()
-            .getSegmentPropertiesWrapper(segmentPropertiesIndex);
     try {
       return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false, isFilePathStored);
     } catch (MemoryException e) {
@@ -98,8 +94,7 @@ public class BlockletDataMap extends BlockDataMap implements Serializable {
     if (isLegacyStore) {
       return super.getFileFooterEntrySchema();
     }
-    return SegmentPropertiesAndSchemaHolder.getInstance()
-        .getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockletFileFooterEntrySchema();
+    return segmentPropertiesWrapper.getBlockletFileFooterEntrySchema();
   }
 
   /**
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index ac0ca8b..5c34fe4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -88,9 +88,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
 
   private def validateMinMaxColumnsCacheLength(dataMaps: List[DataMap[_ <: Blocklet]],
       expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = {
-    val index = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
-    val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount, false)
+    val segmentPropertiesWrapper = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
+    val summarySchema = segmentPropertiesWrapper.getTaskSummarySchemaForBlock(storeBlockletCount, false)
     val minSchemas = summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
       .asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
       .getChildSchemas
@@ -107,15 +106,10 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     assert(dataMaps.nonEmpty)
     assert(dataMaps(0).isInstanceOf[BlockDataMap])
     assert(validateMinMaxColumnsCacheLength(dataMaps, 3, true))
-    var segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
-
     // alter table to add column_meta_cache and cache_level
     sql(
       "alter table metaCache set tblproperties('column_meta_cache'='c2,c1', 
'CACHE_LEVEL'='BLOCKLET')")
-    var wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(segmentPropertyIndex)
     // after alter operation cache should be cleaned and cache should be evicted
-    assert(null == wrapper)
     checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
     // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length
     // is 1
@@ -125,23 +119,11 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     assert(validateMinMaxColumnsCacheLength(dataMaps, 2))
 
     // alter table to add same value as previous with order change for column_meta_cache and cache_level
-    segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     sql(
       "alter table metaCache set tblproperties('column_meta_cache'='c1,c2', 
'CACHE_LEVEL'='BLOCKLET')")
-    wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(segmentPropertyIndex)
-    // after alter operation cache should not be cleaned as value are unchanged
-    assert(null != wrapper)
-
-    // alter table to cache no column in column_meta_cache
-    segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     sql(
       "alter table metaCache set tblproperties('column_meta_cache'='')")
-    wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(segmentPropertyIndex)
-
     // after alter operation cache should be cleaned and cache should be evicted
-    assert(null == wrapper)
     checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
     // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length
     // is 0
@@ -151,13 +133,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty extends QueryTest with Be
     assert(validateMinMaxColumnsCacheLength(dataMaps, 0))
 
     // alter table to cache no column in column_meta_cache
-    segmentPropertyIndex = dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
     sql(
       "alter table metaCache unset tblproperties('column_meta_cache', 
'cache_level')")
-    wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
-      .getSegmentPropertiesWrapper(segmentPropertyIndex)
-    // after alter operation cache should be cleaned and cache should be evicted
-    assert(null == wrapper)
     checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
     // validate dataMap is non empty, its an instance of BlockletDataMap and minMaxSchema length
     // is 3
