Repository: carbondata
Updated Branches:
  refs/heads/master 498502d2b -> 8840b7b56
[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8840b7b5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8840b7b5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8840b7b5

Branch: refs/heads/master
Commit: 8840b7b56ba6ea180d1ee15b6e0fed9c5901ef98
Parents: 498502d
Author: Jacky Li <jacky.li...@qq.com>
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Tue Jul 24 15:09:46 2018 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 147 +++++++++++++++++--
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8840b7b5/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 0771403..3073c59 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -2079,6 +2079,102 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assert(ex.getMessage.contains("'ddd' not found"))
   }
 
+  test("StreamSQL: stream join dimension table") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+
+    sql(
+      s"""
+         |CREATE TABLE dim(
+         |  id INT,
+         |  name STRING,
+         |  country STRING
+         |)
+         |STORED AS carbondata
+      """.stripMargin)
+
+    // load three dimension rows from a generated CSV directory
+    val inputDir = integrationPath + "/spark2/target/streamDim"
+    import spark.implicits._
+    spark.createDataset(Seq((1, "alice", "india"), (2, "bob", "france"), (3, "chris", "canada")))
+      .write.mode("overwrite").csv(inputDir)
+    sql(s"LOAD DATA INPATH '$inputDir' INTO TABLE dim OPTIONS('header'='false')")
+    sql("SELECT * FROM dim").show
+
+    val rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir, SaveMode.Overwrite, false)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         |  id INT,
+         |  salary FLOAT,
+         |  tax DECIMAL(8,2),
+         |  percent DOUBLE,
+         |  birthday DATE,
+         |  register TIMESTAMP,
+         |  updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         |  'streaming'='source',
+         |  'format'='csv',
+         |  'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         |  id INT,
+         |  name STRING,
+         |  country STRING,
+         |  salary FLOAT,
+         |  tax DECIMAL(8,2),
+         |  percent DOUBLE,
+         |  birthday DATE,
+         |  register TIMESTAMP,
+         |  updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    // each 1-second mini-batch joins the stream against the static dim table
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT s.id, d.name, d.country, s.salary, s.tax, s.percent, s.birthday, s.register, s.updated
+        |  FROM source s
+        |  JOIN dim d ON s.id = d.id
+      """.stripMargin).show(false)
+
+    Thread.sleep(2000)
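+    // by now at least one ~1s mini-batch should have fired, so the first
+    // 10 joined rows ought to be visible in the sink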
sql("select * from sink").show + + generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append, false) + Thread.sleep(5000) + + // after 2 minibatch, there should be 10 row added (filter condition: id%2=1) + checkAnswer(sql("select count(*) from sink"), Seq(Row(20))) + + sql("select * from sink order by id").show + val row = sql("select * from sink order by id, salary").head() + val exceptedRow = Row(1, "alice", "india", 120000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")) + assertResult(exceptedRow)(row) + + sql("DROP STREAM stream123") + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS sink") + sql("DROP TABLE IF EXISTS dim") + } + def createWriteSocketThread( serverSocket: ServerSocket, writeNums: Int, @@ -2239,23 +2335,42 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { idStart: Int, rowNums: Int, csvDirPath: String, - saveMode: SaveMode = SaveMode.Overwrite): Unit = { + saveMode: SaveMode = SaveMode.Overwrite, + withDim: Boolean = true): Unit = { // Create csv data frame file - val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums) - .map { id => - (id, - "name_" + id, - "city_" + id, - 10000.00 * id, - BigDecimal.valueOf(0.01), - 80.01, - "1990-01-01", - "2010-01-01 10:01:01", - "2010-01-01 10:01:01", - "school_" + id + ":school_" + id + id + "$" + id) - } - val csvDataDF = spark.createDataFrame(csvRDD).toDF( - "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "file") + val csvDataDF = if (withDim) { + // generate data with dimension columns (name and city) + val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums) + .map { id => + (id, + "name_" + id, + "city_" + id, + 10000.00 * id, + BigDecimal.valueOf(0.01), + 80.01, + "1990-01-01", + "2010-01-01 10:01:01", + "2010-01-01 10:01:01", + "school_" + id + ":school_" + id + id + "$" + id) + } + spark.createDataFrame(csvRDD).toDF( + "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "file") + } else { + // generate data without dimension columns + val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums) + .map { id => + (id % 3 + 1, + 10000.00 * id, + BigDecimal.valueOf(0.01), + 80.01, + "1990-01-01", + "2010-01-01 10:01:01", + "2010-01-01 10:01:01", + "school_" + id + ":school_" + id + id + "$" + id) + } + spark.createDataFrame(csvRDD).toDF( + "id", "salary", "tax", "percent", "birthday", "register", "updated", "file") + } csvDataDF.write .option("header", "false")