This is an automated email from the ASF dual-hosted git repository.
liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1d316fc [CARBONDATA-3877] Reduce read tablestatus overhead during
inserting into partition table
1d316fc is described below
commit 1d316fc0ee3af9ecfda7208379051905b8460dde
Author: haomarch <[email protected]>
AuthorDate: Sun Jun 28 19:21:19 2020 +0800
[CARBONDATA-3877] Reduce read tablestatus overhead during inserting into
partition table
Why is this PR needed?
Currently, inserting into a partition table triggers many tablestatus read
operations. When the table status file is stored in an object store, reading
it may fail (with an IOException or JsonSyntaxException) while the file is
being modified, which leads to a high failure rate for concurrent inserts
into a partition table.
What changes were proposed in this PR?
(1) Three pieces of code were removed: the sizeInBytes calculation, 'clean
segments', and 'deleteLoadsAndUpdateMetadata'.
The sizeInBytes calculation is not needed in the insert flow, and 'clean
segments' and 'deleteLoadsAndUpdateMetadata' are already covered by the
'clean files' command, so they can be dropped from the insert flow.
(2) Reduce duplicate tablestatus reads and tighten the conditions under which
tablestatus is read, as sketched below.
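A minimal sketch of idea (2), not the exact patch; it assumes the surrounding
SparkCarbonTableFormat.scala context shown in the diff below, where 'model' is
the CarbonLoadModel and 'loadEntry' is the current LoadMetadataDetails:

    // Sketch: re-read tablestatus only when the in-memory load details
    // do not already contain the entry for the current load.
    val alreadyPresent = model.getLoadMetadataDetails.asScala
      .exists(_.getLoadName == loadEntry.getLoadName)
    if (!alreadyPresent) {
      val details = SegmentStatusManager.readLoadMetadata(
        CarbonTablePath.getMetadataPath(model.getTablePath))
      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
      list.add(loadEntry)
      model.setLoadMetadataDetails(list)
    }

Skipping the read when the entry is already present avoids repeated tablestatus
reads, which are the costly and failure-prone part when the file lives in an
object store.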
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #3800
---
.../carbondata/core/metadata/SegmentFileStore.java | 17 +++++++++++++++++
.../hadoop/api/CarbonOutputCommitter.java | 11 +++++++----
.../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 7 ++++---
.../management/CarbonInsertIntoCommand.scala | 2 --
.../command/management/CommonLoadUtils.scala | 5 +----
.../datasources/SparkCarbonTableFormat.scala | 21 ++++++++++++++++-----
.../SILoadEventListenerForFailedSegments.scala | 3 ++-
.../loading/TableProcessingOperations.java | 3 +--
8 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 335e0f5..2f274c3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1149,6 +1149,23 @@ public class SegmentFileStore {
* @throws IOException
*/
public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath,
+ String segmentFilePath, String loadStartTime) throws IOException {
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFilePath);
+ List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();
+ for (PartitionSpec spec : partitionSpecs) {
+ spec.setUuid(segmentId + "_" + loadStartTime);
+ }
+ return partitionSpecs;
+ }
+
+ /**
+ * Get the partition specs of the segment
+ * @param segmentId
+ * @param tablePath
+ * @return
+ * @throws IOException
+ */
+ public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath,
LoadMetadataDetails[] details)
throws IOException {
LoadMetadataDetails segEntry = null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 02c8d4c..4b8fc43 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -232,10 +232,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
String segmentsToBeDeleted =
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
List<Segment> segmentDeleteList =
Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
- Set<Segment> segmentSet = new HashSet<>(
- new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
- context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
- .getValidSegments());
+ Set<Segment> segmentSet = new HashSet<>();
+ if (updateTime != null || uniqueId != null) {
+ segmentSet = new HashSet<>(
+ new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+ context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
+ .getValidSegments());
+ }
if (updateTime != null) {
CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
segmentDeleteList);
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index ebac5e4..695ee27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -229,14 +229,15 @@ class CarbonMergeFilesRDD(
override def internalGetPartitions: Array[Partition] = {
if (isHivePartitionedTable) {
- val metadataDetails = SegmentStatusManager
- .readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
// in case of partition table make rdd partitions per partition of the carbon table
val partitionPaths: java.util.Map[String, java.util.List[String]] = new java.util.HashMap()
if (partitionInfo == null || partitionInfo.isEmpty) {
segments.foreach(segment => {
+ val loadStartTime = segmentFileNameToSegmentIdMap.get(segment)
+ val segmentFileName = SegmentFileStore.genSegmentFileName(
+ segment, loadStartTime) + CarbonTablePath.SEGMENT_EXT
val partitionSpecs = SegmentFileStore
- .getPartitionSpecs(segment, carbonTable.getTablePath, metadataDetails)
+ .getPartitionSpecs(segment, carbonTable.getTablePath, segmentFileName, loadStartTime)
.asScala.map(_.getLocation.toString)
partitionPaths.put(segment, partitionSpecs.asJava)
})
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 8dfad76..8c14917 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -202,8 +202,6 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
updateModel = None,
operationContext = operationContext)
- // Clean up the old invalid segment data before creating a new entry for new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
// add the start entry for the new load in the table status file
if (!table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 20d29d8..71649b8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -126,7 +126,6 @@ object CommonLoadUtils {
TableIdentifier(tableName, databaseNameOp))).collect {
case l: LogicalRelation => l
}.head
- sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
finalPartition = getCompletePartitionValues(partition, table)
}
(sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition)
@@ -843,8 +842,6 @@ object CommonLoadUtils {
def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val catalogTable: CatalogTable = loadParams.logicalPartitionRelation.catalogTable.get
- // Clean up the already dropped partitioned data
- SegmentFileStore.cleanSegments(table, null, false)
CarbonUtils.threadSet("partition.operationcontext",
loadParams.operationContext)
val attributes = if (loadParams.scanResultRDD.isDefined) {
// take the already re-arranged attributes
@@ -1102,7 +1099,7 @@ object CommonLoadUtils {
val specs =
SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
loadParams.carbonLoadModel.getTablePath,
- SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath))
+ loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
if (specs != null) {
specs.asScala.map { spec =>
Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index f1f0b80..225daf5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -159,13 +159,16 @@ with Serializable {
if (currEntry != null) {
val loadEntry =
ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
- val details =
- SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
model.setSegmentId(loadEntry.getLoadName)
model.setFactTimeStamp(loadEntry.getLoadStartTime)
- val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
- list.add(loadEntry)
- model.setLoadMetadataDetails(list)
+ if (!isLoadDetailsContainTheCurrentEntry(
+ model.getLoadMetadataDetails.asScala.toArray, loadEntry)) {
+ val details =
+ SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+ val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+ list.add(loadEntry)
+ model.setLoadMetadataDetails(list)
+ }
}
// Set the update timestamp if user sets in case of update query. It needs to be updated
// in load status update time
@@ -224,6 +227,14 @@ with Serializable {
}
}
override def equals(other: Any): Boolean =
other.isInstanceOf[SparkCarbonTableFormat]
+
+ private def isLoadDetailsContainTheCurrentEntry(
+ loadDetails: Array[LoadMetadataDetails],
+ currentEntry: LoadMetadataDetails): Boolean = {
+ (loadDetails.length - 1 to 0 by -1).exists { index =>
+ loadDetails(index).getLoadName.equals(currentEntry.getLoadName)
+ }
+ }
}
case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String,
isAppend: Boolean)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2e6a441..2071385 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -64,7 +64,6 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
.lookupRelation(Some(carbonLoadModel.getDatabaseName),
carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
val indexMetadata = carbonTable.getIndexMetadata
- val mainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val secondaryIndexProvider = IndexType.SI.getIndexProviderName
if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
@@ -72,6 +71,8 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
.get(secondaryIndexProvider).keySet().asScala
// if there are no index tables for a given fact table do not perform any action
if (indexTables.nonEmpty) {
+ val mainTableDetails =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
indexTables.foreach {
indexTableName =>
val isLoadSIForFailedSegments =
sparkSession.sessionState.catalog
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index 358295d..f0f14d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -54,11 +54,10 @@ public class TableProcessingOperations {
public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
final boolean isCompactionFlow) throws IOException {
String metaDataLocation = carbonTable.getMetadataPath();
- final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-
//delete folder which metadata no exist in tablestatus
String partitionPath =
CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
if (FileFactory.isFileExist(partitionPath)) {
+ final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
@Override