Repository: carbondata
Updated Branches:
  refs/heads/master 46f0c8517 -> b047918c7


[CARBONDATA-2759] Add Bad_Records_Options to STMPROPERTIES for Streaming Table

This closes #2532
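
For context (an illustrative sketch, not text from the commit itself): after this change a stream job can carry the bad-records load options directly in STMPROPERTIES, as exercised by the new test added below. A hypothetical invocation, with placeholder stream name "job123" and path "/tmp/badrecords", would look like:

    // Hypothetical usage in the style of the new test; names and paths are placeholders.
    sql(
      s"""
         |CREATE STREAM job123 ON TABLE sink
         |STMPROPERTIES(
         |  'trigger'='ProcessingTime',
         |  'interval'='1 seconds',
         |  'BAD_RECORDS_LOGGER_ENABLE'='FALSE',
         |  'BAD_RECORDS_ACTION'='FORCE',
         |  'BAD_RECORDS_PATH'='/tmp/badrecords')
         |AS SELECT * FROM source WHERE id % 2 = 1
       """.stripMargin)

Options not consumed by StreamingOption continue to be forwarded to the stream writer through remainingOption.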


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b047918c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b047918c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b047918c

Branch: refs/heads/master
Commit: b047918c714ffee7e7f870c63872be2c4a6e2271
Parents: 46f0c85
Author: Indhumathi27 <[email protected]>
Authored: Thu Jul 19 19:47:59 2018 +0530
Committer: QiangCai <[email protected]>
Committed: Fri Jul 20 17:23:26 2018 +0800

----------------------------------------------------------------------
 .../carbondata/spark/StreamingOption.scala      |  27 ++++-
 .../carbondata/stream/StreamJobManager.scala    |   4 +
 .../sql/CarbonDatasourceHadoopRelation.scala    |   4 +
 .../TestStreamingTableOperation.scala           | 116 +++++++++++++++++++
 4 files changed, 150 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
index c724474..2402d83 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
@@ -22,7 +22,8 @@ import scala.collection.mutable
 import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.streaming.parser.CarbonStreamParser
 
@@ -53,6 +54,30 @@ class StreamingOption(val userInputMap: Map[String, String]) {
     userInputMap.getOrElse(CarbonStreamParser.CARBON_STREAM_PARSER,
       CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
 
+  lazy val badRecordsPath: String =
+    userInputMap
+      .getOrElse("bad_records_path", CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+
+  lazy val badRecordsAction: String =
+    userInputMap
+      .getOrElse("bad_records_action", CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+
+  lazy val badRecordsLogger: String =
+    userInputMap
+      .getOrElse("bad_records_logger_enable", CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
+
+  lazy val isEmptyBadRecord: String =
+    userInputMap
+      .getOrElse("is_empty_bad_record", CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
+
   lazy val remainingOption: Map[String, String] = {
     // copy the user input map and remove the fix options
     val mutableMap = mutable.Map[String, String]() ++= userInputMap
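
(Not part of the patch; a minimal sketch of the lookup order the new StreamingOption fields follow: a key supplied in the stream's options wins, otherwise the value falls back to the CarbonProperties default, exactly as in the getOrElse chains above. The helper name "resolve" is made up for illustration.)

    // Hypothetical helper mirroring the pattern above: user option first, then system property, then default.
    def resolve(userInputMap: Map[String, String],
        optionKey: String, propertyKey: String, default: String): String =
      userInputMap.getOrElse(optionKey,
        CarbonProperties.getInstance().getProperty(propertyKey, default))

    // e.g. resolve(userInputMap, "bad_records_action",
    //   CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
    //   CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)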

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 59e924d..8734c63 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -118,6 +118,10 @@ object StreamJobManager {
             .option("carbon.stream.parser", options.rowParser)
             .option("dbName", sinkTable.getDatabaseName)
             .option("tableName", sinkTable.getTableName)
+            .option("bad_record_path", options.badRecordsPath)
+            .option("bad_records_action", options.badRecordsAction)
+            .option("bad_records_logger_enable", options.badRecordsLogger)
+            .option("is_empty_bad_record", options.isEmptyBadRecord)
             .options(options.remainingOption)
             .start()
           latch.countDown()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 8e402b9..8b6567f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -96,6 +96,10 @@ case class CarbonDatasourceHadoopRelation(
           var ifGetArrayItemExists = s
           breakable({
             while (ifGetArrayItemExists.containsChild != null) {
+              if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) {
+                arrayTypeExists = s.childSchema.toString().contains("ArrayType")
+                break
+              }
               if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) {
                 arrayTypeExists = s.childSchema.toString().contains("ArrayType")
                 break

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 9b2af33..0771403 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1744,6 +1744,122 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS sink")
   }
 
+  test("StreamSQL: create and drop a stream with Load options") {
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+
+    var rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    val csvDataDir = integrationPath + "/spark2/target/streamSql"
+    // streaming ingest 10 rows
+    generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
+
+    sql(
+      s"""
+         |CREATE TABLE source(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         |)
+         |STORED AS carbondata
+         |TBLPROPERTIES (
+         | 'streaming'='source',
+         | 'format'='csv',
+         | 'path'='$csvDataDir'
+         |)
+      """.stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE sink(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP
+         | )
+         |STORED AS carbondata
+         |TBLPROPERTIES('streaming'='sink')
+      """.stripMargin)
+
+    sql(
+      s"""
+        |CREATE STREAM stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds',
+        |  'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
+        |  'BAD_RECORDS_ACTION' = 'FORCE',
+        |  'BAD_RECORDS_PATH'='$warehouse')
+        |AS
+        |  SELECT *
+        |  FROM source
+        |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    sql(
+      s"""
+        |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink
+        |STMPROPERTIES(
+        |  'trigger'='ProcessingTime',
+        |  'interval'='1 seconds',
+        |  'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE',
+        |  'BAD_RECORDS_ACTION' = 'FORCE',
+        |  'BAD_RECORDS_PATH'='$warehouse')
+        |AS
+        |  SELECT *
+        |  FROM source
+        |  WHERE id % 2 = 1
+      """.stripMargin).show(false)
+    Thread.sleep(200)
+    sql("select * from sink").show
+
+    generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append)
+    Thread.sleep(5000)
+
+    // after 2 minibatch, there should be 10 row added (filter condition: id%2=1)
+    checkAnswer(sql("select count(*) from sink"), Seq(Row(10)))
+
+    val row = sql("select * from sink order by id").head()
+    val exceptedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"))
+    assertResult(exceptedRow)(row)
+
+    sql("SHOW STREAMS").show(false)
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    rows = sql("SHOW STREAMS ON TABLE sink").collect()
+    assertResult(1)(rows.length)
+    assertResult("stream123")(rows.head.getString(0))
+    assertResult("RUNNING")(rows.head.getString(2))
+    assertResult("streaming.source")(rows.head.getString(3))
+    assertResult("streaming.sink")(rows.head.getString(4))
+
+    sql("DROP STREAM stream123")
+    sql("DROP STREAM IF EXISTS stream123")
+
+    rows = sql("SHOW STREAMS").collect()
+    assertResult(0)(rows.length)
+
+    sql("DROP TABLE IF EXISTS source")
+    sql("DROP TABLE IF EXISTS sink")
+  }
+
   test("StreamSQL: create stream without interval ") {
     sql("DROP TABLE IF EXISTS source")
     sql("DROP TABLE IF EXISTS sink")
