This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 316939b  [CARBONDATA-4099] Fixed select query on main table with a SI table in case of concurrent load, compact and clean files operation
316939b is described below

commit 316939bd384fdaaf1b7543d05ec21cce2e7d5f8b
Author: Vikram Ahuja <[email protected]>
AuthorDate: Mon Dec 28 18:20:44 2020 +0530

    [CARBONDATA-4099] Fixed select query on main table with a SI table in case of concurrent load,
    compact and clean files operation
    
    Why is this PR needed?
    There were 2 issues in the clean files post event listener:

    1. In concurrent cases, while writing the entry back to the table status file, a wrong path
    was given, due to which the table status file was not updated in the case of an SI table.
    2. While writing the loadmetadatadetails to the table status file during concurrent scenarios,
    we were only writing the unwanted segments and not all the segments, which could leave stale
    segments in the SI table (issues 1 and 2 are sketched below).
    Due to these 2 issues, when a select query is executed on the SI table, the table status would
    have an entry for a segment but its carbondata file would already be deleted, thus throwing an
    IOException.
    3. Segment ID is null when writing a hive table.
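
    To illustrate issues 1 and 2, a minimal sketch in Scala, mirroring the listener code in the
    diff below; indexTable, unnecessarySegmentsOfSI and indexTableMetadataDetails are assumed to be
    the SI CarbonTable, its stale load details, and its complete load details respectively:

        // Before: no separator between the metadata directory and the file name, and only the
        // stale segments were written back, so other SI segments could be lost from the status file.
        SegmentStatusManager.writeLoadDetailsIntoFile(
          indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
          unnecessarySegmentsOfSI.toArray)

        // After: build the path with the file separator and write the full set of load details.
        SegmentStatusManager.writeLoadDetailsIntoFile(
          indexTable.getMetadataPath + CarbonCommonConstants.FILE_SEPARATOR +
            CarbonTablePath.TABLE_STATUS_FILE,
          indexTableMetadataDetails.toArray)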
    
    What changes were proposed in this PR?
    1. & 2. Added the correct table status path and passed the correct loadmetadatadetails to be
    updated in the table status file. Now, when a select query is fired on the SI table, it will
    not throw a carbondata file not found exception.
    3. Set the load model after the setup job of the committer (see the sketch below).
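
    A sketch of the reordering for point 3 (the actual file, MapredCarbonOutputCommitter, is Java;
    this Scala-style outline only restates the flow from the diff below). The assumption is that
    CarbonOutputCommitter.setupJob is what completes the load model (including the segment ID) in
    the job configuration, so LOAD_MODEL should be read and published to the task env only after it:

        // run the committer's setupJob first ...
        carbonOutputCommitter = new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath), context)
        carbonOutputCommitter.setupJob(jobContext)
        // ... and only then read the load model string and publish it to the containers
        if (setLoadModelToEnv) {
          val loadModelStr = jobContext.getConfiguration.get(CarbonTableOutputFormat.LOAD_MODEL)
          val env = jobContext.getJobConf.get(JobConf.MAPRED_MAP_TASK_ENV)
          jobContext.getJobConf.set(JobConf.MAPRED_MAP_TASK_ENV, env + ",carbon=" + loadModelStr)
          jobContext.getJobConf.set(JobConf.MAPRED_REDUCE_TASK_ENV, env + ",carbon=" + loadModelStr)
        }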
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4066
---
 .../apache/carbondata/hive/MapredCarbonOutputCommitter.java  | 12 ++++++++----
 .../secondaryindex/events/CleanFilesPostEventListener.scala  |  7 ++++---
 2 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
index 07297a3..2b265d4 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -72,12 +72,19 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
       carbonLoadModel =
           (CarbonLoadModel) ObjectSerializationUtil.convertStringToObject(encodedString);
     }
+    boolean setLoadModelToEnv = false;
     if (null == carbonLoadModel) {
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(jobContext.getConfiguration());
-      String mapReduceMapTaskEnv = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
       carbonLoadModel = HiveCarbonUtil.getCarbonLoadModel(jobContext.getConfiguration());
       CarbonTableOutputFormat.setLoadModel(jobContext.getConfiguration(), carbonLoadModel);
+      setLoadModelToEnv = true;
+    }
+    carbonOutputCommitter = new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()),
+        context);
+    carbonOutputCommitter.setupJob(jobContext);
+    if (setLoadModelToEnv) {
       String loadModelStr = jobContext.getConfiguration().get(CarbonTableOutputFormat.LOAD_MODEL);
+      String mapReduceMapTaskEnv = jobContext.getJobConf().get(JobConf.MAPRED_MAP_TASK_ENV);
       // Set the loadModel string to mapreduce.map.env so that it will be published to all
       // containers later during job execution.
       jobContext.getJobConf()
@@ -85,9 +92,6 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
       jobContext.getJobConf()
           .set(JobConf.MAPRED_REDUCE_TASK_ENV, mapReduceMapTaskEnv + ",carbon=" + loadModelStr);
     }
-    carbonOutputCommitter =
-        new CarbonOutputCommitter(new Path(carbonLoadModel.getTablePath()), context);
-    carbonOutputCommitter.setupJob(jobContext);
   }
 
   @Override
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
index e12a35b..2c94609 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
@@ -133,16 +134,16 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
           val carbonFile = FileFactory
             .getCarbonFile(CarbonTablePath
               .getSegmentPath(indexTable.getTablePath, detail.getLoadName))
+          LOGGER.info(s"Deleting segment folder: ${carbonFile.getName}")
           CarbonUtil.deleteFoldersAndFiles(carbonFile)
         }
         unnecessarySegmentsOfSI.foreach { detail =>
           detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
           detail.setVisibility("false")
         }
-
         SegmentStatusManager.writeLoadDetailsIntoFile(
-          indexTable.getMetadataPath + CarbonTablePath.TABLE_STATUS_FILE,
-          unnecessarySegmentsOfSI.toArray)
+          indexTable.getMetadataPath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonTablePath.TABLE_STATUS_FILE, indexTableMetadataDetails.toArray)
       } else {
         LOGGER.error("Unable to get the lock file for main/Index table. Please try again later")
       }
