This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 36ee528 [CARBONDATA-3395] Fix Exception when concurrent readers built
with same split object
36ee528 is described below
commit 36ee52836c7bb7bc8e7a4cc6c294d7b77fdba2ee
Author: ajantha-bhat <[email protected]>
AuthorDate: Fri May 24 19:50:57 2019 +0530
[CARBONDATA-3395] Fix Exception when concurrent readers built with same
split object
problem: An exception is thrown when concurrent readers are built with the same
split object.
cause: In CarbonInputSplit, BlockletDetailInfo and BlockletInfo are made
lazy, so BlockletInfo is prepared during reader building.
When two readers work on the same split object, the state of that object is
changed concurrently, leading to an array index out of bounds issue.
solution: a) Synchronize BlockletInfo creation.
b) Load BlockletDetailInfo before passing the split to the reader, inside the
getSplits() API itself.
c) In the failure case, get the proper identifier to clean up the datamaps.
d) In build_with_splits, handle default projection filling if it is not
configured.
This closes #3232
---
.../carbondata/core/indexstore/BlockletDetailInfo.java | 6 +++++-
.../carbondata/hadoop/api/CarbonFileInputFormat.java | 16 ++++++++++------
.../apache/carbondata/sdk/file/CarbonReaderBuilder.java | 14 ++++++++++----
3 files changed, 25 insertions(+), 11 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index a5aa899..af07f09 100644
---
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -108,7 +108,11 @@ public class BlockletDetailInfo implements Serializable,
Writable {
public BlockletInfo getBlockletInfo() {
if (null == blockletInfo) {
try {
- setBlockletInfoFromBinary();
+ synchronized (this) {
+ if (null == blockletInfo) {
+ setBlockletInfoFromBinary();
+ }
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index e83f898..1f34c4f 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -200,17 +200,21 @@ public class CarbonFileInputFormat<T> extends
CarbonInputFormat<T> implements Se
}
});
}
- if (getColumnProjection(job.getConfiguration()) == null) {
- // If the user projection is empty, use default all columns as
projections.
- // All column name will be filled inside getSplits, so can update only
here.
- String[] projectionColumns = projectAllColumns(carbonTable);
- setColumnProjection(job.getConfiguration(), projectionColumns);
- }
+ setAllColumnProjectionIfNotConfigured(job, carbonTable);
return splits;
}
return null;
}
+ public void setAllColumnProjectionIfNotConfigured(JobContext job,
CarbonTable carbonTable) {
+ if (getColumnProjection(job.getConfiguration()) == null) {
+ // If the user projection is empty, use default all columns as
projections.
+ // All column name will be filled inside getSplits, so can update only
here.
+ String[] projectionColumns = projectAllColumns(carbonTable);
+ setColumnProjection(job.getConfiguration(), projectionColumns);
+ }
+ }
+
private List<CarbonFile> getAllCarbonDataFiles(String tablePath) {
List<CarbonFile> carbonFiles;
try {
diff --git
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 6ead50d..2db92ea 100644
---
a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -358,8 +358,8 @@ public class CarbonReaderBuilder {
}
} catch (Exception ex) {
// Clear the datamap cache as it can get added in getSplits() method
- DataMapStoreManager.getInstance()
- .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
+ DataMapStoreManager.getInstance().clearDataMaps(
+
format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
throw ex;
}
}
@@ -372,6 +372,8 @@ public class CarbonReaderBuilder {
}
final Job job = new Job(new JobConf(hadoopConf));
CarbonFileInputFormat format = prepareFileInputFormat(job, false, true);
+ format.setAllColumnProjectionIfNotConfigured(job,
+ format.getOrCreateCarbonTable(job.getConfiguration()));
try {
List<RecordReader<Void, T>> readers = new ArrayList<>(1);
RecordReader reader = getRecordReader(job, format, readers, inputSplit);
@@ -383,8 +385,8 @@ public class CarbonReaderBuilder {
}
} catch (Exception ex) {
// Clear the datamap cache as it can get added in getSplits() method
- DataMapStoreManager.getInstance()
- .clearDataMaps(format.getAbsoluteTableIdentifier(hadoopConf));
+ DataMapStoreManager.getInstance().clearDataMaps(
+
format.getOrCreateCarbonTable((job.getConfiguration())).getAbsoluteTableIdentifier());
throw ex;
}
}
@@ -407,6 +409,10 @@ public class CarbonReaderBuilder {
CarbonFileInputFormat format = prepareFileInputFormat(job,
enableBlockletDistribution, false);
List<InputSplit> splits =
format.getSplits(new JobContextImpl(job.getConfiguration(), new
JobID()));
+ for (InputSplit split : splits) {
+ // Load the detailInfo
+ ((CarbonInputSplit) split).getDetailInfo();
+ }
return splits.toArray(new InputSplit[splits.size()]);
}
}