Zhangshunyu commented on a change in pull request #3578: [CARBONDATA-3663] Support loading stage files in batches
URL: https://github.com/apache/carbondata/pull/3578#discussion_r366661610
##########
File path:
integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
##########
@@ -103,14 +103,91 @@ class TestCarbonWriter extends QueryTest {
sql(s"INSERT INTO $tableName STAGE")
- checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
// ensure the stage snapshot file and all stage files are deleted
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
} finally {
- sql(s"drop table if exists $tableName").collect()
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
+ new File(dataPath).delete()
+ }
+ }
+
+ @Test
+ def testBatchLoad(): Unit = {
+ sql(s"DROP TABLE IF EXISTS $tableName").collect()
+ sql(
+ s"""
+ | CREATE TABLE $tableName (stringField string, intField int,
shortField short)
+ | STORED AS carbondata
+ """.stripMargin
+ ).collect()
+
+ val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+ val dataTempPath = rootPath + "/data/temp/"
+ val dataPath = rootPath + "/data/"
+ new File(dataPath).delete()
+ new File(dataPath).mkdir()
+
+ try {
+ val tablePath = storeLocation + "/" + tableName + "/"
+
+ val writerProperties = newWriterProperties(dataTempPath, dataPath,
storeLocation)
+ val carbonProperties = newCarbonProperties(storeLocation)
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(1)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ val dataCount = 1000
+ val source = new TestSource(dataCount) {
+ @throws[InterruptedException]
+ override def get(index: Int): Array[AnyRef] = {
+ val data = new Array[AnyRef](3)
+ data(0) = "test" + index
+ data(1) = index.asInstanceOf[AnyRef]
+ data(2) = 12345.asInstanceOf[AnyRef]
+ data
+ }
+
+ @throws[InterruptedException]
+ override def onFinish(): Unit = {
+ Thread.sleep(5000L)
+ }
+ }
+ val stream = environment.addSource(source)
+ val factory = CarbonWriterFactory.builder("Local").build(
+ "default",
+ tableName,
+ tablePath,
+ new Properties,
+ writerProperties,
+ carbonProperties
+ )
+ val streamSink = StreamingFileSink.forBulkFormat(new
Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+ stream.addSink(streamSink)
+
+ try environment.execute
+ catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
+
+ sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+ checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
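Review comment:
Why does each stage file contain 100 rows? The test writes 1000 rows (`dataCount = 1000`) and loads with `batch_file_count = 5`, yet it expects `count(1)` to be 500. That only holds if every stage file holds exactly 100 rows. Please explain (or make explicit in the test) what determines the number of rows per stage file.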
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services