This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 4d4826e [HOTFIX] Improve select query after Update/Delete operation.
4d4826e is described below
commit 4d4826e08f7db158a6d39ebc1a2376c3cca50d30
Author: ravipesala <[email protected]>
AuthorDate: Mon Aug 12 15:30:49 2019 +0530
[HOTFIX] Improve select query after Update/Delete operation.
This PR adds a mapping from each segment path to its corresponding delta file paths,
so that these files do not have to be listed for every query
This closes #3355
---
.../carbondata/core/mutate/CarbonUpdateUtil.java | 9 ++
.../core/mutate/SegmentUpdateDetails.java | 19 +++++
.../statusmanager/SegmentUpdateStatusManager.java | 96 +++++++++++++---------
3 files changed, 87 insertions(+), 37 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 736def6..340de3e 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -145,6 +145,15 @@ public class CarbonUpdateUtil {
blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
+ // If the start and end time is different then the delta is there
in multiple files so
+ // add them to the list to get the delta files easily with out
listing.
+ if (!blockDetail.getDeleteDeltaStartTimestamp()
+ .equals(blockDetail.getDeleteDeltaEndTimestamp())) {
+
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaStartTimestamp());
+
blockDetail.addDeltaFileStamp(blockDetail.getDeleteDeltaEndTimestamp());
+ } else {
+ blockDetail.setDeltaFileStamps(null);
+ }
} else {
// add the new details to the list.
oldList.add(newBlockEntry);
diff --git
a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
index a6fbb4f..645851d 100644
---
a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
+++
b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
@@ -18,6 +18,8 @@
package org.apache.carbondata.core.mutate;
import java.io.Serializable;
+import java.util.LinkedHashSet;
+import java.util.Set;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -35,6 +37,8 @@ public class SegmentUpdateDetails implements Serializable {
private SegmentStatus segmentStatus;
private String deleteDeltaEndTimestamp = "";
private String deleteDeltaStartTimestamp = "";
+ // Set of delta timestamps to avoid listing the filesystem.
+ private Set<String> deltaFileStamps;
private String actualBlockName;
private String deletedRowsInBlock = "0";
@@ -84,6 +88,21 @@ public class SegmentUpdateDetails implements Serializable {
return this.segmentStatus;
}
+ public Set<String> getDeltaFileStamps() {
+ return deltaFileStamps;
+ }
+
+ public void addDeltaFileStamp(String deltaFileStamp) {
+ if (deltaFileStamps == null) {
+ deltaFileStamps = new LinkedHashSet<>();
+ }
+ deltaFileStamps.add(deltaFileStamp);
+ }
+
+ public void setDeltaFileStamps(Set<String> deltaFileStamps) {
+ this.deltaFileStamps = deltaFileStamps;
+ }
+
@Override public int hashCode() {
final int prime = 31;
int result = segmentName.hashCode();
diff --git
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 601abcd..c329a0a 100644
---
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
/**
@@ -71,12 +72,15 @@ public class SegmentUpdateStatusManager {
private LoadMetadataDetails[] segmentDetails;
private SegmentUpdateDetails[] updateDetails;
private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
- private boolean isStandardTable;
+ /**
+ * It contains the mapping of segment path and corresponding delete delta
file paths,
+ * avoiding listing these files for every query
+ */
+ private Map<String, List<String>> segmentDeleteDeltaListMap = new
HashMap<>();
public SegmentUpdateStatusManager(CarbonTable table,
LoadMetadataDetails[] segmentDetails) {
this.identifier = table.getAbsoluteTableIdentifier();
- this.isStandardTable = CarbonUtil.isStandardCarbonTable(table);
// current it is used only for read function scenarios, as file update
always requires to work
// on latest file status.
this.segmentDetails = segmentDetails;
@@ -86,7 +90,6 @@ public class SegmentUpdateStatusManager {
public SegmentUpdateStatusManager(CarbonTable table) {
this.identifier = table.getAbsoluteTableIdentifier();
- this.isStandardTable = CarbonUtil.isStandardCarbonTable(table);
// current it is used only for read function scenarios, as file update
always requires to work
// on latest file status.
if (!table.getTableInfo().isTransactionalTable()) {
@@ -251,23 +254,23 @@ public class SegmentUpdateStatusManager {
* @throws Exception
*/
public String[] getDeleteDeltaFilePath(String blockFilePath, String
segmentId) throws Exception {
- CarbonFile file = FileFactory.getCarbonFile(blockFilePath);
- return getDeltaFiles(file, segmentId)
+ return getDeltaFiles(blockFilePath, segmentId,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
.toArray(new String[0]);
}
/**
* Returns all delta file paths of specified block
*/
- private List<String> getDeltaFiles(CarbonFile file, String segmentId) throws
Exception {
- String completeBlockName = file.getName();
+ private List<String> getDeltaFiles(String blockPath, String segment, String
extension)
+ throws Exception {
+ Path path = new Path(blockPath);
+ String completeBlockName = path.getName();
String blockNameWithoutExtn =
completeBlockName.substring(0, completeBlockName.lastIndexOf('.'));
//blockName without timestamp
final String blockNameFromTuple =
blockNameWithoutExtn.substring(0,
blockNameWithoutExtn.lastIndexOf("-"));
- return getDeltaFiles(file, blockNameFromTuple,
CarbonCommonConstants.DELETE_DELTA_FILE_EXT,
- segmentId);
+ return getDeltaFiles(path.getParent().toString(), blockNameFromTuple,
extension, segment);
}
/**
@@ -310,7 +313,7 @@ public class SegmentUpdateStatusManager {
* @param segment the segment name
* @return the list of delete file
*/
- private List<String> getDeltaFiles(CarbonFile blockDir, final String
blockNameFromTuple,
+ private List<String> getDeltaFiles(String blockDir, final String
blockNameFromTuple,
final String extension, String segment) throws IOException {
List<String> deleteFileList = new ArrayList<>();
for (SegmentUpdateDetails block : updateDetails) {
@@ -323,50 +326,69 @@ public class SegmentUpdateStatusManager {
return deleteFileList;
}
final long deltaEndTimeStamp = getEndTimeOfDeltaFile(extension, block);
-
- // final long deltaEndTimeStamp = block.getDeleteDeltaEndTimeAsLong();
- // final long deltaStartTimestamp =
block.getDeleteDeltaStartTimeAsLong();
- return getFilePaths(blockDir, blockNameFromTuple, extension,
deleteFileList,
- deltaStartTimestamp, deltaEndTimeStamp);
+ // If start and end time is same then it has only one delta file so
construct the file
+ // directly with available information with out listing
+ if
(block.getDeleteDeltaStartTimestamp().equals(block.getDeleteDeltaEndTimestamp()))
{
+ deleteFileList.add(
+ new
StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
+ .append(block.getBlockName()).append("-")
+
.append(block.getDeleteDeltaStartTimestamp()).append(extension).toString());
+ // If deltatimestamps list has data then it has multiple delta file
so construct the file
+ // directly with list of deltas with out listing
+ } else if (block.getDeltaFileStamps() != null &&
block.getDeltaFileStamps().size() > 0) {
+ for (String delta : block.getDeltaFileStamps()) {
+ deleteFileList.add(
+ new
StringBuilder(blockDir).append(CarbonCommonConstants.FILE_SEPARATOR)
+
.append(block.getBlockName()).append("-").append(delta).append(extension)
+ .toString());
+ }
+ } else {
+ // It is for backward compatability.It lists the files.
+ return getFilePaths(blockDir, blockNameFromTuple, extension,
deleteFileList,
+ deltaStartTimestamp, deltaEndTimeStamp);
+ }
}
}
return deleteFileList;
}
- private List<String> getFilePaths(CarbonFile blockDir, final String
blockNameFromTuple,
+ private List<String> getFilePaths(String blockDir, final String
blockNameFromTuple,
final String extension, List<String> deleteFileList, final long
deltaStartTimestamp,
final long deltaEndTimeStamp) throws IOException {
- if (null != blockDir.getParentFile()) {
- CarbonFile[] files = blockDir.getParentFile().listFiles(new
CarbonFileFilter() {
-
- @Override
- public boolean accept(CarbonFile pathName) {
+ List<String> deltaList = segmentDeleteDeltaListMap.get(blockDir);
+ if (deltaList == null) {
+ CarbonFile[] files = FileFactory.getCarbonFile(blockDir).listFiles(new
CarbonFileFilter() {
+ @Override public boolean accept(CarbonFile pathName) {
String fileName = pathName.getName();
if (fileName.endsWith(extension) && pathName.getSize() > 0) {
- String firstPart = fileName.substring(0,
fileName.lastIndexOf('.'));
- String blockName =
- firstPart.substring(0,
firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
- long timestamp = Long.parseLong(firstPart
- .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN)
+ 1,
- firstPart.length()));
- if (blockNameFromTuple.equals(blockName) && (
- (Long.compare(timestamp, deltaEndTimeStamp) <= 0) && (
- Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
- return true;
- }
+ return true;
}
return false;
}
});
-
+ deltaList = new ArrayList<>(files.length);
for (CarbonFile cfile : files) {
+ deltaList.add(cfile.getCanonicalPath());
+ }
+ segmentDeleteDeltaListMap.put(blockDir, deltaList);
+ }
+ for (String deltaFile : deltaList) {
+ String deltaFilePathName = new Path(deltaFile).getName();
+ String firstPart = deltaFilePathName.substring(0,
deltaFilePathName.lastIndexOf('.'));
+ String blockName =
+ firstPart.substring(0,
firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN));
+ long timestamp = Long.parseLong(firstPart
+ .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
firstPart.length()));
+ // It compares whether this delta file belongs to this block or not. And
also checks that
+ // corresponding delta file is valid or not by considering its load
start and end time with
+ // the file timestamp.
+ if (blockNameFromTuple.equals(blockName) && ((Long.compare(timestamp,
deltaEndTimeStamp) <= 0)
+ && (Long.compare(timestamp, deltaStartTimestamp) >= 0))) {
if (null == deleteFileList) {
- deleteFileList = new ArrayList<String>(files.length);
+ deleteFileList = new ArrayList<String>();
}
- deleteFileList.add(cfile.getCanonicalPath());
+ deleteFileList.add(deltaFile);
}
- } else {
- throw new IOException("Parent file could not found");
}
return deleteFileList;
}