This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new bd16325 [CARBONDATA-3384] Fix NullPointerException for update/delete
using index server
bd16325 is described below
commit bd1632564acb248db7080b9fd5f76b8e8da79101
Author: kunal642 <[email protected]>
AuthorDate: Wed May 15 11:35:18 2019 +0530
[CARBONDATA-3384] Fix NullPointerException for update/delete using index
server
Problem:
After an update, the segment cache is cleared from the executor; in any
subsequent query only one index file is considered when creating the
TableBlockIndexUniqueIdentifier. The query therefore throws a
NullPointerException when accessing the segmentProperties.
Solution:
Consider all index files of the segment when creating the identifiers.
This closes #3218
---
.../indexstore/blockletindex/BlockletDataMapFactory.java | 4 ++--
.../carbondata/hadoop/api/CarbonTableInputFormat.java | 4 +++-
.../indexserver/InvalidateSegmentCacheRDD.scala | 16 ++++++++++------
3 files changed, 15 insertions(+), 9 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index e4a3ad8..446507f 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -344,6 +344,7 @@ public class BlockletDataMapFactory extends
CoarseGrainDataMapFactory
Set<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
segmentMap.get(distributable.getSegment().getSegmentNo());
if (tableBlockIndexUniqueIdentifiers == null) {
+ tableBlockIndexUniqueIdentifiers = new HashSet<>();
Set<String> indexFiles =
distributable.getSegment().getCommittedIndexFile().keySet();
for (String indexFile : indexFiles) {
CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
@@ -363,10 +364,9 @@ public class BlockletDataMapFactory extends
CoarseGrainDataMapFactory
identifiersWrapper.add(
new
TableBlockIndexUniqueIdentifierWrapper(tableBlockIndexUniqueIdentifier,
this.getCarbonTable()));
- tableBlockIndexUniqueIdentifiers = new HashSet<>();
tableBlockIndexUniqueIdentifiers.add(tableBlockIndexUniqueIdentifier);
- segmentMap.put(distributable.getSegment().getSegmentNo(),
tableBlockIndexUniqueIdentifiers);
}
+ segmentMap.put(distributable.getSegment().getSegmentNo(),
tableBlockIndexUniqueIdentifiers);
} else {
for (TableBlockIndexUniqueIdentifier tableBlockIndexUniqueIdentifier :
tableBlockIndexUniqueIdentifiers) {
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 458c95e..dd86dcb 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -564,7 +564,9 @@ public class CarbonTableInputFormat<T> extends
CarbonInputFormat<T> {
allSegments.getInvalidSegments(), toBeCleanedSegments));
for (InputSplit extendedBlocklet : extendedBlocklets) {
CarbonInputSplit blocklet = (CarbonInputSplit) extendedBlocklet;
- blockletToRowCountMap.put(blocklet.getSegmentId() + "," +
blocklet.getFilePath(),
+ String filePath = blocklet.getFilePath();
+ String blockName = filePath.substring(filePath.lastIndexOf("/") + 1);
+ blockletToRowCountMap.put(blocklet.getSegmentId() + "," + blockName,
(long) blocklet.getDetailInfo().getRowCount());
}
} else {
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
index 1aa8cd9..bc83d2f 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/InvalidateSegmentCacheRDD.scala
@@ -43,12 +43,16 @@ class InvalidateSegmentCacheRDD(@transient private val ss:
SparkSession, databas
}
override protected def internalGetPartitions: Array[Partition] = {
- executorsList.zipWithIndex.map {
- case (executor, idx) =>
- // create a dummy split for each executor to accumulate the cache size.
- val dummySplit = new CarbonInputSplit()
- dummySplit.setLocation(Array(executor))
- new DataMapRDDPartition(id, idx, dummySplit)
+ if (invalidSegmentIds.isEmpty) {
+ Array()
+ } else {
+ executorsList.zipWithIndex.map {
+ case (executor, idx) =>
+ // create a dummy split for each executor to accumulate the cache
size.
+ val dummySplit = new CarbonInputSplit()
+ dummySplit.setLocation(Array(executor))
+ new DataMapRDDPartition(id, idx, dummySplit)
+ }
}
}
}