This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8f0724e [CARBONDATA-3482] Fixed NPE in Concurrent query
8f0724e is described below
commit 8f0724e4256e960608aa6a0d66593acd2ceaa84e
Author: kunal642 <[email protected]>
AuthorDate: Mon Jul 29 14:31:31 2019 +0530
[CARBONDATA-3482] Fixed NPE in Concurrent query
Problem: With concurrent queries, if Q1 is loading the cache while Q2 is
removing from it, Q2 may remove the segmentPropertiesIndex that Q1 has just
allocated and is about to access. This causes a NullPointerException.
Solution: Instead of storing the index in BlockDataMap, keep a reference to the
segmentPropertiesWrapper itself and use it directly.
This closes #3351
---
.../block/SegmentPropertiesAndSchemaHolder.java | 21 ++++++-------
.../core/indexstore/BlockletDataMapIndexStore.java | 2 +-
.../indexstore/blockletindex/BlockDataMap.java | 36 ++++++++--------------
.../indexstore/blockletindex/BlockletDataMap.java | 7 +----
...ryWithColumnMetCacheAndCacheLevelProperty.scala | 27 ++--------------
5 files changed, 26 insertions(+), 67 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
index f2f2d8c..056a0e7 100644
---
a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
+++
b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentPropertiesAndSchemaHolder.java
@@ -98,7 +98,7 @@ public class SegmentPropertiesAndSchemaHolder {
* @param columnCardinality
* @param segmentId
*/
- public int addSegmentProperties(CarbonTable carbonTable,
+ public SegmentPropertiesWrapper addSegmentProperties(CarbonTable carbonTable,
List<ColumnSchema> columnsInTable, int[] columnCardinality, String
segmentId) {
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
segmentPropertiesWrapper =
new
SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper(carbonTable,
@@ -137,7 +137,7 @@ public class SegmentPropertiesAndSchemaHolder {
.addMinMaxColumns(carbonTable);
}
}
- return segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex();
+ return
getSegmentPropertiesWrapper(segmentIdSetAndIndexWrapper.getSegmentPropertiesIndex());
}
/**
@@ -222,17 +222,14 @@ public class SegmentPropertiesAndSchemaHolder {
* Method to remove the given segment ID
*
* @param segmentId
- * @param segmentPropertiesIndex
* @param clearSegmentWrapperFromMap flag to specify whether to clear
segmentPropertiesWrapper
* from Map if all the segment's using it
have become stale
*/
- public void invalidate(String segmentId, int segmentPropertiesIndex,
+ public void invalidate(String segmentId, SegmentPropertiesWrapper
segmentPropertiesWrapper,
boolean clearSegmentWrapperFromMap) {
- SegmentPropertiesWrapper segmentPropertiesWrapper =
- indexToSegmentPropertiesWrapperMapping.get(segmentPropertiesIndex);
- if (null != segmentPropertiesWrapper) {
- SegmentIdAndSegmentPropertiesIndexWrapper
segmentIdAndSegmentPropertiesIndexWrapper =
- segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
+ SegmentIdAndSegmentPropertiesIndexWrapper
segmentIdAndSegmentPropertiesIndexWrapper =
+ segmentPropWrapperToSegmentSetMap.get(segmentPropertiesWrapper);
+ if (segmentIdAndSegmentPropertiesIndexWrapper != null) {
synchronized
(getOrCreateTableLock(segmentPropertiesWrapper.getTableIdentifier())) {
segmentIdAndSegmentPropertiesIndexWrapper.removeSegmentId(segmentId);
// if after removal of given SegmentId, the segmentIdSet becomes empty
that means this
@@ -240,14 +237,16 @@ public class SegmentPropertiesAndSchemaHolder {
// removed from all the holders
if (clearSegmentWrapperFromMap &&
segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet
.isEmpty()) {
-
indexToSegmentPropertiesWrapperMapping.remove(segmentPropertiesIndex);
+ indexToSegmentPropertiesWrapperMapping
+
.remove(segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
segmentPropWrapperToSegmentSetMap.remove(segmentPropertiesWrapper);
} else if (!clearSegmentWrapperFromMap
&&
segmentIdAndSegmentPropertiesIndexWrapper.segmentIdSet.isEmpty()) {
// min max columns can very when cache is modified. So even though
entry is not required
// to be deleted from map clear the column cache so that it can
filled again
segmentPropertiesWrapper.clear();
- LOGGER.info("cleared min max for segmentProperties at index: " +
segmentPropertiesIndex);
+ LOGGER.info("cleared min max for segmentProperties at index: "
+ +
segmentIdAndSegmentPropertiesIndexWrapper.getSegmentPropertiesIndex());
}
}
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index ce1e8ac..32ee9cb 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -230,7 +230,7 @@ public class BlockletDataMapIndexStore
// as segmentId will be same for all the dataMaps and
segmentProperties cache is
// maintained at segment level so it need to be called only once for
clearing
SegmentPropertiesAndSchemaHolder.getInstance()
- .invalidate(segmentId, dataMaps.get(0).getSegmentPropertiesIndex(),
+ .invalidate(segmentId,
dataMaps.get(0).getSegmentPropertiesWrapper(),
tableSegmentUniqueIdentifierWrapper.isAddTableBlockToUnsafeAndLRUCache());
}
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 24ad43a..f168761 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -99,7 +99,8 @@ public class BlockDataMap extends CoarseGrainDataMap
/**
* index of segmentProperties in the segmentProperties holder
*/
- protected int segmentPropertiesIndex;
+ protected transient SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
+ segmentPropertiesWrapper;
/**
* flag to check for store from 1.1 or any prior version
*/
@@ -204,10 +205,10 @@ public class BlockDataMap extends CoarseGrainDataMap
DataFileFooter fileFooter) throws IOException {
List<ColumnSchema> columnInTable = fileFooter.getColumnInTable();
int[] columnCardinality =
fileFooter.getSegmentInfo().getColumnCardinality();
- segmentPropertiesIndex = SegmentPropertiesAndSchemaHolder.getInstance()
+ segmentPropertiesWrapper = SegmentPropertiesAndSchemaHolder.getInstance()
.addSegmentProperties(blockletDataMapInfo.getCarbonTable(),
columnInTable, columnCardinality,
blockletDataMapInfo.getSegmentId());
- return getSegmentProperties();
+ return segmentPropertiesWrapper.getSegmentProperties();
}
/**
@@ -485,8 +486,7 @@ public class BlockDataMap extends CoarseGrainDataMap
return getTableTaskInfo(SUMMARY_INDEX_PATH);
}
// create the segment directory path
- String tablePath = SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getTableIdentifier().getTablePath();
+ String tablePath =
segmentPropertiesWrapper.getTableIdentifier().getTablePath();
String segmentId = getTableTaskInfo(SUMMARY_SEGMENTID);
return CarbonTablePath.getSegmentPath(tablePath, segmentId);
}
@@ -620,8 +620,7 @@ public class BlockDataMap extends CoarseGrainDataMap
}
protected List<CarbonColumn> getMinMaxCacheColumns() {
- return SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getMinMaxCacheColumns();
+ return segmentPropertiesWrapper.getMinMaxCacheColumns();
}
/**
@@ -1019,18 +1018,15 @@ public class BlockDataMap extends CoarseGrainDataMap
}
protected SegmentProperties getSegmentProperties() {
- return SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentProperties(segmentPropertiesIndex);
+ return segmentPropertiesWrapper.getSegmentProperties();
}
public int[] getColumnCardinality() {
- return SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnCardinality();
+ return segmentPropertiesWrapper.getColumnCardinality();
}
public List<ColumnSchema> getColumnSchema() {
- return SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getColumnsInTable();
+ return segmentPropertiesWrapper.getColumnsInTable();
}
protected AbstractMemoryDMStore getMemoryDMStore(boolean addToUnsafe)
@@ -1045,14 +1041,10 @@ public class BlockDataMap extends CoarseGrainDataMap
}
protected CarbonRowSchema[] getFileFooterEntrySchema() {
- return SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockFileFooterEntrySchema();
+ return segmentPropertiesWrapper.getBlockFileFooterEntrySchema();
}
protected CarbonRowSchema[] getTaskSummarySchema() {
- SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
segmentPropertiesWrapper =
- SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertiesIndex);
try {
return segmentPropertiesWrapper.getTaskSummarySchemaForBlock(true,
isFilePathStored);
} catch (MemoryException e) {
@@ -1080,12 +1072,8 @@ public class BlockDataMap extends CoarseGrainDataMap
}
}
- public void setSegmentPropertiesIndex(int segmentPropertiesIndex) {
- this.segmentPropertiesIndex = segmentPropertiesIndex;
- }
-
- public int getSegmentPropertiesIndex() {
- return segmentPropertiesIndex;
+ public SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
getSegmentPropertiesWrapper() {
+ return segmentPropertiesWrapper;
}
@Override public int getNumberOfEntries() {
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 23d39ce..11f24f5 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -27,7 +27,6 @@ import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapModel;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -83,9 +82,6 @@ public class BlockletDataMap extends BlockDataMap implements
Serializable {
if (isLegacyStore) {
return super.getTaskSummarySchema();
}
- SegmentPropertiesAndSchemaHolder.SegmentPropertiesWrapper
segmentPropertiesWrapper =
- SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertiesIndex);
try {
return segmentPropertiesWrapper.getTaskSummarySchemaForBlocklet(false,
isFilePathStored);
} catch (MemoryException e) {
@@ -98,8 +94,7 @@ public class BlockletDataMap extends BlockDataMap implements
Serializable {
if (isLegacyStore) {
return super.getFileFooterEntrySchema();
}
- return SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(segmentPropertiesIndex).getBlockletFileFooterEntrySchema();
+ return segmentPropertiesWrapper.getBlockletFileFooterEntrySchema();
}
/**
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
index ac0ca8b..5c34fe4 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithColumnMetCacheAndCacheLevelProperty.scala
@@ -88,9 +88,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
extends QueryTest with Be
private def validateMinMaxColumnsCacheLength(dataMaps: List[DataMap[_ <:
Blocklet]],
expectedLength: Int, storeBlockletCount: Boolean = false): Boolean = {
- val index =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
- val summarySchema = SegmentPropertiesAndSchemaHolder.getInstance()
-
.getSegmentPropertiesWrapper(index).getTaskSummarySchemaForBlock(storeBlockletCount,
false)
+ val segmentPropertiesWrapper =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesWrapper
+ val summarySchema =
segmentPropertiesWrapper.getTaskSummarySchemaForBlock(storeBlockletCount, false)
val minSchemas =
summarySchema(BlockletDataMapRowIndexes.TASK_MIN_VALUES_INDEX)
.asInstanceOf[CarbonRowSchema.StructCarbonRowSchema]
.getChildSchemas
@@ -107,15 +106,10 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
extends QueryTest with Be
assert(dataMaps.nonEmpty)
assert(dataMaps(0).isInstanceOf[BlockDataMap])
assert(validateMinMaxColumnsCacheLength(dataMaps, 3, true))
- var segmentPropertyIndex =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
-
// alter table to add column_meta_cache and cache_level
sql(
"alter table metaCache set tblproperties('column_meta_cache'='c2,c1',
'CACHE_LEVEL'='BLOCKLET')")
- var wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertyIndex)
// after alter operation cache should be cleaned and cache should be
evicted
- assert(null == wrapper)
checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
// validate dataMap is non empty, its an instance of BlockletDataMap and
minMaxSchema length
// is 1
@@ -125,23 +119,11 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
extends QueryTest with Be
assert(validateMinMaxColumnsCacheLength(dataMaps, 2))
// alter table to add same value as previous with order change for
column_meta_cache and cache_level
- segmentPropertyIndex =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache set tblproperties('column_meta_cache'='c1,c2',
'CACHE_LEVEL'='BLOCKLET')")
- wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertyIndex)
- // after alter operation cache should not be cleaned as value are unchanged
- assert(null != wrapper)
-
- // alter table to cache no column in column_meta_cache
- segmentPropertyIndex =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache set tblproperties('column_meta_cache'='')")
- wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertyIndex)
-
// after alter operation cache should be cleaned and cache should be
evicted
- assert(null == wrapper)
checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
// validate dataMap is non empty, its an instance of BlockletDataMap and
minMaxSchema length
// is 0
@@ -151,13 +133,8 @@ class TestQueryWithColumnMetCacheAndCacheLevelProperty
extends QueryTest with Be
assert(validateMinMaxColumnsCacheLength(dataMaps, 0))
// alter table to cache no column in column_meta_cache
- segmentPropertyIndex =
dataMaps(0).asInstanceOf[BlockDataMap].getSegmentPropertiesIndex
sql(
"alter table metaCache unset tblproperties('column_meta_cache',
'cache_level')")
- wrapper = SegmentPropertiesAndSchemaHolder.getInstance()
- .getSegmentPropertiesWrapper(segmentPropertyIndex)
- // after alter operation cache should be cleaned and cache should be
evicted
- assert(null == wrapper)
checkAnswer(sql("select * from metaCache"), Row("a", "aa", "aaa"))
// validate dataMap is non empty, its an instance of BlockletDataMap and
minMaxSchema length
// is 3