This is an automated email from the ASF dual-hosted git repository.
jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 5c28ee6 [CARBONDATA-3714] Support specifying order type when listing stage files
5c28ee6 is described below
commit 5c28ee61433c4529d86a7611cf9768884f6c59bc
Author: liuzhi <[email protected]>
AuthorDate: Tue Feb 18 16:46:35 2020 +0800
[CARBONDATA-3714] Support specifying order type when listing stage files
Why is this PR needed?
Sometimes, users want to load the latest data into the table first.
What changes were proposed in this PR?
Add "batch_file_order" option for CarbonInsertFromStagesCommand.
Does this PR introduce any user interface change?
Yes. (One option "batch_file_order" is added for CarbonInsertFromStageCommand; documentation is added.)
Is any new testcase added?
Yes
This closes #3628
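For context, a minimal usage sketch of the new option as documented below; the table name and the `spark` session are illustrative assumptions, not part of this change:
```
// Hypothetical usage sketch: "sink_table" and the `spark` session are assumptions.
// Loads the 5 most recently written stage files first (newest data first).
spark.sql(
  "INSERT INTO sink_table STAGE " +
  "OPTIONS('batch_file_count' = '5', 'batch_file_order' = 'DESC')")
```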
---
docs/dml-of-carbondata.md | 14 +++++
.../org/apache/carbon/flink/TestCarbonWriter.scala | 15 ++---
.../management/CarbonInsertFromStageCommand.scala | 70 +++++++++++++++++-----
3 files changed, 78 insertions(+), 21 deletions(-)
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 9d935c8..49e5664 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -323,6 +323,7 @@ CarbonData DML statements are documented here,which includes:
| Property | Description |
| -------- | ----------- |
| [BATCH_FILE_COUNT](#batch_file_count) | The number of stage files per processing |
+| [BATCH_FILE_ORDER](#batch_file_order) | The order of stage files in each processing batch |
-
You can use the following options to load data:
@@ -334,11 +335,24 @@ CarbonData DML statements are documented here,which includes:
OPTIONS('batch_file_count'='5')
```
+ - ##### BATCH_FILE_ORDER:
+ The order in which stage files are processed within each batch, choices: ASC, DESC.
+ The default is ASC.
+ Stage files will be ordered by last modified time in the specified order.
+
+ ```
+ OPTIONS('batch_file_order'='DESC')
+ ```
+
Examples:
```
INSERT INTO table1 STAGE
INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
+ Note: This command uses the default file order and will insert the earliest stage files into the table.
+
+ INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5', 'batch_file_order'='DESC')
+ Note: This command will insert the latest stage files into the table.
```
### Load Data Using Static Partition
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 1d82a75..396703d 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.spark.sql.{CarbonEnv, Row}
import org.apache.spark.sql.test.util.QueryTest
+
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -37,7 +38,6 @@ class TestCarbonWriter extends QueryTest {
val tableName = "test_flink"
val bucketTableName = "insert_bucket_table"
-
test("Writing flink data to local carbon table") {
sql(s"DROP TABLE IF EXISTS $tableName").collect()
sql(
@@ -281,9 +281,9 @@ class TestCarbonWriter extends QueryTest {
val plan = sql(
s"""
- |select t1.*, t2.*
- |from $tableName t1, $bucketTableName t2
- |where t1.stringField = t2.stringField
+ |select t1.*, t2.*
+ |from $tableName t1, $bucketTableName t2
+ |where t1.stringField = t2.stringField
""".stripMargin).queryExecution.executedPlan
var shuffleExists = false
plan.collect {
@@ -297,9 +297,9 @@ class TestCarbonWriter extends QueryTest {
checkAnswer(sql(
s"""select count(*) from
- |(select t1.*, t2.*
- |from $tableName t1, $bucketTableName t2
- |where t1.stringField = t2.stringField) temp
+ |(select t1.*, t2.*
+ |from $tableName t1, $bucketTableName t2
+ |where t1.stringField = t2.stringField) temp
""".stripMargin), Row(1000))
} finally {
sql(s"DROP TABLE IF EXISTS $tableName").collect()
@@ -307,6 +307,7 @@ class TestCarbonWriter extends QueryTest {
}
}
+
private def newWriterProperties(
dataTempPath: String,
storeLocation: String) = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index d63ec24..b971dcd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -27,7 +27,6 @@ import scala.collection.JavaConverters._
import com.google.gson.Gson
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.log4j.Logger
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{Checker, DataCommand}
import org.apache.spark.sql.util.SparkSQLUtil
@@ -35,7 +34,7 @@ import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{AbstractDFSCarbonFile, CarbonFile}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{ColumnarFormatVersion, SegmentFileStore}
@@ -61,12 +60,9 @@ case class CarbonInsertFromStageCommand(
options: Map[String, String]
) extends DataCommand {
- @transient var LOGGER: Logger = _
-
- val DELETE_FILES_RETRY_TIMES = 3
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
override def processData(spark: SparkSession): Seq[Row] = {
- LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
Checker.validateTableExists(databaseNameOp, tableName, spark)
val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
val hadoopConf = spark.sessionState.newHadoopConf()
@@ -118,15 +114,31 @@ case class CarbonInsertFromStageCommand(
// 1) read all existing stage files
val batchSize = try {
- Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString))
+ Integer.valueOf(
+ options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY,
+ CarbonInsertFromStageCommand.BATCH_FILE_COUNT_DEFAULT))
} catch {
case _: NumberFormatException =>
- throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.")
+ throw new MalformedCarbonCommandException("Option [" +
+ CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is not a number.")
}
if (batchSize < 1) {
- throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.")
+ throw new MalformedCarbonCommandException("Option [" +
+ CarbonInsertFromStageCommand.BATCH_FILE_COUNT_KEY + "] is less than 1.")
+ }
+ val orderType = options.getOrElse(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY,
+ CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DEFAULT)
+ if (!orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC) &&
+ !orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC)) {
+ throw new MalformedCarbonCommandException("Option [" +
+ CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY + "] is invalid, should be " +
+ CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC + " or " +
+ CarbonInsertFromStageCommand.BATCH_FILE_ORDER_DESC + ".")
}
- val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize)
+ LOGGER.info("Option [" + CarbonInsertFromStageCommand.BATCH_FILE_ORDER_KEY +
+ "] value is " + orderType)
+ val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize,
+ orderType.equalsIgnoreCase(CarbonInsertFromStageCommand.BATCH_FILE_ORDER_ASC))
if (stageFiles.isEmpty) {
// no stage files, so do nothing
LOGGER.warn("files not found under stage metadata folder")
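As a side note, a standalone sketch of the parse-and-validate pattern the hunk above introduces; the helper name and the inlined literals are illustrative, not the command's actual API:
```
// Illustrative sketch (assumed helper): resolves the batch_file_order option
// and returns true when stage files should be processed in ascending order.
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException

def resolveBatchFileOrder(options: Map[String, String]): Boolean = {
  val orderType = options.getOrElse("batch_file_order", "ASC")
  if (!orderType.equalsIgnoreCase("ASC") && !orderType.equalsIgnoreCase("DESC")) {
    throw new MalformedCarbonCommandException(
      "Option [batch_file_order] is invalid, should be ASC or DESC.")
  }
  // true: earliest-modified stage files are processed first (the default)
  orderType.equalsIgnoreCase("ASC")
}
```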
@@ -446,7 +458,7 @@ case class CarbonInsertFromStageCommand(
executorService: ExecutorService,
stageFiles: Array[(CarbonFile, CarbonFile)]): Unit = {
val startTime = System.currentTimeMillis()
- var retry = DELETE_FILES_RETRY_TIMES
+ var retry = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
while (deleteStageFiles(executorService, stageFiles).length > 0 && retry > 0) {
retry -= 1
}
@@ -484,7 +496,7 @@ case class CarbonInsertFromStageCommand(
if (table.isHivePartitionTable) {
return
}
- var retries = DELETE_FILES_RETRY_TIMES
+ var retries = CarbonInsertFromStageCommand.DELETE_FILES_RETRY_TIMES
while(deleteSnapShotFile(snapshotFilePath) && retries > 0) {
retries -= 1
}
@@ -497,7 +509,8 @@ case class CarbonInsertFromStageCommand(
private def listStageFiles(
loadDetailsDir: String,
hadoopConf: Configuration,
- batchSize: Int
+ batchSize: Int,
+ ascendingSort: Boolean
): Array[(CarbonFile, CarbonFile)] = {
val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
if (dir.exists()) {
@@ -518,7 +531,12 @@ case class CarbonInsertFromStageCommand(
}.filter { file =>
successFiles.contains(file.getName)
}.sortWith {
- (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime
+ (file1, file2) =>
+ if (ascendingSort) {
+ file1.getLastModifiedTime < file2.getLastModifiedTime
+ } else {
+ file1.getLastModifiedTime > file2.getLastModifiedTime
+ }
}.map { file =>
(file, successFiles(file.getName))
}
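A minimal sketch of the sort direction introduced above, using java.io.File as a stand-in for CarbonFile (an assumption made purely for brevity):
```
// Sketch only: java.io.File stands in for CarbonFile; getLastModifiedTime -> lastModified().
def orderByModifiedTime(files: Seq[java.io.File], ascendingSort: Boolean): Seq[java.io.File] = {
  files.sortWith { (file1, file2) =>
    if (ascendingSort) {
      file1.lastModified() < file2.lastModified() // oldest first (ASC, the default)
    } else {
      file1.lastModified() > file2.lastModified() // newest first (DESC)
    }
  }
}
```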
@@ -556,3 +574,27 @@ case class CarbonInsertFromStageCommand(
override protected def opName: String = "INSERT STAGE"
}
+
+object CarbonInsertFromStageCommand {
+
+ val DELETE_FILES_RETRY_TIMES = 3
+
+ val BATCH_FILE_COUNT_KEY = "batch_file_count"
+
+ val BATCH_FILE_COUNT_DEFAULT: String = Integer.MAX_VALUE.toString
+
+ val BATCH_FILE_ORDER_KEY = "batch_file_order"
+
+ /**
+ * Using this option inserts the earliest stage files into the table.
+ */
+ val BATCH_FILE_ORDER_ASC = "ASC"
+
+ /**
+ * Using this option inserts the latest stage files into the table.
+ */
+ val BATCH_FILE_ORDER_DESC = "DESC"
+
+ val BATCH_FILE_ORDER_DEFAULT: String = BATCH_FILE_ORDER_ASC
+
+}