This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 3d88685 [CARBONDATA-3663] Support loading stage files in batches 3d88685 is described below commit 3d88685757b4f5eef330bc543e0d8d20f70bb8df Author: liuzhi <371684...@qq.com> AuthorDate: Tue Jan 14 14:46:28 2020 +0800 [CARBONDATA-3663] Support loading stage files in batches Why is this PR needed? When there are a lot of stage files in the stage directory, loading all of them at one time makes the loading time impossible to control. Users need a way to specify the number of stage files per processing, to control the execution time of the command. What changes were proposed in this PR? Add a load option batch_file_count for users to specify the number of stage files per processing. Does this PR introduce any user interface change? Yes Is any new testcase added? Yes This closes #3578 --- docs/dml-of-carbondata.md | 19 ++++- .../carbon/flink/TestCarbonPartitionWriter.scala | 22 +++--- .../org/apache/carbon/flink/TestCarbonWriter.scala | 87 ++++++++++++++++++++-- .../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 ++++++++++- .../management/CarbonInsertFromStageCommand.scala | 78 +++++++++---------- .../spark/sql/parser/CarbonSpark2SqlParser.scala | 10 ++- 6 files changed, 190 insertions(+), 70 deletions(-) diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md index e148bd0..1cda75a 100644 --- a/docs/dml-of-carbondata.md +++ b/docs/dml-of-carbondata.md @@ -316,12 +316,29 @@ CarbonData DML statements are documented here,which includes: You can use this command to insert them into the table, so that making them visible for query. ``` - INSERT INTO <CARBONDATA TABLE> STAGE + INSERT INTO <CARBONDATA TABLE> STAGE OPTIONS(property_name=property_value, ...) ``` + **Supported Properties:** + +| Property | Description | | ------------------------------------------------------- | ------------------------------------------------------------ | | [BATCH_FILE_COUNT](#batch_file_count) | The number of stage files per processing | +- + You can use the following options to load data: + + - ##### BATCH_FILE_COUNT: + The number of stage files per processing.
+ + ``` + OPTIONS('batch_file_count'=',') + ``` Examples: ``` INSERT INTO table1 STAGE + + INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5') ``` ### Load Data Using Static Partition diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala index fe2fa38..c92d6fc 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala @@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest -import org.junit.Test import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema @@ -43,9 +42,8 @@ class TestCarbonPartitionWriter extends QueryTest { val tableName = "test_flink_partition" - @Test - def testLocal(): Unit = { - sql(s"drop table if exists $tableName").collect() + test("Writing flink data to local partition carbon table") { + sql(s"DROP TABLE IF EXISTS $tableName").collect() sql( s""" | CREATE TABLE $tableName (stringField string, intField int, shortField short) @@ -122,17 +120,16 @@ class TestCarbonPartitionWriter extends QueryTest { sql(s"INSERT INTO $tableName STAGE") - checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000))) + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000))) } finally { - sql(s"drop table if exists $tableName").collect() + sql(s"DROP TABLE IF EXISTS $tableName").collect() delDir(new File(dataPath)) } } - @Test - def testComplexType(): Unit = { - sql(s"drop table if exists $tableName").collect() + test("Test complex type") { + sql(s"DROP TABLE IF EXISTS $tableName").collect() sql( s""" | CREATE TABLE $tableName (stringField string, intField int, shortField short, @@ -212,14 +209,14 @@ class TestCarbonPartitionWriter extends QueryTest { sql(s"INSERT INTO $tableName STAGE") - checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000))) + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000))) - val rows = sql(s"select * from $tableName limit 1").collect() + val rows = sql(s"SELECT * FROM $tableName limit 1").collect() assertResult(1)(rows.length) assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0)) } finally { - sql(s"drop table if exists $tableName").collect() + sql(s"DROP TABLE IF EXISTS $tableName").collect() delDir(new File(dataPath)) } } @@ -231,7 +228,6 @@ class TestCarbonPartitionWriter extends QueryTest { val properties = new Properties properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath) properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath) - properties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100") properties } diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala index 9195863..a297dcf 100644 --- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala +++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala @@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest -import org.junit.Test import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.path.CarbonTablePath @@ -36,9 +35,8 @@ class TestCarbonWriter extends QueryTest { val tableName = "test_flink" - @Test - def testLocal(): Unit = { - sql(s"drop table if exists $tableName").collect() + test("Writing flink data to local carbon table") { + sql(s"DROP TABLE IF EXISTS $tableName").collect() sql( s""" | CREATE TABLE $tableName (stringField string, intField int, shortField short) @@ -103,14 +101,91 @@ class TestCarbonWriter extends QueryTest { sql(s"INSERT INTO $tableName STAGE") - checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000))) + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000))) // ensure the stage snapshot file and all stage files are deleted assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath))) assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty) } finally { - sql(s"drop table if exists $tableName").collect() + sql(s"DROP TABLE IF EXISTS $tableName").collect() + new File(dataPath).delete() + } + } + + test("test batch_file_count option") { + sql(s"DROP TABLE IF EXISTS $tableName").collect() + sql( + s""" + | CREATE TABLE $tableName (stringField string, intField int, shortField short) + | STORED AS carbondata + """.stripMargin + ).collect() + + val rootPath = System.getProperty("user.dir") + "/target/test-classes" + + val dataTempPath = rootPath + "/data/temp/" + val dataPath = rootPath + "/data/" + new File(dataPath).delete() + new File(dataPath).mkdir() + + try { + val tablePath = storeLocation + "/" + tableName + "/" + + val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation) + val carbonProperties = newCarbonProperties(storeLocation) + + writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100") + + val environment = StreamExecutionEnvironment.getExecutionEnvironment + environment.setParallelism(1) + environment.setRestartStrategy(RestartStrategies.noRestart) + + val dataCount = 1000 + val source = new TestSource(dataCount) { + @throws[InterruptedException] + override def get(index: Int): Array[AnyRef] = { + val data = new Array[AnyRef](3) + data(0) = "test" + index + data(1) = index.asInstanceOf[AnyRef] + data(2) = 12345.asInstanceOf[AnyRef] + data + } + + @throws[InterruptedException] + override def onFinish(): Unit = { + Thread.sleep(5000L) + } + } + val stream = environment.addSource(source) + val factory = CarbonWriterFactory.builder("Local").build( + "default", + tableName, + tablePath, + new Properties, + writerProperties, + carbonProperties + ) + val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build + + stream.addSink(streamSink) + + try environment.execute + catch { + case exception: Exception => + // TODO + throw new UnsupportedOperationException(exception) + } + + sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')") + + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500))) + + sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')") + + checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000))) + } finally { + sql(s"DROP TABLE IF EXISTS $tableName").collect() new File(dataPath).delete() } } diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index dc97cd9..ae859c0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -22,16 +22,20 @@ import java.util.Comparator import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration -import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext} +import org.apache.hadoop.mapreduce.InputSplit +import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.execution.command.ExecutionErrors -import org.apache.spark.sql.util.SparkSQLUtil +import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter} import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType} @@ -40,6 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer +import org.apache.carbondata.hadoop.CarbonProjection import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses} import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants @@ -47,8 +52,9 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant} -import org.apache.carbondata.spark.rdd.StringArrayRow +import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow} import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.store.CarbonRowReadSupport /** * Use sortBy operator in spark to load the data @@ -423,6 +429,38 @@ object DataLoadProcessBuilderOnSpark { } loadModel } + + /** + * create DataFrame basing on specified splits + */ + def createInputDataFrame( + sparkSession: SparkSession, + carbonTable: CarbonTable, + splits: Seq[InputSplit] + ): DataFrame = { + val columns = carbonTable + .getCreateOrderColumn + .asScala + .map(_.getColName) + .toArray + val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns) + val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow]( + sparkSession, + columnProjection = new CarbonProjection(columns), + 
null, + carbonTable.getAbsoluteTableIdentifier, + carbonTable.getTableInfo.serialize, + carbonTable.getTableInfo, + new CarbonInputMetrics, + null, + classOf[SparkDataTypeConverterImpl], + classOf[CarbonRowReadSupport], + splits.asJava) + .map { row => + new GenericInternalRow(row.getData.asInstanceOf[Array[Any]]) + } + SparkSQLUtil.execute(rdd, schema, sparkSession) + } } class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala index a4dd45b..0d1121d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala @@ -28,7 +28,7 @@ import com.google.gson.Gson import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.InputSplit import org.apache.log4j.Logger -import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.spark.sql.util.SparkSQLUtil @@ -56,7 +56,8 @@ import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark */ case class CarbonInsertFromStageCommand( databaseNameOp: Option[String], - tableName: String + tableName: String, + options: Map[String, String] ) extends DataCommand { @transient var LOGGER: Logger = _ @@ -113,7 +114,16 @@ case class CarbonInsertFromStageCommand( // 8) delete the snapshot file // 1) read all existing stage files - val stageFiles = listStageFiles(stagePath, hadoopConf) + val batchSize = try { + Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString)) + } catch { + case _: NumberFormatException => + throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.") + } + if (batchSize < 1) { + throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.") + } + val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize) if (stageFiles.isEmpty) { // no stage files, so do nothing LOGGER.warn("files not found under stage metadata folder") @@ -258,25 +268,14 @@ case class CarbonInsertFromStageCommand( LOGGER.info(s"start to load ${splits.size} files into " + s"${table.getDatabaseName}.${table.getTableName}") val start = System.currentTimeMillis() - try { - CarbonUtils - .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName, - splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(",")) - val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table) - DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( - spark, - Option(dataFrame), - loadModel, - SparkSQLUtil.sessionState(spark).newHadoopConf() - ).map { row => + val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits) + DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort( + spark, + Option(dataFrame), + loadModel, + SparkSQLUtil.sessionState(spark).newHadoopConf() + ).map { row => (row._1, FailureCauses.NONE == row._2._2.failureCauses) - } - } finally { - CarbonUtils - .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - table.getDatabaseName + "." 
+ - table.getTableName) } LOGGER.info(s"finish data loading, time taken ${System.currentTimeMillis() - start}ms") @@ -316,26 +315,11 @@ case class CarbonInsertFromStageCommand( val start = System.currentTimeMillis() partitionDataList.map { case (partition, splits) => - LOGGER.info(s"start to load ${ splits.size } files into " + - s"${ table.getDatabaseName }.${ table.getTableName }. " + - s"Partition information: ${ partition.mkString(",") }") - val dataFrame = try { - // Segments should be set for query here, because consider a scenario where custom - // compaction is triggered, so it can happen that all the segments might be taken into - // consideration instead of custom segments if we do not set, leading to duplicate data in - // compacted segment. To avoid this, segments to be considered are to be set in threadset. - CarbonUtils - .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + - table.getDatabaseName + CarbonCommonConstants.POINT + - table.getTableName, - splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(",")) - SparkSQLUtil.createInputDataFrame(spark, table) - } finally { - CarbonUtils.threadUnset( - CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName + - CarbonCommonConstants.POINT + - table.getTableName) - } + LOGGER.info(s"start to load ${splits.size} files into " + + s"${table.getDatabaseName}.${table.getTableName}. " + + s"Partition information: ${partition.mkString(",")}") + val dataFrame = + DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits) val columns = dataFrame.columns val header = columns.mkString(",") val selectColumns = columns.filter(!partition.contains(_)) @@ -457,7 +441,8 @@ case class CarbonInsertFromStageCommand( */ private def listStageFiles( loadDetailsDir: String, - hadoopConf: Configuration + hadoopConf: Configuration, + batchSize: Int ): Array[(CarbonFile, CarbonFile)] = { val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf) if (dir.exists()) { @@ -467,13 +452,20 @@ case class CarbonInsertFromStageCommand( }.map { file => (file.getName.substring(0, file.getName.indexOf(".")), file) }.toMap - allFiles.filter { file => + val stageFiles = allFiles.filter { file => !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX) }.filter { file => successFiles.contains(file.getName) + }.sortWith { + (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime }.map { file => (file, successFiles(file.getName)) } + if (stageFiles.length <= batchSize) { + stageFiles + } else { + stageFiles.dropRight(stageFiles.length - batchSize) + } } else { Array.empty } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 10b661a..ee094d7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -523,12 +523,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } /** - * INSERT INTO [dbName.]tableName STAGE + * INSERT INTO [dbName.]tableName STAGE [OPTIONS (key1=value1, key2=value2, ...)] */ protected lazy val insertStageData: Parser[LogicalPlan] = - INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ { - case dbName ~ tableName => - CarbonInsertFromStageCommand(dbName, tableName) + INSERT ~ INTO ~> (ident <~ ".").? 
~ ident ~ STAGE ~ + (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ { + case dbName ~ tableName ~ stage ~ options => + CarbonInsertFromStageCommand(dbName, tableName, + options.getOrElse(List[(String, String)]()).toMap[String, String]) } protected lazy val cleanFiles: Parser[LogicalPlan] =
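
For readers who want the new behaviour in one place rather than scattered across the diff, below is a minimal, self-contained sketch of what the batch_file_count option does. It is not the CarbonData implementation: StageFile, parseBatchFileCount, selectBatch and the main() driver are illustrative names, and IllegalArgumentException stands in for MalformedCarbonCommandException. Only the option name, the validation rules (a non-numeric value or a value below 1 is rejected, and a missing option means "load everything"), and the oldest-stage-files-first selection mirror the changes made in CarbonInsertFromStageCommand.

```
// Sketch only -- not the CarbonData code. Mirrors the option handling added to
// CarbonInsertFromStageCommand: validate batch_file_count, then pick the oldest
// stage files up to that count.
object StageBatchSketch {

  // Stand-in for a stage file and its last-modified timestamp.
  case class StageFile(name: String, lastModifiedTime: Long)

  // A missing option means "load all"; non-numeric or < 1 values are rejected,
  // as in the real command (which throws MalformedCarbonCommandException).
  def parseBatchFileCount(options: Map[String, String]): Int = {
    val raw = options.getOrElse("batch_file_count", Int.MaxValue.toString)
    val batchSize =
      try raw.toInt
      catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException("Option [batch_file_count] is not a number.")
      }
    if (batchSize < 1) {
      throw new IllegalArgumentException("Option [batch_file_count] is less than 1.")
    }
    batchSize
  }

  // Sort by modification time and keep at most batchSize files, equivalent to the
  // sortWith + dropRight logic added to listStageFiles.
  def selectBatch(stageFiles: Seq[StageFile], batchSize: Int): Seq[StageFile] =
    stageFiles.sortBy(_.lastModifiedTime).take(batchSize)

  def main(args: Array[String]): Unit = {
    val files = Seq(StageFile("c", 30L), StageFile("a", 10L), StageFile("b", 20L))
    val batch = selectBatch(files, parseBatchFileCount(Map("batch_file_count" -> "2")))
    println(batch.map(_.name).mkString(","))  // prints: a,b
  }
}
```

In practice this means repeated INSERT INTO <table> STAGE OPTIONS('batch_file_count' = '5') commands drain the stage directory a few files at a time, which is what the new TestCarbonWriter case checks: 500 rows are visible after the first command and 1000 after the second.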