Repository: carbondata Updated Branches: refs/heads/master 46f0c8517 -> b047918c7
[CARBONDATA-2759]Add Bad_Records_Options to STMPROPERTIES for Streaming Table This closes #2532 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b047918c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b047918c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b047918c Branch: refs/heads/master Commit: b047918c714ffee7e7f870c63872be2c4a6e2271 Parents: 46f0c85 Author: Indhumathi27 <[email protected]> Authored: Thu Jul 19 19:47:59 2018 +0530 Committer: QiangCai <[email protected]> Committed: Fri Jul 20 17:23:26 2018 +0800 ---------------------------------------------------------------------- .../carbondata/spark/StreamingOption.scala | 27 ++++- .../carbondata/stream/StreamJobManager.scala | 4 + .../sql/CarbonDatasourceHadoopRelation.scala | 4 + .../TestStreamingTableOperation.scala | 116 +++++++++++++++++++ 4 files changed, 150 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala index c724474..2402d83 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala @@ -22,7 +22,8 @@ import scala.collection.mutable import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException -import org.apache.carbondata.core.constants.CarbonCommonConstants +import 
org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.streaming.parser.CarbonStreamParser @@ -53,6 +54,30 @@ class StreamingOption(val userInputMap: Map[String, String]) { userInputMap.getOrElse(CarbonStreamParser.CARBON_STREAM_PARSER, CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER) + lazy val badRecordsPath: String = + userInputMap + .getOrElse("bad_records_path", CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + + lazy val badRecordsAction: String = + userInputMap + .getOrElse("bad_records_action", CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) + + lazy val badRecordsLogger: String = + userInputMap + .getOrElse("bad_records_logger_enable", CarbonProperties.getInstance() + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)) + + lazy val isEmptyBadRecord: String = + userInputMap + .getOrElse("is_empty_bad_record", CarbonProperties.getInstance() + .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)) + lazy val remainingOption: Map[String, String] = { // copy the user input map and remove the fix options val mutableMap = mutable.Map[String, String]() ++= userInputMap http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala index 59e924d..8734c63 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala @@ -118,6 +118,10 @@ object StreamJobManager { .option("carbon.stream.parser", options.rowParser) .option("dbName", sinkTable.getDatabaseName) .option("tableName", sinkTable.getTableName) + .option("bad_record_path", options.badRecordsPath) + .option("bad_records_action", options.badRecordsAction) + .option("bad_records_logger_enable", options.badRecordsLogger) + .option("is_empty_bad_record", options.isEmptyBadRecord) .options(options.remainingOption) .start() latch.countDown() http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 8e402b9..8b6567f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -96,6 +96,10 @@ case class CarbonDatasourceHadoopRelation( var ifGetArrayItemExists = s breakable({ while (ifGetArrayItemExists.containsChild != null) { + if (ifGetArrayItemExists.childSchema.toString().contains("ArrayType")) { + arrayTypeExists = s.childSchema.toString().contains("ArrayType") + break + } if (ifGetArrayItemExists.child.isInstanceOf[AttributeReference]) { arrayTypeExists = s.childSchema.toString().contains("ArrayType") break 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b047918c/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 9b2af33..0771403 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -1744,6 +1744,122 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS sink") } + test("StreamSQL: create and drop a stream with Load options") { + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS sink") + + var rows = sql("SHOW STREAMS").collect() + assertResult(0)(rows.length) + + val csvDataDir = integrationPath + "/spark2/target/streamSql" + // streaming ingest 10 rows + generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) + + sql( + s""" + |CREATE TABLE source( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP + |) + |STORED AS carbondata + |TBLPROPERTIES ( + | 'streaming'='source', + | 'format'='csv', + | 'path'='$csvDataDir' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE sink( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP + | ) + |STORED AS carbondata + |TBLPROPERTIES('streaming'='sink') + """.stripMargin) + + sql( + s""" + |CREATE STREAM stream123 ON TABLE sink + |STMPROPERTIES( + | 'trigger'='ProcessingTime', + | 'interval'='1 seconds', + | 
'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', + | 'BAD_RECORDS_ACTION' = 'FORCE', + | 'BAD_RECORDS_PATH'='$warehouse') + |AS + | SELECT * + | FROM source + | WHERE id % 2 = 1 + """.stripMargin).show(false) + sql( + s""" + |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink + |STMPROPERTIES( + | 'trigger'='ProcessingTime', + | 'interval'='1 seconds', + | 'BAD_RECORDS_LOGGER_ENABLE' = 'FALSE', + | 'BAD_RECORDS_ACTION' = 'FORCE', + | 'BAD_RECORDS_PATH'='$warehouse') + |AS + | SELECT * + | FROM source + | WHERE id % 2 = 1 + """.stripMargin).show(false) + Thread.sleep(200) + sql("select * from sink").show + + generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append) + Thread.sleep(5000) + + // after 2 minibatches, there should be 10 rows added (filter condition: id%2=1) + checkAnswer(sql("select count(*) from sink"), Seq(Row(10))) + + val row = sql("select * from sink order by id").head() + val expectedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0")) + assertResult(expectedRow)(row) + + sql("SHOW STREAMS").show(false) + + rows = sql("SHOW STREAMS").collect() + assertResult(1)(rows.length) + assertResult("stream123")(rows.head.getString(0)) + assertResult("RUNNING")(rows.head.getString(2)) + assertResult("streaming.source")(rows.head.getString(3)) + assertResult("streaming.sink")(rows.head.getString(4)) + + rows = sql("SHOW STREAMS ON TABLE sink").collect() + assertResult(1)(rows.length) + assertResult("stream123")(rows.head.getString(0)) + assertResult("RUNNING")(rows.head.getString(2)) + assertResult("streaming.source")(rows.head.getString(3)) + assertResult("streaming.sink")(rows.head.getString(4)) + + sql("DROP STREAM stream123") + sql("DROP STREAM IF EXISTS stream123") + + rows = sql("SHOW STREAMS").collect() + assertResult(0)(rows.length) + + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS 
sink") + } + test("StreamSQL: create stream without interval ") { sql("DROP TABLE IF EXISTS source") sql("DROP TABLE IF EXISTS sink")
