This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 35f1501 [CARBONDATA-3483] Don't acquire update.lock and compaction.lock again when executing 'IUD_UPDDEL_DELTA' compaction
35f1501 is described below
commit 35f1501ff38023bf09f108ffb984699abdd6f639
Author: Zhang Zhichao <[email protected]>
AuthorDate: Thu Aug 1 09:53:14 2019 +0800
[CARBONDATA-3483] Don't acquire update.lock and compaction.lock again when
executing 'IUD_UPDDEL_DELTA' compaction
Problem:
After PR#3166, horizontal compaction does not actually run when an update
SQL statement is executed.
When an update statement runs and horizontal compaction is needed,
CarbonAlterTableCompactionCommand.alterTableForCompaction tries to acquire
update.lock and compaction.lock, but both locks are already held by the
update operation itself. The acquisition therefore fails and the compaction
cannot run.
Solution:
Don't acquire update.lock and compaction.lock again when executing
'IUD_UPDDEL_DELTA' compaction.
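As an illustration of the resulting lock-ownership rule (whichever code path
acquires a lock is the one that releases it), here is a minimal, self-contained
Scala sketch. SimpleLock, runCompaction and the driver in main are illustrative
stand-ins for this note only, not CarbonData APIs:

    object LockOwnershipSketch {

      // Toy lock: single acquisition, no retries or timeouts.
      class SimpleLock(val name: String) {
        private var held = false
        def lockWithRetries(): Boolean = synchronized {
          if (held) false else { held = true; true }
        }
        def unlock(): Unit = synchronized { held = false }
      }

      sealed trait CompactionType
      case object MAJOR extends CompactionType
      case object IUD_UPDDEL_DELTA extends CompactionType

      def runCompaction(compactionType: CompactionType,
          updateLock: SimpleLock, compactionLock: SimpleLock): Unit = {
        // For IUD_UPDDEL_DELTA the update command already holds both locks,
        // so the compaction path must neither acquire nor release them.
        val ownsLocks = compactionType != IUD_UPDDEL_DELTA
        if (ownsLocks) {
          require(updateLock.lockWithRetries(), "update is in progress")
          require(compactionLock.lockWithRetries(), "compaction already running")
        }
        try {
          println(s"compacting ($compactionType) ...")
        } finally {
          if (ownsLocks) { // release only what this path acquired
            compactionLock.unlock()
            updateLock.unlock()
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val u = new SimpleLock("update.lock")
        val c = new SimpleLock("compaction.lock")
        u.lockWithRetries(); c.lockWithRetries() // update command takes both locks
        runCompaction(IUD_UPDDEL_DELTA, u, c)    // ok: reuses the caller's locks
        c.unlock(); u.unlock()                   // update command releases its locks
        runCompaction(MAJOR, u, c)               // ok: acquires and releases itself
      }
    }

In the real code the update command plays the role of the first caller: it
already holds update.lock and compaction.lock while the 'IUD_UPDDEL_DELTA'
compaction runs, and it releases them itself afterwards.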
This closes #3343
---
.../iud/HorizontalCompactionTestCase.scala | 64 +++++++++++++++++++++-
.../testsuite/iud/UpdateCarbonTableTestCase.scala | 2 +
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../CarbonAlterTableCompactionCommand.scala | 47 +++++++++-------
4 files changed, 95 insertions(+), 22 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index 24c8f17..2852501 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -20,10 +20,14 @@ package org.apache.carbondata.spark.testsuite.iud
import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.spark.sql.CarbonEnv
import org.apache.spark.sql.test.util.QueryTest
-
class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
CarbonProperties.getInstance()
@@ -383,6 +387,64 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
checkAnswer(sql("select longfield from customer1"),Seq(Row(9223372036854775807L),Row(9223372036854775807L),Row(9223372036854775807L),Row(9223372036854775807L)))
}
+
+ def getDeltaFiles(carbonFile: CarbonFile, fileSuffix: String): Array[CarbonFile] = {
+ return carbonFile.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(fileSuffix)
+ }
+ })
+ }
+
+ test("[CARBONDATA-3483] Don't require update.lock and compaction.lock again
when execute 'IUD_UPDDEL_DELTA' compaction") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "1")
+ sql("""drop database if exists iud10 cascade""")
+ sql("""create database iud10""")
+ sql("""use iud10""")
+
+ sql(
+ """create table dest10 (c1 string,c2 int,c3 string,c5 string) STORED BY
'org.apache.carbondata.format'""")
+ sql(s"""load data local inpath '$resourcesPath/IUD/comp1.csv' INTO table
dest10""")
+
+ val carbonTable = CarbonEnv.getCarbonTable(Some("iud10"), "dest10")(sqlContext.sparkSession)
+ val identifier = carbonTable.getAbsoluteTableIdentifier()
+ val dataFilesDir = CarbonTablePath.getSegmentPath(identifier.getTablePath, "0")
+ val carbonFile =
+ FileFactory.getCarbonFile(dataFilesDir, FileFactory.getFileType(dataFilesDir))
+
+ var updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ var deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ assert(updateDeltaFiles.length == 0)
+ assert(deleteDeltaFiles.length == 0)
+
+ sql("""update dest10 set (c1, c3) = ('update_a', 'update_aa') where c2 = 3
or c2 = 6""").show()
+
+ updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ // only one update has run so far, so there is no horizontal compaction at this time
+ assert(updateDeltaFiles.length == 1)
+ assert(deleteDeltaFiles.length == 1)
+
+ sql("""update dest10 set (c1, c3) = ('update_a', 'update_aa') where c2 = 5
or c2 = 8""").show()
+
+ updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+ deleteDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+ // one '.carbonindex' file for the previous update operation,
+ // one '.carbonindex' file for this update operation,
+ // and one '.carbonindex' file for the horizontal compaction,
+ // so there must be three '.carbonindex' files and three '.deletedelta' files
+ assert(updateDeltaFiles.length == 3)
+ assert(deleteDeltaFiles.length == 3)
+
+ sql("""drop table dest10""")
+ sql("""drop database if exists iud10 cascade""")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "false")
+ }
+
override def afterAll {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 107d228..cf45600 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -833,5 +833,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
.addProperty(CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE , "true")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER , "true")
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_UPDATE_SEGMENT_PARALLELISM, "1")
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 53e2eba..74c9140 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -276,7 +276,9 @@ object CarbonDataRDDFactory {
} finally {
executor.shutdownNow()
compactor.deletePartialLoadsInCompaction()
- compactionLock.unlock()
+ if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
+ compactionLock.unlock()
+ }
}
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 3189fd6..ab6b972 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -319,35 +319,42 @@ case class CarbonAlterTableCompactionCommand(
val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
.getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
try {
- if (updateLock.lockWithRetries(3, 3)) {
- if (lock.lockWithRetries()) {
- LOGGER.info("Acquired the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
- CarbonDataRDDFactory.startCompactionThreads(
- sqlContext,
- carbonLoadModel,
- storeLocation,
- compactionModel,
- lock,
- compactedSegments,
- operationContext
- )
- } else {
- LOGGER.error(s"Not able to acquire the compaction lock for table" +
- s" ${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
+ // COMPACTION_LOCK and UPDATE_LOCK are already acquired when the update sql starts executing,
+ // so there is no need to acquire them again when compactionType is IUD_UPDDEL_DELTA.
+ if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+ if (!updateLock.lockWithRetries(3, 3)) {
+ throw new ConcurrentOperationException(carbonTable, "update", "compaction")
+ }
+ if (!lock.lockWithRetries()) {
+ LOGGER.error(s"Not able to acquire the compaction lock for table "
+
+ s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
CarbonException.analysisException(
"Table is already locked for compaction. Please try after some
time.")
+ } else {
+ LOGGER.info("Acquired the compaction lock for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${
carbonLoadModel.getTableName }")
}
- } else {
- throw new ConcurrentOperationException(carbonTable, "update", "compaction")
}
+ CarbonDataRDDFactory.startCompactionThreads(
+ sqlContext,
+ carbonLoadModel,
+ storeLocation,
+ compactionModel,
+ lock,
+ compactedSegments,
+ operationContext
+ )
} catch {
case e: Exception =>
LOGGER.error(s"Exception in start compaction thread.", e)
- lock.unlock()
+ if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+ lock.unlock()
+ }
throw e
} finally {
- updateLock.unlock()
+ if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
+ updateLock.unlock()
+ }
DataMapStatusManager.disableAllLazyDataMaps(carbonTable)
}
}