This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 35f1501  [CARBONDATA-3483] Don't acquire update.lock and compaction.lock again when executing 'IUD_UPDDEL_DELTA' compaction
35f1501 is described below

commit 35f1501ff38023bf09f108ffb984699abdd6f639
Author: Zhang Zhichao <[email protected]>
AuthorDate: Thu Aug 1 09:53:14 2019 +0800

    [CARBONDATA-3483] Don't acquire update.lock and compaction.lock again when executing 'IUD_UPDDEL_DELTA' compaction
    
    Problem:
    After PR#3166, horizontal compaction no longer actually runs when an update
    SQL statement is executed. When an update runs and horizontal compaction is
    needed, CarbonAlterTableCompactionCommand.alterTableForCompaction tries to
    acquire update.lock and compaction.lock, but both locks were already acquired
    when the update statement started executing. The acquisition therefore fails
    and the compaction is never executed.
    
    Solution:
    Do not acquire update.lock and compaction.lock again when executing an
    'IUD_UPDDEL_DELTA' compaction; the update command already holds both locks.
    
    This closes #3343
---
 .../iud/HorizontalCompactionTestCase.scala         | 64 +++++++++++++++++++++-
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  |  2 +
 .../spark/rdd/CarbonDataRDDFactory.scala           |  4 +-
 .../CarbonAlterTableCompactionCommand.scala        | 47 +++++++++-------
 4 files changed, 95 insertions(+), 22 deletions(-)
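
Before the diff, a minimal, self-contained Scala sketch of the locking bug and the fix described above. Every name in it (SimpleLock, IudUpddelDelta, compact) is hypothetical shorthand, not a CarbonData API; the real locks are the non-reentrant table-level update.lock and compaction.lock.

object LockSkipSketch extends App {

  // The real table-level locks behave like file locks: non-reentrant, so a
  // second tryLock() from the same flow simply fails.
  final class SimpleLock(val name: String) {
    private var held = false
    def tryLock(): Boolean = synchronized { if (held) false else { held = true; true } }
    def unlock(): Unit = synchronized { held = false }
  }

  sealed trait CompactionKind
  case object IudUpddelDelta extends CompactionKind // triggered from inside an update
  case object Minor extends CompactionKind          // triggered by ALTER TABLE ... COMPACT

  def compact(kind: CompactionKind, updateLock: SimpleLock, compactionLock: SimpleLock): Unit = {
    // The fix: when the update flow already holds both locks (IUD_UPDDEL_DELTA),
    // do not try to acquire them again -- the re-acquisition is what used to fail.
    val ownsLocks = kind != IudUpddelDelta
    if (ownsLocks) {
      if (!updateLock.tryLock()) sys.error("concurrent update in progress")
      if (!compactionLock.tryLock()) sys.error("table already locked for compaction")
    }
    try println(s"compacting: $kind")
    finally if (ownsLocks) { compactionLock.unlock(); updateLock.unlock() }
  }

  val updateLock = new SimpleLock("update.lock")
  val compactionLock = new SimpleLock("compaction.lock")

  // Update flow: acquires both locks first, then triggers horizontal compaction.
  assert(updateLock.tryLock() && compactionLock.tryLock())
  compact(IudUpddelDelta, updateLock, compactionLock) // succeeds without re-acquiring
  compactionLock.unlock(); updateLock.unlock()        // released by the update flow
}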

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index 24c8f17..2852501 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -20,10 +20,14 @@ package org.apache.carbondata.spark.testsuite.iud
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.test.util.QueryTest
 
-
 class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     CarbonProperties.getInstance()
@@ -383,6 +387,64 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select longfield from customer1"),Seq(Row(9223372036854775807L),Row(9223372036854775807L),Row(9223372036854775807L),Row(9223372036854775807L)))
 
   }
+
+  def getDeltaFiles(carbonFile: CarbonFile, fileSuffix: String): Array[CarbonFile] = {
+    carbonFile.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(fileSuffix)
+      }
+    })
+  }
+
+  test("[CARBONDATA-3483] Don't require update.lock and compaction.lock again 
when execute 'IUD_UPDDEL_DELTA' compaction") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "1")
+    sql("""drop database if exists iud10 cascade""")
+    sql("""create database iud10""")
+    sql("""use iud10""")
+
+    sql(
+      """create table dest10 (c1 string,c2 int,c3 string,c5 string) STORED BY 
'org.apache.carbondata.format'""")
+    sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table 
dest10""")
+
+    val carbonTable = CarbonEnv.getCarbonTable(Some("iud10"), 
"dest10")(sqlContext.sparkSession)
+    val identifier = carbonTable.getAbsoluteTableIdentifier()
+    val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath, "0")
+    val carbonFile =
+        FileFactory.getCarbonFile(dataFilesDir, FileFactory.getFileType(dataFilesDir))
+
+    var updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    var deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    assert(updateDeltaFiles.length == 0)
+    assert(deleteDeltaFiles.length == 0)
+
+    sql("""update dest10 set (c1, c3) = ('update_a', 'update_aa') where c2 = 3 
or c2 = 6""").show()
+
+    updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    // only one update has run so far, so horizontal compaction has not been triggered yet
+    assert(updateDeltaFiles.length == 1)
+    assert(deleteDeltaFiles.length == 1)
+
+    sql("""update dest10 set (c1, c3) = ('update_a', 'update_aa') where c2 = 5 
or c2 = 8""").show()
+
+    updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    // one '.carbonindex' file from the previous update operation,
+    // one '.carbonindex' file from this update operation,
+    // and one '.carbonindex' file from horizontal compaction,
+    // so there must be three '.carbonindex' files and three '.deletedelta' files
+    assert(updateDeltaFiles.length == 3)
+    assert(deleteDeltaFiles.length == 3)
+
+    sql("""drop table dest10""")
+    sql("""drop database if exists iud10 cascade""")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
+  }
+
   override def afterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
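
As a side note on the asserts in the test above, the expected counts follow a simple timeline; here is a hedged sketch of that arithmetic in plain Scala (the Deltas model is hypothetical, only the 1/1 and 3/3 totals come from the test itself).

object DeltaFileTimeline extends App {
  // Hypothetical model: each update writes one '.carbonindex' update-delta file
  // and one '.deletedelta' file; when a later update triggers horizontal
  // compaction, compaction writes one more of each in the same segment.
  final case class Deltas(carbonindex: Int, deletedelta: Int) {
    def afterUpdate(horizontalCompaction: Boolean): Deltas = {
      val written = Deltas(carbonindex + 1, deletedelta + 1)
      if (horizontalCompaction) Deltas(written.carbonindex + 1, written.deletedelta + 1)
      else written
    }
  }

  val afterFirst = Deltas(0, 0).afterUpdate(horizontalCompaction = false)
  val afterSecond = afterFirst.afterUpdate(horizontalCompaction = true)
  assert(afterFirst == Deltas(1, 1))  // matches the first pair of asserts in the test
  assert(afterSecond == Deltas(3, 3)) // matches the second pair
}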
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 107d228..cf45600 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -833,5 +833,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE , "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "1")
   }
 }
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 53e2eba..74c9140 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -276,7 +276,9 @@ object CarbonDataRDDFactory {
         } finally {
           executor.shutdownNow()
           compactor.deletePartialLoadsInCompaction()
-          compactionLock.unlock()
+          if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
+            compactionLock.unlock()
+          }
         }
       }
     }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 3189fd6..ab6b972 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -319,35 +319,42 @@ case class CarbonAlterTableCompactionCommand(
       val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
         .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
       try {
-        if (updateLock.lockWithRetries(3, 3)) {
-          if (lock.lockWithRetries()) {
-            LOGGER.info("Acquired the compaction lock for table" +
-                        s" ${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
-            CarbonDataRDDFactory.startCompactionThreads(
-              sqlContext,
-              carbonLoadModel,
-              storeLocation,
-              compactionModel,
-              lock,
-              compactedSegments,
-              operationContext
-            )
-          } else {
-            LOGGER.error(s"Not able to acquire the compaction lock for table" +
-                         s" ${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+        // COMPACTION_LOCK and UPDATE_LOCK are already acquired when the update sql starts,
+        // so they need not be acquired again when compactionType is IUD_UPDDEL_DELTA.
+        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+          if (!updateLock.lockWithRetries(3, 3)) {
+            throw new ConcurrentOperationException(carbonTable, "update", 
"compaction")
+          }
+          if (!lock.lockWithRetries()) {
+            LOGGER.error(s"Not able to acquire the compaction lock for table " 
+
+                         s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
             CarbonException.analysisException(
               "Table is already locked for compaction. Please try after some 
time.")
+          } else {
+            LOGGER.info("Acquired the compaction lock for table " +
+                        s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
           }
-        } else {
-          throw new ConcurrentOperationException(carbonTable, "update", 
"compaction")
         }
+        CarbonDataRDDFactory.startCompactionThreads(
+          sqlContext,
+          carbonLoadModel,
+          storeLocation,
+          compactionModel,
+          lock,
+          compactedSegments,
+          operationContext
+        )
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in start compaction thread.", e)
-          lock.unlock()
+          if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+            lock.unlock()
+          }
           throw e
       } finally {
-        updateLock.unlock()
+        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+          updateLock.unlock()
+        }
         DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
       }
     }
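
Taken together, the two changes above establish a lock-ownership convention: for IUD_UPDDEL_DELTA, the update command owns both locks end to end, and the compaction path neither acquires nor releases them. A rough standalone illustration, using java.util.concurrent.locks.ReentrantLock purely as a stand-in for CarbonData's file-based locks (all names hypothetical):

import java.util.concurrent.locks.ReentrantLock

object LockOwnershipSketch extends App {
  val updateLock = new ReentrantLock()
  val compactionLock = new ReentrantLock()

  // Borrower: for IUD_UPDDEL_DELTA the compaction path assumes the caller
  // already holds both locks, and it neither acquires nor releases them.
  def horizontalCompaction(): Unit = {
    require(updateLock.isHeldByCurrentThread, "caller must hold update.lock")
    require(compactionLock.isHeldByCurrentThread, "caller must hold compaction.lock")
    println("running IUD_UPDDEL_DELTA compaction under the caller's locks")
  }

  // Owner: the update command acquires the locks once, runs the update and the
  // follow-up compaction, and is the single place where they are released.
  updateLock.lock()
  compactionLock.lock()
  try {
    println("applying update, writing update/delete delta files")
    horizontalCompaction()
  } finally {
    compactionLock.unlock()
    updateLock.unlock()
  }
}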
