This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 6ab3647 [CARBONDATA-4146] Query fails and the error message "unable to
get file status" is displayed; the query is normal again after the "drop
metacache on table" command is executed
6ab3647 is described below
commit 6ab364791b814f099de4473a5de2f73fa769de26
Author: liuhe0702 <[email protected]>
AuthorDate: Wed Mar 10 20:54:25 2021 +0800
[CARBONDATA-4146] Query fails and the error message "unable to get file
status" is displayed; the query is normal again after the "drop metacache on
table" command is executed
Why is this PR needed?
During compaction, the status of the new segment is set to SUCCESS before the
index files are merged. After the index files are merged, the original
carbonindex files are deleted. As a result, a concurrent query task cannot
find the carbonindex files it has already cached.
What changes were proposed in this PR?
Set the status of the new segment to SUCCESS only after the index files have
been merged (a simplified sketch of this ordering follows below).
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
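For illustration, the minimal Scala sketch below shows the reordered flow this
patch establishes. It is a self-contained illustration only: every name in it
(CompactionOrderingSketch, mergeIndexFiles, writeSegmentFile, SegmentStatus)
is a placeholder, not an actual CarbonData API.

// Minimal runnable sketch of the fixed ordering; all names are placeholders.
object CompactionOrderingSketch {
  sealed trait SegmentStatus
  case object InsertInProgress extends SegmentStatus
  case object Success extends SegmentStatus

  // stand-ins for the real index-merge and metadata-write steps
  def mergeIndexFiles(): Unit = println("merge carbonindex files")
  def writeSegmentFile(): Unit = println("write merged segment metadata")

  def main(args: Array[String]): Unit = {
    var status: SegmentStatus = InsertInProgress
    // Before this fix, status became Success *before* the index merge, so a
    // concurrent query could cache carbonindex files the merge then deletes.
    mergeIndexFiles()   // 1. merge the index files first
    writeSegmentFile()  // 2. persist the segment metadata
    status = Success    // 3. only now make the new segment visible to queries
    println(s"segment status: $status")
  }
}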
This closes #4104
---
 .../spark/rdd/CarbonTableCompactor.scala           | 61 ++++++++++++++--------
 .../org/apache/spark/util/MergeIndexUtil.scala     | 30 ++++++++++-
 .../merger/CompactionResultSortProcessor.java      |  7 ++-
 .../merger/RowResultMergerProcessor.java           |  8 ++-
 4 files changed, 81 insertions(+), 25 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 68edd09..3a4df12 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -111,6 +111,9 @@ class CarbonTableCompactor(
       }
     }
     try {
+      // needs to be cleared for multiple compactions;
+      // it should only contain the segmentIds which have to be compacted.
+      compactedSegments.clear()
       scanSegmentsAndSubmitJob(validSegments, compactedSegments, compactedLoad)
     } catch {
       case e: Exception =>
@@ -333,26 +336,46 @@ class CarbonTableCompactor(
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
       var segmentFileName: String = null
+
+      val isMergeIndex = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+        CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
+
+      if (compactionType != CompactionType.IUD_DELETE_DELTA && isMergeIndex) {
+        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
+      }
+
       if (carbonTable.isHivePartitionTable) {
-        val readPath =
-          CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
-          CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
-        // Merge all partition files into a single file.
-        segmentFileName =
-          mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
-        val segmentFile = SegmentFileStore
-          .mergeSegmentFiles(readPath,
-            segmentFileName,
-            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
-        if (segmentFile != null) {
-          SegmentFileStore
-            .moveFromTempFolder(segmentFile,
-              carbonLoadModel.getFactTimeStamp + ".tmp",
-              carbonLoadModel.getTablePath)
+        if (isMergeIndex) {
+          val segmentTmpFileName = carbonLoadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+          segmentFileName = mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + segmentTmpFileName
+          val segmentTmpFile = FileFactory.getCarbonFile(
+            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentTmpFileName))
+          if (!segmentTmpFile.renameForce(
+            CarbonTablePath.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))) {
+            throw new Exception(s"Rename segment file from ${segmentTmpFileName} " +
+              s"to ${segmentFileName} failed.")
+          }
+        } else {
+          val readPath =
+            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+            CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
+          // Merge all partition files into a single file.
+          segmentFileName =
+            mergedLoadNumber + CarbonCommonConstants.UNDERSCORE + carbonLoadModel.getFactTimeStamp
+          val mergedSegmentFile = SegmentFileStore
+            .mergeSegmentFiles(readPath,
+              segmentFileName,
+              CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
+          if (mergedSegmentFile != null) {
+            SegmentFileStore
+              .moveFromTempFolder(mergedSegmentFile,
+                carbonLoadModel.getFactTimeStamp + ".tmp",
+                carbonLoadModel.getTablePath)
+          }
+          segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
         }
-        segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
       } else {
         // Get the segment files for each updated segment in case of IUD compaction
         val segmentMetaDataInfo =
           CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
@@ -399,10 +422,6 @@ class CarbonTableCompactor(
           s"${ carbonLoadModel.getTableName }")
     }
 
-    if (compactionType != CompactionType.IUD_DELETE_DELTA) {
-      MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
-    }
-
     val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
       carbonTable,
       carbonMergerMapping,
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
index 6b0bbb6..e6a2ad3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 
 object MergeIndexUtil {
@@ -43,11 +44,36 @@ object MergeIndexUtil {
     val sparkSession = compactionCallableModel.sqlContext.sparkSession
     if (!carbonTable.isStreamingSink) {
       val mergedSegmentIds = getMergedSegmentIds(mergedLoads)
+      val carbonLoadModel = compactionCallableModel.carbonLoadModel
+      val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, String]()
+      var partitionInfo: java.util.List[String] = null
+      var tempFolderPath: String = null
+      var currPartitionSpec: Option[String] = None
+
+      mergedSegmentIds.foreach { mergedSegmentId =>
+        segmentFileNameMap.put(mergedSegmentId, String.valueOf(carbonLoadModel.getFactTimeStamp))
+      }
+
+      if (carbonTable.isHivePartitionTable) {
+        tempFolderPath = carbonLoadModel.getFactTimeStamp.toString + ".tmp"
+        if (compactionCallableModel.compactedPartitions != null) {
+          val partitionSpecs = compactionCallableModel.compactedPartitions.getOrElse(List.empty)
+          currPartitionSpec = Some(ObjectSerializationUtil.convertObjectToString(
+            new util.ArrayList(partitionSpecs.asJava)))
+          partitionInfo = partitionSpecs.map(_.getLocation.toString).asJava
+        }
+      }
+
       CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
         mergedSegmentIds,
-        SegmentStatusManager.mapSegmentToStartTime(carbonTable),
+        segmentFileNameMap,
         carbonTable.getTablePath,
-        carbonTable, false)
+        carbonTable,
+        false,
+        partitionInfo,
+        tempFolderPath,
+        false,
+        currPartitionSpec)
     }
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 3f27eee..2efd56b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -190,7 +190,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       LOGGER.error(e.getLocalizedMessage(), e);
       throw e;
     } finally {
-      if (partitionSpec != null) {
+      boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+      // when mergeIndex is true, the segment file need not be written here;
+      // it will be written while merging the index files
+      if (partitionSpec != null && !isMergeIndex) {
         try {
           SegmentFileStore
               .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index c044257..6d8cae7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.exception.SliceMergerException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.sort.CarbonPriorityQueue;
@@ -185,7 +186,12 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
       if (isDataPresent) {
         this.dataHandler.closeHandler();
       }
-      if (partitionSpec != null) {
+      boolean isMergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+      // when mergeIndex is true, the segment file need not be written here;
+      // it will be written while merging the index files
+      if (partitionSpec != null && !isMergeIndex) {
         SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
             partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
             partitionSpec.getPartitions());