[CARBONDATA-1667] Remove direct-load-related code. This closes #1465
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f689719 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f689719 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f689719 Branch: refs/heads/pre-aggregate Commit: 6f6897191819994da2066584721c462f03184cc6 Parents: f812e41 Author: Jacky Li <[email protected]> Authored: Sat Nov 4 18:53:46 2017 +0800 Committer: QiangCai <[email protected]> Committed: Mon Nov 6 12:21:32 2017 +0800 ---------------------------------------------------------------------- .../presto/util/CarbonDataStoreCreator.scala | 1 - .../spark/rdd/NewCarbonDataLoadRDD.scala | 35 ++++---------- .../carbondata/spark/util/DataLoadingUtil.scala | 1 - .../spark/rdd/CarbonDataRDDFactory.scala | 49 ++++++-------------- .../loading/model/CarbonLoadModel.java | 11 ----- 5 files changed, 25 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index acee71b..09cddfe 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -118,7 +118,6 @@ object CarbonDataStoreCreator { loadModel.setBadRecordsAction( TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + "force") - loadModel.setDirectLoad(true) loadModel.setIsEmptyDataBadRecord( DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 49b708c..1d6ad70 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -196,13 +196,7 @@ class NewCarbonDataLoadRDD[K, V]( if (isTableSplitPartition) { // for table split partition var splits: Array[TableSplit] = null - - if (carbonLoadModel.isDirectLoad) { - splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) - } else { - splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, null) - } + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) splits.zipWithIndex.map { s => // filter the same partition unique id, because only one will match, so get 0 element @@ -289,15 +283,10 @@ class NewCarbonDataLoadRDD[K, V]( val split = theSplit.asInstanceOf[CarbonTableSplitPartition] logInfo("Input split: " + split.serializableHadoopSplit.value) carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) - if (carbonLoadModel.isDirectLoad) { - model = carbonLoadModel.getCopyWithPartition( - split.serializableHadoopSplit.value.getPartition.getUniqueID, - split.serializableHadoopSplit.value.getPartition.getFilesPath, - carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) - } else { - model = carbonLoadModel.getCopyWithPartition( - split.serializableHadoopSplit.value.getPartition.getUniqueID) - } + model = carbonLoadModel.getCopyWithPartition( + 
split.serializableHadoopSplit.value.getPartition.getUniqueID, + split.serializableHadoopSplit.value.getPartition.getFilesPath, + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID StandardLogService.setThreadName(StandardLogService .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName) @@ -320,15 +309,11 @@ class NewCarbonDataLoadRDD[K, V]( split.serializableHadoopSplit, split.nodeBlocksDetail.length) val blocksID = gernerateBlocksID carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) - if (carbonLoadModel.isDirectLoad) { - val filelist: java.util.List[String] = new java.util.ArrayList[String]( - CarbonCommonConstants.CONSTANT_SIZE_TEN) - CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",") - model = carbonLoadModel.getCopyWithPartition(partitionID, filelist, - carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) - } else { - model = carbonLoadModel.getCopyWithPartition(partitionID) - } + val filelist: java.util.List[String] = new java.util.ArrayList[String]( + CarbonCommonConstants.CONSTANT_SIZE_TEN) + CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",") + model = carbonLoadModel.getCopyWithPartition(partitionID, filelist, + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) StandardLogService.setThreadName(StandardLogService .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName) , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "") http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala index 445fdbb..5a24d7d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala @@ -285,7 +285,6 @@ object DataLoadingUtil { carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)) carbonLoadModel.setCsvHeader(fileHeader) carbonLoadModel.setColDictFilePath(column_dict) - carbonLoadModel.setDirectLoad(true) carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel)) val validatedMaxColumns = CommonUtil.validateMaxColumns( http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 628d444..cfd8cff 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -653,40 +653,21 @@ object CarbonDataRDDFactory { * 3) output Array[(partitionID,Array[BlockDetails])] to blocksGroupBy */ var splits = Array[TableSplit]() - if (carbonLoadModel.isDirectLoad) { - // get all table Splits, this part means files were divide to different partitions - splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) - // get all partition blocks from file list - blocksGroupBy = splits.map { - split => - val pathBuilder = new StringBuilder() - for (path <- split.getPartition.getFilesPath.asScala) { - pathBuilder.append(path).append(",") - } - if (pathBuilder.nonEmpty) { - 
pathBuilder.substring(0, pathBuilder.size - 1) - } - (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(), - sqlContext.sparkContext - )) - } - } else { - // get all table Splits,when come to this, means data have been partition - splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, null) - // get all partition blocks from factFilePath/uniqueID/ - blocksGroupBy = splits.map { - split => - val pathBuilder = new StringBuilder() - pathBuilder.append(carbonLoadModel.getFactFilePath) - if (!carbonLoadModel.getFactFilePath.endsWith("/") - && !carbonLoadModel.getFactFilePath.endsWith("\\")) { - pathBuilder.append("/") - } - pathBuilder.append(split.getPartition.getUniqueID).append("/") - (split.getPartition.getUniqueID, - SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext)) - } + // get all table Splits, this part means files were divide to different partitions + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) + // get all partition blocks from file list + blocksGroupBy = splits.map { + split => + val pathBuilder = new StringBuilder() + for (path <- split.getPartition.getFilesPath.asScala) { + pathBuilder.append(path).append(",") + } + if (pathBuilder.nonEmpty) { + pathBuilder.substring(0, pathBuilder.size - 1) + } + (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(), + sqlContext.sparkContext + )) } } else { /* http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f689719/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java index 39ee270..6a156a6 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java @@ -53,7 +53,6 @@ public class CarbonLoadModel implements Serializable { private String complexDelimiterLevel1; private String complexDelimiterLevel2; - private boolean isDirectLoad; private List<LoadMetadataDetails> loadMetadataDetails; private transient SegmentUpdateStatusManager segmentUpdateStatusManager; @@ -206,14 +205,6 @@ public class CarbonLoadModel implements Serializable { this.complexDelimiterLevel2 = complexDelimiterLevel2; } - public boolean isDirectLoad() { - return isDirectLoad; - } - - public void setDirectLoad(boolean isDirectLoad) { - this.isDirectLoad = isDirectLoad; - } - public String getAllDictPath() { return allDictPath; } @@ -383,7 +374,6 @@ public class CarbonLoadModel implements Serializable { copy.isRetentionRequest = isRetentionRequest; copy.csvHeader = csvHeader; copy.csvHeaderColumns = csvHeaderColumns; - copy.isDirectLoad = isDirectLoad; copy.csvDelimiter = csvDelimiter; copy.complexDelimiterLevel1 = complexDelimiterLevel1; copy.complexDelimiterLevel2 = complexDelimiterLevel2; @@ -434,7 +424,6 @@ public class CarbonLoadModel implements Serializable { copyObj.carbonDataLoadSchema = carbonDataLoadSchema; copyObj.csvHeader = header; copyObj.csvHeaderColumns = csvHeaderColumns; - copyObj.isDirectLoad = true; copyObj.csvDelimiter = delimiter; copyObj.complexDelimiterLevel1 = complexDelimiterLevel1; copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
