This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 316939b [CARBONDATA-4099] Fixed select query on main table with a SI table in case of concurrent load, compact and clean files operation
316939b is described below
commit 316939bd384fdaaf1b7543d05ec21cce2e7d5f8b
Author: Vikram Ahuja <[email protected]>
AuthorDate: Mon Dec 28 18:20:44 2020 +0530
[CARBONDATA-4099] Fixed select query on main table with a SI table in case of concurrent load, compact and clean files operation
Why is this PR needed?
There were 2 issues in the clean files post event listener:
1. In concurrent cases, while writing an entry back to the table status file, a wrong path was used (the file separator between the metadata directory and the file name was missing), due to which the table status file was not updated in the case of an SI table (a sketch follows this list).
2. While writing the loadMetadataDetails to the table status file during concurrent scenarios, only the unwanted segments were written instead of all the segments, which could leave stale segments in the SI table.
Due to these 2 issues, when a select query was executed on the SI table, the table status file would have an entry for a segment whose carbondata file had been deleted, thus throwing an IOException.
3. Segment ID was null when writing a Hive table.
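To make issue 1 concrete, the following is a minimal, self-contained Scala sketch; the paths and names here are hypothetical, not taken from the patch. Joining the metadata directory and the status file name without a separator yields a path that does not exist, so the SI table status file is never updated:

    object TableStatusPathSketch {
      def main(args: Array[String]): Unit = {
        val metadataPath = "/warehouse/db/si_table/Metadata" // hypothetical location
        // Before the fix: directory and file name joined without a separator.
        val wrongPath = metadataPath + "tablestatus"
        // After the fix: joined with the file separator.
        val rightPath = metadataPath + "/" + "tablestatus"
        println(wrongPath) // /warehouse/db/si_table/Metadatatablestatus (never found)
        println(rightPath) // /warehouse/db/si_table/Metadata/tablestatus (correct)
      }
    }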
What changes were proposed in this PR?
1. & 2. Used the correct table status path and sent the correct loadMetadataDetails to be updated in the table status file (a sketch follows this list). Now, when a select query is fired on the SI table, it will not throw a carbondata file not found exception.
3. Set the load model after the committer's setup job, which fixes the null segment ID when writing a Hive table.
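As an illustration of fixes 1 and 2, below is a simplified, self-contained Scala sketch; LoadDetail and writeLoadDetails are hypothetical stand-ins for CarbonData's LoadMetadataDetails and SegmentStatusManager.writeLoadDetailsIntoFile, not the real API. The point is that unwanted segments are only marked invisible, and the full updated list is written back:

    // Hypothetical stand-in for CarbonData's LoadMetadataDetails.
    case class LoadDetail(loadName: String, var visibility: String = "true")

    object TableStatusWriteSketch {
      // Hypothetical stand-in for SegmentStatusManager.writeLoadDetailsIntoFile.
      def writeLoadDetails(path: String, details: Array[LoadDetail]): Unit =
        println(s"writing ${details.length} entries to $path")

      def main(args: Array[String]): Unit = {
        val allDetails = Array(LoadDetail("0"), LoadDetail("1"), LoadDetail("2"))
        // Mark the unwanted segments invisible instead of dropping the rest.
        allDetails.filter(_.loadName == "2").foreach(_.visibility = "false")
        val statusPath = "/warehouse/db/si_table/Metadata" + "/" + "tablestatus"
        // Before the fix, only the unwanted subset was written, making the
        // remaining SI segments stale; the fix writes the full, updated list.
        writeLoadDetails(statusPath, allDetails)
      }
    }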
Does this PR introduce any user interface change?
No
Is any new testcase added?
No
This closes #4066
---
.../apache/carbondata/hive/MapredCarbonOutputCommitter.java | 12 ++++++++----
.../secondaryindex/events/CleanFilesPostEventListener.scala | 7 ++++---
2 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
index 07297a3..2b265d4 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -72,12 +72,19 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
       carbonLoadModel =
           (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
     }
+    boolean setLoadModelToEnv = false;
     if (null == carbonLoadModel) {
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-      String mapReduceMapTaskEnv = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
       carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
       CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+      setLoadModelToEnv = true;
+    }
+    carbonOutputCommitter = new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()),
+        context);
+    carbonOutputCommitter.setupJob(jobContext);
+    if (setLoadModelToEnv) {
       String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+      String mapReduceMapTaskEnv = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
       // Set the loadModel string to mapreduce.map.env so that it will be published to all
       // containers later during job execution.
       jobContext.getJobConf()
@@ -85,9 +92,6 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
       jobContext.getJobConf()
           .set(JobConf.MAPRED_REDUCE_TASK_ENV, mapReduceMapTaskEnv + ",carbon=" + loadModelStr);
     }
-    carbonOutputCommitter =
-        new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()), context);
-    carbonOutputCommitter.setupJob(jobContext);
   }

   @Override
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index e12a35b..2c94609 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters

 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
@@ -133,16 +134,16 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
           val carbonFile = FileFactory
             .getCarbonFile(CarbonTablePath
               .getSegmentPath(indexTable.getTablePath, detail.getLoadName))
+          LOGGER.info(s"Deleting segment folder: ${carbonFile.getName}")
           CarbonUtil.deleteFoldersAndFiles(carbonFile)
         }
         unnecessarySegmentsOfSI.foreach { detail =>
           detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
           detail.setVisibility("false")
         }
-
         SegmentStatusManager.writeLoadDetailsIntoFile(
-          indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
-          unnecessarySegmentsOfSI.toArray)
+          indexTable.getMetadataPath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonTablePath.TABLE_STATUS_FILE, indexTableMetadataDetails.toArray)
       } else {
         LOGGER.error("Unable to get the lock file for main/Index table. Please try again later")
       }