This is an automated email from the ASF dual-hosted git repository.
indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new f5118f9 [CARBONDATA-4027] Fix the wrong modifiedtime of loading files
in insert stage
f5118f9 is described below
commit f5118f95362b601776bcf728769a62f60fce8d16
Author: haomarch <[email protected]>
AuthorDate: Mon Oct 12 09:58:58 2020 +0800
[CARBONDATA-4027] Fix the wrong modifiedtime of loading files in insert
stage
Why is this PR needed?
ISSUE1: In the insert stage flow, an empty file with the suffix '.loading'
marks a stage as 'in processing'. We update the modified time of the
'.loading' file to record when the insert stage started; this start time is
used to calculate the timeout, which helps with retry and recovery.
Previously, we used the setLastModifiedTime function to update the modified
time, which has a serious bug: for S3 files, setLastModifiedTime does not
take effect, leading to an incorrect insert stage start time for the
'.loading' file.
ISSUE2: Currently, insert stage into a non-partition table does not merge
index files, which heavily degrades query performance.
What changes were proposed in this PR?
1. Update the modified time of '.loading' files by recreating them (delete
the existing file, then create it again) instead of calling
setLastModifiedTime.
2. Trigger index file merging after insert stage into a non-partition table.
Does this PR introduce any user interface change?
No
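Is any new testcase added?
No new test case; the existing TestCarbonWriter test is extended to assert
that index files are merged after insert stage (exactly one merge index file
in segment 0).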
This closes #3977
---
.../core/constants/CarbonCommonConstants.java | 5 -----
.../org/apache/carbondata/core/view/MVProvider.java | 12 +++++-------
.../org/apache/carbon/flink/TestCarbonWriter.scala | 15 +++++++++++++++
.../apache/carbondata/hive/util/HiveCarbonUtil.java | 1 -
.../spark/sql/events/MergeIndexEventListener.scala | 5 -----
.../management/CarbonInsertFromStageCommand.scala | 21 ++++++++++-----------
6 files changed, 30 insertions(+), 29 deletions(-)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 15139ec..beaed60 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2523,11 +2523,6 @@ public final class CarbonCommonConstants {
public static final String INDEX_STATUS = "index_status";
/**
- * property which defines the insert stage flow
- */
- public static final String IS_INSERT_STAGE = "is_insert_stage";
-
- /**
* index server temp folder aging period
*/
@CarbonProperty
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index 1259f91..e21127f 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -545,14 +545,12 @@ public class MVProvider {
FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
- if (!FileFactory.isFileExist(this.schemaIndexFilePath)) {
- FileFactory.createNewFile(
- this.schemaIndexFilePath,
- new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ CarbonFile schemaIndexFile =
FileFactory.getCarbonFile(this.schemaIndexFilePath);
+ if (schemaIndexFile.exists()) {
+ schemaIndexFile.delete();
}
- long lastModifiedTime = System.currentTimeMillis();
-
FileFactory.getCarbonFile(this.schemaIndexFilePath).setLastModifiedTime(lastModifiedTime);
- this.lastModifiedTime = lastModifiedTime;
+ schemaIndexFile.createNewFile(new FsPermission(FsAction.ALL,
FsAction.ALL, FsAction.ALL));
+ this.lastModifiedTime = schemaIndexFile.getLastModifiedTime();
}
}
diff --git
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index aa6d440..4ea6896 100644
---
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -75,6 +75,7 @@ class TestCarbonWriter extends QueryTest with
BeforeAndAfterAll{
checkAnswer(sql(s"select count(intField) from $tableName where intField
>= 900"),
Seq(Row(100)))
checkIfStageFilesAreDeleted(tablePath)
+ assert(getMergeIndexFileCount(tableName, "0") == 1)
}
}
@@ -445,4 +446,18 @@ class TestCarbonWriter extends QueryTest with
BeforeAndAfterAll{
assert(nowtime <= lasttime)
}
}
+
+ private def getMergeIndexFileCount(tableName: String, segment: String): Int
= {
+ val table = CarbonEnv.getCarbonTable(None,
tableName)(sqlContext.sparkSession)
+ var path = CarbonTablePath
+ .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+ if (table.isHivePartitionTable) {
+ path = table.getAbsoluteTableIdentifier.getTablePath
+ }
+ val mergeIndexFiles = FileFactory.getCarbonFile(path).listFiles(true, new
CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean =
+ file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+ })
+ mergeIndexFiles.size()
+ }
}
diff --git
a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
index 973afe8..d17a8a3 100644
---
a/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
+++
b/integration/hive/src/main/java/org/apache/carbondata/hive/util/HiveCarbonUtil.java
@@ -257,7 +257,6 @@ public class HiveCarbonUtil {
.fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
tableInfo.getFactTable().getTableName()));
thriftWriter.close();
-
FileFactory.getCarbonFile(schemaFilePath).setLastModifiedTime(System.currentTimeMillis());
}
public static HiveMetaHook getMetaHook() {
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 58ce582..3872db5 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -43,11 +43,6 @@ class MergeIndexEventListener extends OperationEventListener
with Logging {
override def onEvent(event: Event, operationContext: OperationContext): Unit
= {
event match {
case preStatusUpdateEvent: LoadTablePreStatusUpdateEvent =>
- // skip merge index in case of insert stage flow
- if (null !=
operationContext.getProperty(CarbonCommonConstants.IS_INSERT_STAGE) &&
-
operationContext.getProperty(CarbonCommonConstants.IS_INSERT_STAGE).equals("true"))
{
- return
- }
LOGGER.info("Load post status event-listener called for merge index")
val loadModel = preStatusUpdateEvent.getCarbonLoadModel
val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index 9fdfe0d..c5f08d9 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -333,8 +333,6 @@ case class CarbonInsertFromStageCommand(
LOGGER.info(s"finish data loading, time taken
${System.currentTimeMillis() - start}ms")
// 4) write segment file and update the segment entry to SUCCESS
- val segmentFileName = SegmentFileStore.writeSegmentFile(
- table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
// create operationContext to fire load events
val operationContext: OperationContext = new OperationContext
val (tableIndexes, indexOperationContext) =
CommonLoadUtils.firePreLoadEvents(
@@ -350,13 +348,15 @@ case class CarbonInsertFromStageCommand(
operationContext = operationContext)
// in case of insert stage files, added the below property to avoid
merge index and
// fire event to load data to secondary index
- operationContext.setProperty(CarbonCommonConstants.IS_INSERT_STAGE,
"true")
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
new LoadTablePreStatusUpdateEvent(
table.getCarbonTableIdentifier,
loadModel)
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent,
operationContext)
+ val segmentFileName = SegmentFileStore.writeSegmentFile(
+ table, loadModel.getSegmentId, loadModel.getFactTimeStamp.toString)
+
val status = SegmentFileStore.updateTableStatusFile(
table, loadModel.getSegmentId, segmentFileName,
table.getCarbonTableIdentifier.getTableId,
@@ -538,14 +538,13 @@ case class CarbonInsertFromStageCommand(
val stageLoadingFile =
FileFactory.getCarbonFile(stagePath +
File.separator + files._1.getName +
CarbonTablePath.LOADING_FILE_SUFFIX);
- // Try to create loading files
- // make isFailed to be true if createNewFile return false.
- // the reason can be file exists or exceptions.
- var isFailed = !stageLoadingFile.createNewFile()
- // if file exists, modify the lastmodifiedtime of the file.
- if (isFailed) {
- // make isFailed to be true if setLastModifiedTime return false.
- isFailed =
!stageLoadingFile.setLastModifiedTime(System.currentTimeMillis());
+ // Try to recreate loading files if the loading file exists
+ // or create loading files directly if the loading file doesn't
exist
+ // set isFailed to be false when (delete and) createfile success
+ val isFailed = if (stageLoadingFile.exists()) {
+ !(stageLoadingFile.delete() && stageLoadingFile.createNewFile())
+ } else {
+ !stageLoadingFile.createNewFile()
}
(files._1, files._2, isFailed)
} catch {