GitHub user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2374#discussion_r196510278
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java ---
@@ -515,12 +574,73 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th
return split;
}
+ private List<CarbonInputSplit> convertToInputSplit4ExternalFormat(JobContext jobContext,
+ ExtendedBlocklet extendedBlocklet) throws IOException {
+ List<CarbonInputSplit> splits = new ArrayList<CarbonInputSplit>();
+ String factFilePath = extendedBlocklet.getFilePath();
+ Path path = new Path(factFilePath);
+ FileSystem fs = FileFactory.getFileSystem(path);
+ FileStatus fileStatus = fs.getFileStatus(path);
+ long length = fileStatus.getLen();
+ if (length != 0) {
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(path, 0, length);
+ long blkSize = fileStatus.getBlockSize();
+ long minSplitSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(jobContext));
+ long maxSplitSize = getMaxSplitSize(jobContext);
+ long splitSize = computeSplitSize(blkSize, minSplitSize, maxSplitSize);
+ long bytesRemaining = fileStatus.getLen();
+ while (((double) bytesRemaining) / splitSize > 1.1) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+ length - bytesRemaining,
+ splitSize, blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+ bytesRemaining -= splitSize;
+ }
+ if (bytesRemaining != 0) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path,
+ length - bytesRemaining,
+ bytesRemaining, blkLocations[blkIndex].getHosts(),
+ blkLocations[blkIndex].getCachedHosts(), FileFormat.EXTERNAL));
+ }
+ } else {
+ splits.add(new CarbonInputSplit(extendedBlocklet.getSegmentId(), path, 0, length,
+ new String[0], FileFormat.EXTERNAL));
+ }
+ return splits;
+ }
+
@Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
- return new CarbonRecordReader<T>(queryModel, readSupport);
+ if (inputSplit instanceof CarbonMultiBlockSplit
+ && ((CarbonMultiBlockSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+ return createRecordReaderForExternalFormat(queryModel, readSupport,
+ configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+ } else if (inputSplit instanceof CarbonInputSplit
+ && ((CarbonInputSplit) inputSplit).getFileFormat() == FileFormat.EXTERNAL) {
+ return createRecordReaderForExternalFormat(queryModel, readSupport,
+ configuration.get(CarbonCommonConstants.CARBON_EXTERNAL_FORMAT_CONF_KEY));
+ } else {
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+ }
+
+ @Since("1.4.1")
--- End diff --
I think the `@Since` annotation is not required for a private method.
---