http://git-wip-us.apache.org/repos/asf/carbondata/blob/a160dfb6/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala new file mode 100644 index 0000000..d789f5c --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOpName.scala @@ -0,0 +1,2647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.carbondata + +import java.io.{File, PrintWriter} +import java.math.BigDecimal +import java.net.{BindException, ServerSocket} +import java.sql.{Date, Timestamp} +import java.util.concurrent.Executors + +import scala.collection.mutable + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.common.exceptions.NoSuchStreamException +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.TIMESERIES +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.exception.ProcessMetaDataException +import org.apache.carbondata.spark.rdd.CarbonScanRDD +import org.apache.carbondata.streaming.parser.CarbonStreamParser + +class TestStreamingTableOpName extends QueryTest with BeforeAndAfterAll { + + private val spark = sqlContext.sparkSession + private val dataFilePath = s"$resourcesPath/streamSample.csv" + def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../") + .getCanonicalPath + val badRecordFilePath: File =new File(currentPath + "/target/test/badRecords") + + override def beforeAll { + badRecordFilePath.delete() + badRecordFilePath.mkdirs() + 
CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + sql("DROP DATABASE IF EXISTS streaming CASCADE") + sql("CREATE DATABASE streaming") + sql("USE streaming") + sql( + """ + | CREATE TABLE source( + | c1 string, + | c2 int, + | c3 string, + | c5 string + | ) STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES ('streaming' = 'true') + """.stripMargin) + sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO TABLE source""") + + dropTable() + + // 1. normal table not support streaming ingest + createTable(tableName = "batch_table", streaming = false, withBatchLoad = true) + + // 2. streaming table with different input source + // file source + createTable(tableName = "stream_table_file", streaming = true, withBatchLoad = true) + + // 3. streaming table with bad records + createTable(tableName = "bad_record_fail", streaming = true, withBatchLoad = true) + + // 4. streaming frequency check + createTable(tableName = "stream_table_1s", streaming = true, withBatchLoad = true) + + // 5. streaming table execute batch loading + // 6. detail query + // 8. compaction + // full scan + filter scan + aggregate query + createTable(tableName = "stream_table_filter", streaming = true, withBatchLoad = true) + + createTableWithComplexType( + tableName = "stream_table_filter_complex", streaming = true, withBatchLoad = true) + + + // 11. table for delete segment test + createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false) + createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false) + + // 12. reject alter streaming properties + // 13. 
handoff streaming segment and finish streaming + createTable(tableName = "stream_table_handoff", streaming = false, withBatchLoad = false) + + // 15. auto handoff streaming segment + // 16. close streaming table + // 17. reopen streaming table after close + // 9. create new stream segment if current stream segment is full + createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false) + + // 18. block drop table while streaming is in progress + createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false) + + // 19. block streaming on 'preaggregate' main table + createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false) + + createTable(tableName = "agg_table", streaming = true, withBatchLoad = false) + + createTable(tableName = "stream_table_empty", streaming = true, withBatchLoad = false) + + var csvDataDir = integrationPath + "/spark2/target/csvdatanew" + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir) + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append) + } + + test("validate streaming property") { + sql( + """ + | CREATE TABLE correct( + | c1 string + | ) STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES ('streaming' = 'true') + """.stripMargin) + sql("DROP TABLE correct") + sql( + """ + | CREATE TABLE correct( + | c1 string + | ) STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES ('streaming' = 'false') + """.stripMargin) + sql("DROP TABLE correct") + val exceptionMsg = intercept[MalformedCarbonCommandException] { + sql( + """ + | create table wrong( + | c1 string + | ) STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES ('streaming' = 'invalid') + """.stripMargin) + } + assert(exceptionMsg.getMessage.equals("Table property \'streaming\' should be either \'true\' or \'false\'")) + } + + test("test blocking update and delete operation on streaming table") { + val exceptionMsgUpdate = 
intercept[MalformedCarbonCommandException] { + sql("""UPDATE source d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'""").collect() + } + val exceptionMsgDelete = intercept[MalformedCarbonCommandException] { + sql("""DELETE FROM source WHERE d.c1 = 'a'""").collect() + } + assert(exceptionMsgUpdate.getMessage.equals("Data update is not allowed for streaming table")) + assert(exceptionMsgDelete.getMessage.equals("Data delete is not allowed for streaming table")) + } + + test("test blocking alter table operation on streaming table") { + val addColException = intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").collect() + } + val dropColException = intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source DROP COLUMNS (c1)""").collect() + } + val renameException = intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source RENAME to t""").collect() + } + val changeDataTypeException = intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source CHANGE c2 c2 bigint""").collect() + } + assertResult("Alter table add column is not allowed for streaming table")(addColException.getMessage) + assertResult("Alter table drop column is not allowed for streaming table")(dropColException.getMessage) + assertResult("Alter rename table is not allowed for streaming table")(renameException.getMessage) + assertResult("Alter table change datatype is not allowed for streaming table")(changeDataTypeException.getMessage) + } + + override def afterAll { + dropTable() + sql("USE default") + sql("DROP DATABASE IF EXISTS streaming CASCADE") + var csvDataDir = integrationPath + "/spark2/target/csvdatanew" + badRecordFilePath.delete() + new File(csvDataDir).delete() + csvDataDir = integrationPath + "/spark2/target/csvdata" + new File(csvDataDir).delete() + } + + def dropTable(): Unit = { + sql("drop table if exists streaming.batch_table") + sql("drop table if exists streaming.stream_table_file") + sql("drop table if 
exists streaming.bad_record_fail") + sql("drop table if exists streaming.stream_table_1s") + sql("drop table if exists streaming.stream_table_filter ") + sql("drop table if exists streaming.stream_table_filter_complex") + sql("drop table if exists streaming.stream_table_delete_id") + sql("drop table if exists streaming.stream_table_delete_date") + sql("drop table if exists streaming.stream_table_handoff") + sql("drop table if exists streaming.stream_table_reopen") + sql("drop table if exists streaming.stream_table_drop") + sql("drop table if exists streaming.agg_table_block") + sql("drop table if exists streaming.stream_table_empty") + } + + // normal table not support streaming ingest + test("normal table not support streaming ingest and alter normal table's streaming property") { + // alter normal table's streaming property + val msg = intercept[MalformedCarbonCommandException](sql("alter table streaming.batch_table set tblproperties('streaming'='false')")) + assertResult("Streaming property value is incorrect")(msg.getMessage) + + val identifier = new TableIdentifier("batch_table", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + var server: ServerSocket = null + try { + server = getServerSocket() + val thread1 = createWriteSocketThread(server, 2, 10, 1) + thread1.start() + // use thread pool to catch the exception of sink thread + val pool = Executors.newSingleThreadExecutor() + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier) + val future = pool.submit(thread2) + Thread.sleep(1000) + thread1.interrupt() + val msg = intercept[Exception] { + future.get() + } + assert(msg.getMessage.contains("is not a streaming table")) + } finally { + if (server != null) { + server.close() + } + } + } + + // input source: file + test("streaming ingest from file source") { + val identifier = new 
TableIdentifier("stream_table_file", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdata").getCanonicalPath + // streaming ingest 10 rows + generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(2000) + generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir) + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.stream_table_file"), + Seq(Row(25)) + ) + + val row = sql("select * from streaming.stream_table_file order by id").head() + val exceptedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")) + assertResult(exceptedRow)(row) + } + + test("test preaggregate table creation on streaming table without handoff") { + val identifier = new TableIdentifier("agg_table", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table"), + Seq(Row(10))) + sql("create datamap p1 on table agg_table using 'preaggregate' as select name, sum(salary) from agg_table group by name") + // No data should be loaded into aggregate table as hand-off is not yet fired + checkAnswer(sql("select * from agg_table_p1"), Seq()) + } + + test("test if data is loaded 
into preaggregate after handoff is fired") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, avg(salary) from agg_table2 group by name") + sql("create datamap p3 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name") + sql("create datamap p4 on table agg_table2 using 'preaggregate' as select name, max(salary) from agg_table2 group by name") + sql("create datamap p5 on table agg_table2 using 'preaggregate' as select name, count(salary) from agg_table2 group by name") + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + checkAnswer(sql("select * from agg_table2_p2"), + Seq( + Row("name_10", 200000.0, 2.0), + Row("name_11", 220000.0, 2.0), + Row("name_12", 240000.0, 2.0), + Row("name_13", 260000.0, 2.0), + Row("name_14", 280000.0, 2.0))) + checkAnswer(sql("select * from agg_table2_p3"), + Seq( + Row("name_10", 
100000.0), + Row("name_11", 110000.0), + Row("name_12", 120000.0), + Row("name_13", 130000.0), + Row("name_14", 140000.0))) + checkAnswer(sql("select * from agg_table2_p4"), + Seq( + Row("name_10", 100000.0), + Row("name_11", 110000.0), + Row("name_12", 120000.0), + Row("name_13", 130000.0), + Row("name_14", 140000.0))) + checkAnswer(sql("select * from agg_table2_p5"), + Seq( + Row("name_10", 2.0), + Row("name_11", 2.0), + Row("name_12", 2.0), + Row("name_13", 2.0), + Row("name_14", 2.0))) + sql("drop table agg_table2") + } + + test("test whether data is loaded into preaggregate after handoff is fired") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')") + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"), + Seq( + Row("name_10", 400000.0), + Row("name_14", 560000.0), + Row("name_12", 480000.0), + Row("name_11", 440000.0), + Row("name_13", 520000.0))) + checkAnswer(sql("select * from agg_table2_p1"), + 
Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0), + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + + sql("drop table agg_table2") + } + + test("test whether data is loaded into preaggregate before handoff is fired") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')") + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select name, sum(salary) from agg_table2 group by name"), + Seq( + Row("name_10", 400000.0), + Row("name_14", 560000.0), + Row("name_12", 480000.0), + Row("name_11", 440000.0), + Row("name_13", 520000.0))) + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + + sql("drop table agg_table2") + } + + test("test if timeseries load is successful when created on streaming table") { + sql("drop table if exists 
timeseries_table") + createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("timeseries_table", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + sql( + s""" + | CREATE DATAMAP agg0_second ON TABLE timeseries_table + | USING '${TIMESERIES.toString}' + | DMPROPERTIES ( + | 'EVENT_TIME'='register', + | 'SECOND_GRANULARITY'='1') + | AS SELECT register, SUM(id) FROM timeseries_table + | GROUP BY register + """.stripMargin) + sql("alter table timeseries_table finish streaming") + sql("alter table timeseries_table compact 'streaming'") + checkAnswer( sql("select * FROM timeseries_table_agg0_second"), Seq(Row(Timestamp.valueOf("2010-01-01 10:01:01.0"), 120))) + } + + test("test if timeseries load is successful when created on streaming table with day granularity") { + sql("drop table if exists timeseries_table") + createTable(tableName = "timeseries_table", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("timeseries_table", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + sql( + s""" + | CREATE DATAMAP agg0_day ON TABLE timeseries_table + | USING '${TIMESERIES.toString}' + | DMPROPERTIES ( + | 'EVENT_TIME'='register', + | 'DAY_GRANULARITY'='1') + | AS SELECT register, 
SUM(id) FROM timeseries_table + | GROUP BY register + """.stripMargin) + sql("alter table timeseries_table finish streaming") + sql("alter table timeseries_table compact 'streaming'") + checkAnswer( sql("select * FROM timeseries_table_agg0_day"), Seq(Row(Timestamp.valueOf("2010-01-01 00:00:00.0"), 120))) + } + + test("test if minor compaction is successful for streaming and preaggregate tables") { + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + sql("create datamap p2 on table agg_table2 using 'preaggregate' as select name, min(salary) from agg_table2 group by name") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + sql("alter table agg_table2 compact 'minor'") + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 800000.0), + Row("name_11", 880000.0), + Row("name_12", 960000.0), + Row("name_13", 1040000.0), + Row("name_14", 1120000.0))) + assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1")) + assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1")) + assert(sql("show segments for table agg_table2_p2").collect().map(_.get(0)).contains("0.1")) + sql("drop table if exists agg_table2") + } + + test("test if major compaction is successful for streaming and preaggregate tables") { + sql("drop table if exists agg_table2") + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + sql("create datamap 
p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + loadData() + sql("alter table agg_table2 finish streaming") + sql("alter table agg_table2 compact 'streaming'") + sql("alter table agg_table2 compact 'major'") + checkAnswer(sql("select * from agg_table2_p1"), + Seq( + Row("name_10", 800000.0), + Row("name_11", 880000.0), + Row("name_12", 960000.0), + Row("name_13", 1040000.0), + Row("name_14", 1120000.0))) + assert(sql("show segments for table agg_table2").collect().map(_.get(0)).contains("1.1")) + assert(sql("show segments for table agg_table2_p1").collect().map(_.get(0)).contains("0.1")) + sql("drop table if exists agg_table2") + } + + def loadData() { + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + // streaming ingest 10 rows + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir) + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(2000) + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir) + Thread.sleep(5000) + thread.interrupt() + } + + test("test if data is displayed when alias is used for column name") { + sql("drop table if exists agg_table2") + createTable(tableName = "agg_table2", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table2", Option("streaming")) + val carbonTable 
= CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdata1").getCanonicalPath + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir) + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append) + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table2"), + Seq(Row(10))) + sql(s"load data inpath '$csvDataDir' into table agg_table2 options('FILEHEADER'='id, name, city, salary, tax, percent, birthday, register, updated, file')") + sql("create datamap p1 on table agg_table2 using 'preaggregate' as select name, sum(salary) from agg_table2 group by name") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select name as abc, sum(salary) as sal from agg_table2 group by name"), + Seq( + Row("name_14", 560000.0), + Row("name_10", 400000.0), + Row("name_12", 480000.0), + Row("name_11", 440000.0), + Row("name_13", 520000.0))) + + sql("drop table agg_table2") + } + + test("test if data is loaded in aggregate table after handoff is done for streaming table") { + createTable(tableName = "agg_table3", streaming = true, withBatchLoad = false) + val identifier = new TableIdentifier("agg_table3", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val csvDataDir = new File("target/csvdatanew").getCanonicalPath + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir) + generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, SaveMode.Append) + // streaming ingest 10 rows + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, 
intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming.agg_table3"), + Seq(Row(10))) + sql("alter table agg_table3 finish streaming") + sql("alter table agg_table3 compact 'streaming'") + sql("create datamap p1 on table agg_table3 using 'preaggregate' as select name, sum(salary) from agg_table3 group by name") + // Data should be loaded into aggregate table as hand-off is fired + checkAnswer(sql("select * from agg_table3_p1"), + Seq( + Row("name_10", 200000.0), + Row("name_11", 220000.0), + Row("name_12", 240000.0), + Row("name_13", 260000.0), + Row("name_14", 280000.0))) + } + + // bad records + test("streaming table with bad records action: fail") { + executeStreamingIngest( + tableName = "bad_record_fail", + batchNums = 2, + rowNumsEachBatch = 10, + intervalOfSource = 1, + intervalOfIngest = 1, + continueSeconds = 8, + generateBadRecords = true, + badRecordAction = "fail", + autoHandoff = false + ) + val result = sql("select count(*) from streaming.bad_record_fail").collect() + assert(result(0).getLong(0) < 10 + 5) + } + + // ingest with different interval + test("1 row per 1 second interval") { + executeStreamingIngest( + tableName = "stream_table_1s", + batchNums = 3, + rowNumsEachBatch = 1, + intervalOfSource = 1, + intervalOfIngest = 1, + continueSeconds = 6, + generateBadRecords = false, + badRecordAction = "force", + autoHandoff = false + ) + val result = sql("select count(*) from streaming.stream_table_1s").collect() + // 20 seconds can't ingest all data, exists data delay + assert(result(0).getLong(0) >= 5) + } + + test("query on stream table with dictionary, sort_columns") { + val batchParts = + partitionNums("select * from streaming.stream_table_filter") + + executeStreamingIngest( + tableName = "stream_table_filter", + batchNums = 2, + rowNumsEachBatch = 25, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 20, + generateBadRecords = true, + 
badRecordAction = "force", + autoHandoff = false + ) + + val totalParts = + partitionNums("select * from streaming.stream_table_filter") + assert(totalParts > batchParts) + + val streamParts = totalParts - batchParts + + // non-filter + val result = sql("select * from streaming.stream_table_filter order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(1).isNullAt(0)) + assert(result(1).getString(1) == "name_6") + // check one row of batch loading + assert(result(50).getInt(0) == 100000001) + assert(result(50).getString(1) == "batch_1") + + // filter + assert(batchParts >= partitionNums("select * from stream_table_filter where id >= 100000001")) + + checkAnswer( + sql("select * from stream_table_filter where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id = 1")) + + checkAnswer( + sql("select * from stream_table_filter where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where id > 49 and id < 100000002")) + + checkAnswer( + sql("select * from stream_table_filter where id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, 
"batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where id between 50 and 100000001")) + + checkAnswer( + sql("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where name in ('name_9','name_10', 'name_11', 'name_12') and id <> 10 and id not in (11, 12)")) + + checkAnswer( + sql("select * from stream_table_filter where name = 'name_3'"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where name = 'name_3'")) + + checkAnswer( + sql("select * from stream_table_filter where name like '%me_3%' and id < 30"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where name like '%me_3%' and id < 30")) + + checkAnswer(sql("select count(*) from stream_table_filter where name like '%ame%'"), + Seq(Row(49))) + assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like '%ame%'")) + + checkAnswer(sql("select count(*) from stream_table_filter where name like '%batch%'"), + Seq(Row(5))) + assert(totalParts == partitionNums("select count(*) from stream_table_filter where name like 
'%batch%'")) + + checkAnswer( + sql("select * from stream_table_filter where name >= 'name_3' and id < 4"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where name >= 'name_3' and id < 4")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and name <> 'name_10' and name not in ('name_11', 'name_12')")) + + checkAnswer( + sql("select * from stream_table_filter where city = 'city_1'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where city = 'city_1'")) + + checkAnswer( + sql("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + 
assert(totalParts >= partitionNums("select * from stream_table_filter where city like '%ty_1%' and ( id < 10 or id >= 100000001)")) + + checkAnswer(sql("select count(*) from stream_table_filter where city like '%city%'"), + Seq(Row(54))) + assert(totalParts == partitionNums("select count(*) from stream_table_filter where city like '%city%'")) + + checkAnswer( + sql("select * from stream_table_filter where city > 'city_09' and city < 'city_10'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where city > 'city_09' and city < 'city_10'")) + + checkAnswer( + sql("select * from stream_table_filter where city between 'city_09' and 'city_1'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and city <> 'city_10' and city not in ('city_11', 'city_12')")) + + checkAnswer( + sql("select * from 
stream_table_filter where salary = 90000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where salary = 90000")) + + checkAnswer( + sql("select * from stream_table_filter where salary > 80000 and salary <= 100000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where salary > 80000 and salary <= 100000")) + + checkAnswer( + sql("select * from stream_table_filter where salary between 80001 and 90000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where salary between 80001 and 90000")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10, 11, 12) and salary <> 100000.0 and salary not in (110000.0, 120000.0)")) + + checkAnswer( + sql("select * from stream_table_filter where tax = 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, 
BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where tax = 0.04 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where tax >= 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where tax >= 0.04 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where tax < 0.05 and tax > 0.02 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where tax between 0.02 and 0.04 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10) and tax <> 0.01"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and tax <> 0.01")) + + checkAnswer( + sql("select * from stream_table_filter where percent = 80.04 and id < 100"), + Seq(Row(9, 
"name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where percent = 80.04 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where percent >= 80.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where percent >= 80.04 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where percent < 80.05 and percent > 80.02 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where percent between 80.02 and 80.05 and id < 100")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10) and percent <> 80.01"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and percent <> 80.01")) + + checkAnswer( + 
sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'")) + + checkAnswer( + sql("select * from stream_table_filter where birthday = '1990-01-04'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where birthday = '1990-01-04'")) + + checkAnswer( + sql("select * from stream_table_filter where birthday > '1990-01-03' and birthday <= '1990-01-04'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where birthday > '1990-01-03' and 
birthday <= '1990-01-04'")) + + checkAnswer( + sql("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where birthday between '1990-01-04' and '1990-01-05'")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and birthday <> '1990-01-01'")) + + checkAnswer( + sql("select * from stream_table_filter where register = '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where register = '2010-01-04 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= 
'2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where register > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and register <> '2010-01-01 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where updated = '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 
90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where updated = '2010-01-04 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'")) + + 
checkAnswer( + sql("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0")))) + assert(streamParts >= partitionNums("select * from stream_table_filter where id in (9, 10) and updated <> '2010-01-01 10:01:01'")) + + checkAnswer( + sql("select * from stream_table_filter where id is null order by name"), + Seq(Row(null, "", "", null, null, null, null, null, null), + Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts >= partitionNums("select * from stream_table_filter where id is null order by name")) + + checkAnswer( + sql("select * from stream_table_filter where name = ''"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(streamParts >= partitionNums("select * from stream_table_filter where name = ''")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and name <> ''"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and name <> ''")) + + checkAnswer( + sql("select * from stream_table_filter where city = ''"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(streamParts >= partitionNums("select * from stream_table_filter where city = ''")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and city <> ''"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and city <> ''")) + + checkAnswer( + sql("select * from stream_table_filter where salary is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(totalParts == partitionNums("select * from stream_table_filter where salary is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and salary is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and salary is not null")) + + checkAnswer( + sql("select * from stream_table_filter where tax is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(totalParts == partitionNums("select * from stream_table_filter where tax is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and tax is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and tax is not null")) + + checkAnswer( + sql("select * from stream_table_filter where percent is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(totalParts == partitionNums("select * from stream_table_filter where percent is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and percent is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + 
assert(totalParts == partitionNums("select * from stream_table_filter where id is null and percent is not null")) + + checkAnswer( + sql("select * from stream_table_filter where birthday is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(1 == partitionNums("select * from stream_table_filter where birthday is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and birthday is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and birthday is not null")) + + checkAnswer( + sql("select * from stream_table_filter where register is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(1 == partitionNums("select * from stream_table_filter where register is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and register is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from stream_table_filter where id is null and register is not null")) + + checkAnswer( + sql("select * from stream_table_filter where updated is null"), + Seq(Row(null, "", "", null, null, null, null, null, null))) + assert(3 == partitionNums("select * from stream_table_filter where updated is null")) + + checkAnswer( + sql("select * from stream_table_filter where id is null and updated is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")))) + assert(totalParts == partitionNums("select * from 
stream_table_filter where id is null and updated is not null")) + + // agg + checkAnswer( + sql("select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " + + "from stream_table_filter where id >= 2 and id <= 100000004"), + Seq(Row(51, 100000004, "batch_1", 7843162, 400001276))) + assert(totalParts >= partitionNums( + "select count(*), max(id), min(name), cast(avg(id) as integer), sum(id) " + + "from stream_table_filter where id >= 2 and id <= 100000004")) + + checkAnswer( + sql("select city, count(id), sum(id), cast(avg(id) as integer), " + + "max(salary), min(salary) " + + "from stream_table_filter " + + "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " + + "and city <> '' " + + "group by city " + + "order by city"), + Seq(Row("city_1", 2, 100000002, 50000001, 10000.0, 0.1), + Row("city_2", 1, 100000002, 100000002, 0.2, 0.2), + Row("city_3", 2, 100000006, 50000003, 30000.0, 0.3))) + assert(totalParts >= partitionNums( + "select city, count(id), sum(id), cast(avg(id) as integer), " + + "max(salary), min(salary) " + + "from stream_table_filter " + + "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " + + "and city <> '' " + + "group by city " + + "order by city")) + + // batch loading + for(_ <- 0 to 2) { + executeBatchLoad("stream_table_filter") + } + checkAnswer( + sql("select count(*) from streaming.stream_table_filter"), + Seq(Row(25 * 2 + 5 + 5 * 3))) + + sql("alter table streaming.stream_table_filter compact 'minor'") + Thread.sleep(5000) + val result1 = sql("show segments for table streaming.stream_table_filter").collect() + result1.foreach { row => + if (row.getString(0).equals("1")) { + assertResult(SegmentStatus.STREAMING.getMessage)(row.getString(1)) + assertResult(FileFormat.ROW_V1.toString)(row.getString(5)) + } else if (row.getString(0).equals("0.1")) { + assertResult(SegmentStatus.SUCCESS.getMessage)(row.getString(1)) + 
assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + } else { + assertResult(SegmentStatus.COMPACTED.getMessage)(row.getString(1)) + assertResult(FileFormat.COLUMNAR_V3.toString)(row.getString(5)) + } + } + + } + + test("query on stream table with dictionary, sort_columns and complex column") { + executeStreamingIngest( + tableName = "stream_table_filter_complex", + batchNums = 2, + rowNumsEachBatch = 25, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 20, + generateBadRecords = true, + badRecordAction = "force", + autoHandoff = false + ) + + // non-filter + val result = sql("select * from streaming.stream_table_filter_complex order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(0).isNullAt(0)) + assert(result(0).getString(1) == "") + assert(result(0).getStruct(9).isNullAt(1)) + // check one row of batch loading + assert(result(50).getInt(0) == 100000001) + assert(result(50).getString(1) == "batch_1") + assert(result(50).getStruct(9).getInt(1) == 20) + + // filter + checkAnswer( + sql("select * from stream_table_filter_complex where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 
20)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_50", "school_5050")), 50)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where name = 'name_3'"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where name like '%me_3%' and id < 30"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) + + checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%ame%'"), + Seq(Row(49))) + + checkAnswer(sql("select count(*) from stream_table_filter_complex where name like '%batch%'"), + Seq(Row(5))) + + checkAnswer( + sql("select * from stream_table_filter_complex where name >= 'name_3' and id < 4"), + Seq(Row(3, "name_3", "city_3", 30000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_3", "school_33")), 3)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where city = 'city_1'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 
80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where city like '%ty_1%' and ( id < 10 or id >= 100000001)"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer(sql("select count(*) from stream_table_filter_complex where city like '%city%'"), + Seq(Row(54))) + + checkAnswer( + sql("select * from stream_table_filter_complex where city > 'city_09' and city < 'city_10'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where city between 'city_09' and 'city_1'"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 1)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where salary = 90000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where salary > 80000 and salary <= 100000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_10", "school_1010")), 10)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where salary between 80001 and 90000"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where tax = 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from 
stream_table_filter_complex where tax >= 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where tax < 0.05 and tax > 0.02 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where tax between 0.02 and 0.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where percent = 80.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where percent >= 80.04 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where percent < 80.05 and percent > 80.02 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 
10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where percent between 80.02 and 80.05 and id < 100"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where birthday = '1990-01-04'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where birthday > '1990-01-03' and birthday <= '1990-01-04'"), + Seq(Row(9, "name_9", 
"city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where birthday between '1990-01-04' and '1990-01-05'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where register = '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where register > '2010-01-03 10:01:01' and 
register <= '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where register between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")),50)), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where updated = '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from 
stream_table_filter_complex where updated > '2010-01-03 10:01:01' and register <= '2010-01-04 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where updated between '2010-01-04 10:01:01' and '2010-01-05 10:01:01'"), + Seq(Row(9, "name_9", "city_9", 90000.0, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_9", "school_99")), 9)), + Row(100000004, "batch_4", "city_4", 0.4, BigDecimal.valueOf(0.04), 80.04, Date.valueOf("1990-01-04"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Timestamp.valueOf("2010-01-04 10:01:01.0"), Row(wrap(Array("school_4", "school_44")), 50)), + Row(100000005, "batch_5", "city_5", 0.5, BigDecimal.valueOf(0.05), 80.05, Date.valueOf("1990-01-05"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Timestamp.valueOf("2010-01-05 10:01:01.0"), Row(wrap(Array("school_5", "school_55")), 60)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null order by name"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)), + Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where name = ''"), + Seq(Row(null, "", "", null, null, null, 
null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and name <> ''"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where city = ''"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and city <> ''"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where salary is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and salary is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where tax is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and tax is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + 
sql("select * from stream_table_filter_complex where percent is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and salary is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where birthday is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and birthday is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where register is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and register is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where updated is null"), + Seq(Row(null, "", "", null, null, null, null, null, null, Row(wrap(Array(null)), null)))) + + checkAnswer( + sql("select * from stream_table_filter_complex where id is null and updated is not null"), + Seq(Row(null, "name_6", "city_6", 60000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), 
Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Row(wrap(Array("school_6", "school_66")), 6)))) + + // agg + checkAnswer( + sql("select count(*), max(id), min(name), cast(avg(file.age) as integer), sum(file.age) " + + "from stream_table_filter_complex where id >= 2 and id <= 100000004"), + Seq(Row(51, 100000004, "batch_1", 27, 1406))) + + checkAnswer( + sql("select city, count(id), sum(id), cast(avg(file.age) as integer), " + + "max(salary), min(salary) " + + "from stream_table_filter_complex " + + "where name in ('batch_1', 'batch_2', 'batch_3', 'name_1', 'name_2', 'name_3') " + + "and city <> '' " + + "group by city " + + "order by city"), + Seq(Row("city_1", 2, 100000002, 10, 10000.0, 0.1), + Row("city_2", 1, 100000002, 30, 0.2, 0.2), + Row("city_3", 2, 100000006, 21, 30000.0, 0.3))) + } + + // Ingests 3 batches into stream_table_delete_id (handoffSize = 1, autoHandoff = false), then + // checks that deleting the "Streaming" segment by ID is rejected with an "Invalid ID" error, + // and that after deleting the "Streaming Finish" segments by ID every non-"Streaming" + // segment reports MARKED_FOR_DELETE. + test("test deleting streaming segment by ID while ingesting") { + executeStreamingIngest( + tableName = "stream_table_delete_id", + batchNums = 3, + rowNumsEachBatch = 100, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 18, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1, + autoHandoff = false + ) + val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect() + // Column 0 of "show segments" is used as the segment ID and column 1 as its status text. + val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",") + // NOTE(review): the expected message below assumes segment IDs run 0..n-1 and that the only + // "Streaming" segment has the highest ID (beforeDelete.length - 1) -- confirm. + // NOTE(review): if no "Streaming Finish" segment exists, segmentIds2 is empty and the second + // delete statement becomes "segment.id in ()". + val msg = intercept[Exception] { + sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds1) ") + } + assertResult(s"Delete segment by Id is failed. 
Invalid ID is: ${beforeDelete.length -1}")(msg.getMessage) + + val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming Finish")) + .map(_.getString(0)).mkString(",") + sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds2) ") + val afterDelete = sql("show segments for table streaming.stream_table_delete_id").collect() + afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row => + assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1)) + } + } + + test("test deleting streaming segment by date while ingesting") { + executeStreamingIngest( + tableName = "stream_table_delete_date", + batchNums = 3, + rowNumsEachBatch = 100, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 18, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1, + autoHandoff = false) + val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect() + sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " + + s"'2999-10-01 01:00:00'") + val segmentIds = beforeDelete.filter(_.getString(1).equals("Streaming")) + assertResult(1)(segmentIds.length) + val afterDelete = sql("show segments for table streaming.stream_table_delete_date").collect() + afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row => + assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1)) + } + } + + test("reject alter streaming properties and handoff 'streaming finish' segment to columnar segment") { + try { + sql("ALTER TABLE streaming.stream_table_handoff UNSET TBLPROPERTIES IF EXISTS ('streaming')") + assert(false, "unsupport to unset streaming property") + } catch { + case _: Throwable => + assert(true) + } + try { + sql("ALTER TABLE streaming.stream_table_handoff SET TBLPROPERTIES('streaming'='true')") + executeStreamingIngest( + tableName = "stream_table_handoff", + batchNums = 2, +
<TRUNCATED>