Repository: carbondata
Updated Branches:
  refs/heads/master 498502d2b -> 8840b7b56


[MINOR] Adding a testcase for stream-table join in StreamSQL

This closes #2431
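
For context, the pattern exercised by the new test is roughly the following (a minimal sketch only, not part of the patch; table, stream and column names are trimmed down and illustrative, the full statements are in the diff below):

// Hypothetical, trimmed-down version of the StreamSQL stream-table join
// set up by the new test; names and the CSV path are illustrative.
import org.apache.spark.sql.SparkSession

def streamJoinSketch(spark: SparkSession, csvDir: String): Unit = {
  // dimension table that the stream is joined against
  spark.sql("CREATE TABLE dim(id INT, name STRING) STORED AS carbondata")
  // streaming source backed by CSV files under csvDir
  spark.sql(
    s"""CREATE TABLE source(id INT, salary FLOAT) STORED AS carbondata
       |TBLPROPERTIES('streaming'='source', 'format'='csv', 'path'='$csvDir')""".stripMargin)
  // streaming sink table that receives the joined rows
  spark.sql(
    """CREATE TABLE sink(id INT, name STRING, salary FLOAT) STORED AS carbondata
      |TBLPROPERTIES('streaming'='sink')""".stripMargin)
  // continuous query: join each mini batch of the stream with the dimension table
  spark.sql(
    """CREATE STREAM job1 ON TABLE sink
      |STMPROPERTIES('trigger'='ProcessingTime', 'interval'='1 seconds')
      |AS SELECT s.id, d.name, s.salary
      |   FROM source s JOIN dim d ON s.id = d.id""".stripMargin)
}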


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8840b7b5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8840b7b5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8840b7b5

Branch: refs/heads/master
Commit: 8840b7b56ba6ea180d1ee15b6e0fed9c5901ef98
Parents: 498502d
Author: Jacky Li <jacky.li...@qq.com>
Authored: Tue Jun 26 19:10:35 2018 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Tue Jul 24 15:09:46 2018 +0800

----------------------------------------------------------------------
 .../TestStreamingTableOperation.scala           | 147 +++++++++++++++++--
 1 file changed, 131 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8840b7b5/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 0771403..3073c59 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -2079,6 +2079,102 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     assert(ex.getMessage.contains("'ddd' not found"))
   }
 
+  test("StreamSQL: stream join dimension table") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dimension")
+
+    sql(
+      s"""
+         |CREATE TABLE dim(
+         |  id INT,
+         |  name STRING,
+         |  country STRING
+         |)
+         |STORED AS carbondata
+       """.stripMargin)
+    val inputDir = integrationPath + "/spark2/target/streamDim"
+    import spark.implicits._
+    spark.createDataset(Seq((1, "alice", "india"), (2, "bob", "france"), (3, "chris", "canada")))
+      .write.mode("overwrite").csv(inputDir)
+    sql(s"LOAD DATA INPATH '$inputDir' INTO TABLE dim 
OPTIONS('header'='false')")
+    sql("SELECT * FROM dim").show
+
+    var rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir, SaveMode.Overwrite, false)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | country STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds')
+        |AS
+        |  SELECT s.id, d.name, d.country, s.salary, s.tax, s.percent, s.birthday, s.register, s.updated
+        |  FROM source s
+        |  JOIN dim d ON s.id = d.id
+      """.stripMargin).show(false)
+
+    Thread.sleep(2000)
+    sql("select * from sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append, false)
+    Thread.sleep(5000)
+
+    // after 2 mini batches of 10 rows each, the sink should contain 20 rows
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(20)))
+
+    sql("select * from sink order by id").show
+    val row = sql("select * from sink order by id, salary").head()
+    val expectedRow = Row(1, "alice", "india", 120000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(expectedRow)(row)
+
+    sql("DROP STREAM stream123")
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+    sql("DROP TABLE IF EXISTS dim")
+  }
+
   def createWriteSocketThread(
       serverSocket: ServerSocket,
       writeNums: Int,
@@ -2239,23 +2335,42 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       idStart: Int,
       rowNums: Int,
       csvDirPath: String,
-      saveMode: SaveMode = SaveMode.Overwrite): Unit = {
+      saveMode: SaveMode = SaveMode.Overwrite,
+      withDim: Boolean = true): Unit = {
     // Create csv data frame file
-    val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
-      .map { id =>
-        (id,
-          "name_" + id,
-          "city_" + id,
-          10000.00 * id,
-          BigDecimal.valueOf(0.01),
-          80.01,
-          "1990-01-01",
-          "2010-01-01 10:01:01",
-          "2010-01-01 10:01:01",
-          "school_" + id + ":school_" + id + id + "$" + id)
-      }
-    val csvDataDF = spark.createDataFrame(csvRDD).toDF(
-      "id", "name", "city", "salary", "tax", "percent", "birthday", 
"register", "updated", "file")
+    val csvDataDF = if (withDim) {
+      // generate data with dimension columns (name and city)
+      val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+        .map { id =>
+          (id,
+            "name_" + id,
+            "city_" + id,
+            10000.00 * id,
+            BigDecimal.valueOf(0.01),
+            80.01,
+            "1990-01-01",
+            "2010-01-01 10:01:01",
+            "2010-01-01 10:01:01",
+            "school_" + id + ":school_" + id + id + "$" + id)
+        }
+      spark.createDataFrame(csvRDD).toDF(
+        "id", "name", "city", "salary", "tax", "percent", "birthday", 
"register", "updated", "file")
+    } else {
+      // generate data without dimension columns
+      val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums)
+        .map { id =>
+          (id % 3 + 1,
+            10000.00 * id,
+            BigDecimal.valueOf(0.01),
+            80.01,
+            "1990-01-01",
+            "2010-01-01 10:01:01",
+            "2010-01-01 10:01:01",
+            "school_" + id + ":school_" + id + id + "$" + id)
+        }
+      spark.createDataFrame(csvRDD).toDF(
+        "id", "salary", "tax", "percent", "birthday", "register", "updated", 
"file")
+    }
 
     csvDataDF.write
       .option("header", "false")
