Indhumathi27 commented on a change in pull request #3776:
URL: https://github.com/apache/carbondata/pull/3776#discussion_r434305132
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
+ String segmentFileName, String partitionPath) throws IOException {
+ Map<String, String> IndexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
Review comment:
```suggestion
Map<String, String> indexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
```
##########
File path:
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
##########
@@ -566,6 +567,18 @@ class StandardPartitionTableLoadingTestCase extends
QueryTest with BeforeAndAfte
assert(ex.getMessage().equalsIgnoreCase("Cannot use all columns for
partition columns;"))
}
+ test("test partition without merge index files for segment") {
+ sql("DROP TABLE IF EXISTS new_par")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
"false")
+ sql("CREATE TABLE new_par (a INT, b INT) PARTITIONED BY (country STRING)
STORED AS carbondata")
+ sql("INSERT INTO new_par PARTITION(country='India') SELECT 1,2")
+ sql("INSERT INTO new_par PARTITION(country='India') SELECT 3,4")
+ sql("INSERT INTO new_par PARTITION(country='China') SELECT 5,6")
+ sql("INSERT INTO new_par PARTITION(country='China') SELECT 7,8")
+ checkAnswer(sql("SELECT COUNT(*) FROM new_par"), Seq(Row(4)))
Review comment:
Please also add assertions that verify the expected index files are present for each segment.
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
Review comment:
Please add a method description explaining why this method is required.
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
+ String segmentFileName, String partitionPath) throws IOException {
+ Map<String, String> IndexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
+
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+ List<String> partitionList =
+ (List<String>)
ObjectSerializationUtil.convertStringToObject(partitionPath);
+ SegmentFileStore.SegmentFile finalSegmentFile = null;
+ boolean isRelativePath;
+ String path;
+ for (String partition : partitionList) {
+ isRelativePath = false;
+ path = partition;
+ if (path.startsWith(loadModel.getTablePath())) {
+ path = path.substring(loadModel.getTablePath().length());
+ isRelativePath = true;
+ }
+ SegmentFileStore.SegmentFile segmentFile = new
SegmentFileStore.SegmentFile();
+ SegmentFileStore.FolderDetails folderDetails = new
SegmentFileStore.FolderDetails();
+ Set<String> set = new HashSet<String>();
Review comment:
Please rename the `set` variable to something more descriptive.
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
+ String segmentFileName, String partitionPath) throws IOException {
+ Map<String, String> IndexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
+
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+ List<String> partitionList =
+ (List<String>)
ObjectSerializationUtil.convertStringToObject(partitionPath);
+ SegmentFileStore.SegmentFile finalSegmentFile = null;
+ boolean isRelativePath;
+ String path;
+ for (String partition : partitionList) {
+ isRelativePath = false;
+ path = partition;
+ if (path.startsWith(loadModel.getTablePath())) {
+ path = path.substring(loadModel.getTablePath().length());
+ isRelativePath = true;
+ }
+ SegmentFileStore.SegmentFile segmentFile = new
SegmentFileStore.SegmentFile();
+ SegmentFileStore.FolderDetails folderDetails = new
SegmentFileStore.FolderDetails();
+ Set<String> set = new HashSet<String>();
Review comment:
```suggestion
Set<String> set =
Collections.singleton(IndexFileNameMap.get(partition));
```
and remove next line
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
+ String segmentFileName, String partitionPath) throws IOException {
+ Map<String, String> IndexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
+
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+ List<String> partitionList =
+ (List<String>)
ObjectSerializationUtil.convertStringToObject(partitionPath);
+ SegmentFileStore.SegmentFile finalSegmentFile = null;
+ boolean isRelativePath;
+ String path;
Review comment:
Consider renaming this variable to `partitionLoc`.
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -266,7 +270,16 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
&& operationContext != null) {
uuid = operationContext.getProperty("uuid").toString();
}
+ String segmentFileName = SegmentFileStore.genSegmentFileName(
+ loadModel.getSegmentId(),
String.valueOf(loadModel.getFactTimeStamp()));
+ boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance()
Review comment:
The variable `isMergeIndexEnabled` is already defined in the caller method at line
192. Please move this code before the call to `commitJobForPartition` and reuse it.
##########
File path:
core/src/main/java/org/apache/carbondata/core/preagg/TimeSeriesUDF.java
##########
@@ -176,9 +176,9 @@ private void initialize() {
} catch (IllegalArgumentException ex) {
LOGGER.warn("Invalid value set for first of the week. Considering the
default value as: "
+ CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT);
- firstDayOfWeek = DaysOfWeekEnum.valueOf(CarbonProperties.getInstance()
-
.getProperty(CarbonCommonConstants.CARBON_TIMESERIES_FIRST_DAY_OF_WEEK_DEFAULT)
- .toUpperCase()).getOrdinal();
+ firstDayOfWeek =
Review comment:
Please explain in the PR description why this was modified.
##########
File path:
hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
##########
@@ -302,6 +317,54 @@ private void commitJobForPartition(JobContext context,
boolean overwriteSet,
commitJobFinal(context, loadModel, operationContext, carbonTable,
uniqueId);
}
+ @SuppressWarnings("unchecked")
+ private void writeSegmentWithoutMergeIndex(JobContext context,
CarbonLoadModel loadModel,
+ String segmentFileName, String partitionPath) throws IOException {
+ Map<String, String> IndexFileNameMap = (Map<String, String>)
ObjectSerializationUtil
+
.convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
+ List<String> partitionList =
+ (List<String>)
ObjectSerializationUtil.convertStringToObject(partitionPath);
+ SegmentFileStore.SegmentFile finalSegmentFile = null;
+ boolean isRelativePath;
+ String path;
+ for (String partition : partitionList) {
+ isRelativePath = false;
+ path = partition;
+ if (path.startsWith(loadModel.getTablePath())) {
+ path = path.substring(loadModel.getTablePath().length());
+ isRelativePath = true;
+ }
+ SegmentFileStore.SegmentFile segmentFile = new
SegmentFileStore.SegmentFile();
+ SegmentFileStore.FolderDetails folderDetails = new
SegmentFileStore.FolderDetails();
+ Set<String> set = new HashSet<String>();
+ set.add(IndexFileNameMap.get(partition));
+ folderDetails.setFiles(set);
+ List<String> partitions = new ArrayList<String>();
Review comment:
```suggestion
      List<String> partitions =
          Collections.singletonList(path.substring(path.indexOf("/") + 1));
```
and remove the next line
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]