GitHub user sounakr commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/492#discussion_r94932078
--- Diff:
hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---
@@ -358,66 +372,176 @@ private Expression getFilterPredicates(Configuration
configuration) {
* @return list of table block
* @throws IOException
*/
- private List<TableBlockInfo> getTableBlockInfo(JobContext job, String
segmentId)
- throws IOException {
+ private List<TableBlockInfo> getTableBlockInfo(JobContext job,
+ TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
+ List<String> updatedTaskList,
+ UpdateVO updateDetails,
+ SegmentUpdateStatusManager updateStatusManager,
+ String segmentId)
+ throws IOException {
List<TableBlockInfo> tableBlockInfoList = new
ArrayList<TableBlockInfo>();
// get file location of all files of given segment
JobContext newJob =
new JobContextImpl(new Configuration(job.getConfiguration()),
job.getJobID());
- newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
segmentId + "");
+ newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
+ tableSegmentUniqueIdentifier.getSegmentId() + "");
// identify table blocks
for (InputSplit inputSplit : getSplitsInternal(newJob)) {
CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
- BlockletInfos blockletInfos = new
BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
- carbonInputSplit.getNumberOfBlocklets());
- tableBlockInfoList.add(
- new TableBlockInfo(carbonInputSplit.getPath().toString(),
carbonInputSplit.getStart(),
- segmentId, carbonInputSplit.getLocations(),
carbonInputSplit.getLength(),
- blockletInfos, carbonInputSplit.getVersion()));
+ // if blockname and update block name is same then cmpare its time
stamp with
+ // tableSegmentUniqueIdentifiertimestamp if time stamp is greater
+ // then add as TableInfo object.
+ if (isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit,
updateDetails,
+ updateStatusManager, segmentId)) {
+ BlockletInfos blockletInfos = new
BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
+ carbonInputSplit.getNumberOfBlocklets());
+ tableBlockInfoList.add(
+ new TableBlockInfo(carbonInputSplit.getPath().toString(),
carbonInputSplit.getStart(),
+ tableSegmentUniqueIdentifier.getSegmentId(),
carbonInputSplit.getLocations(),
+ carbonInputSplit.getLength(), blockletInfos,
carbonInputSplit.getVersion(),
+ carbonInputSplit.getBlockStorageIdMap()));
+ }
}
return tableBlockInfoList;
}
+ private boolean isValidBlockBasedOnUpdateDetails(
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
CarbonInputSplit carbonInputSplit,
+ UpdateVO updateDetails, SegmentUpdateStatusManager
updateStatusManager, String segmentId) {
+ String taskID = null;
+ if (null != carbonInputSplit) {
+ if (!updateStatusManager.isBlockValid(segmentId,
carbonInputSplit.getPath().getName())) {
+ return false;
+ }
+
+ if (null == taskKeys) {
+ return true;
+ }
+
+ taskID =
CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+ String bucketNo =
+
CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+ SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+ new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+ String blockTimestamp = carbonInputSplit.getPath().getName()
+ .substring(carbonInputSplit.getPath().getName().lastIndexOf('-')
+ 1,
+ carbonInputSplit.getPath().getName().lastIndexOf('.'));
+ if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+ && Long.parseLong(blockTimestamp) <
updateDetails.getUpdateDeltaStartTimestamp())) {
+ if (!taskKeys.contains(taskBucketHolder)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
/**
* It returns index for each task file.
* @param job
* @param absoluteTableIdentifier
* @param segmentId
* @return
* @throws IOException
+ * @throws IndexBuilderException
*/
private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex>
getSegmentAbstractIndexs(
JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier,
String segmentId,
- CacheClient cacheClient) throws IOException {
+ CacheClient cacheClient, SegmentUpdateStatusManager
updateStatusManager) throws IOException {
Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex>
segmentIndexMap = null;
+ SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+ List<String> updatedTaskList = null;
+ boolean isSegmentUpdated = false;
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
new TableSegmentUniqueIdentifier(absoluteTableIdentifier,
segmentId);
- SegmentTaskIndexWrapper segmentTaskIndexWrapper =
(SegmentTaskIndexWrapper)
+ SegmentStatusManager statusManager = new
SegmentStatusManager(absoluteTableIdentifier);
+ segmentTaskIndexWrapper =
cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+ UpdateVO updateDetails =
updateStatusManager.getInvalidTimestampRange(segmentId);
if (null != segmentTaskIndexWrapper) {
segmentIndexMap =
segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+ taskKeys = segmentIndexMap.keySet();
+ isSegmentUpdated = true;
+ updatedTaskList =
+ statusManager.getUpdatedTasksDetailsForSegment(segmentId,
updateStatusManager);
+ }
}
-
// if segment tree is not loaded, load the segment tree
- if (segmentIndexMap == null) {
- // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
- List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job,
segmentId);
- // getFileStatusOfSegments(job, new int[]{ segmentId },
fileStatusList);
-
- Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new
HashMap<>();
- segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
-
- // get Btree blocks for given segment
-
tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
- segmentTaskIndexWrapper =
-
cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
- segmentIndexMap =
segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ if (segmentIndexMap == null || isSegmentUpdated) {
+ // if the segment is updated only the updated blocks TableInfo
instance has to be
+ // retrieved. the same will be filtered based on taskKeys , if the
task is same
+ // for the block then dont add it since already its btree is loaded.
+ List<TableBlockInfo> tableBlockInfoList =
+ getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys,
updatedTaskList,
+ updateStatusManager.getInvalidTimestampRange(segmentId),
updateStatusManager,
+ segmentId);
+ if (!tableBlockInfoList.isEmpty()) {
+ // getFileStatusOfSegments(job, new int[]{ segmentId },
fileStatusList);
+ Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new
HashMap<>();
+ segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+ // get Btree blocks for given segment
+
tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+ tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+ segmentTaskIndexWrapper =
+
cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+ segmentIndexMap =
segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ }
}
return segmentIndexMap;
}
+ public BlockMappingVO getBlockRowCount(JobContext job,
--- End diff --
Done
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastructure@apache.org or file a
JIRA ticket with INFRA.
---