This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 99e0c7cb59fdf9d89a797b1b13923b64639dfc30
Author: Zhang Zhichao <[email protected]>
AuthorDate: Tue Aug 27 11:32:48 2019 +0800

    [CARBONDATA-3497] Support to write long string for streaming table

    This closes #3366
---
 .../hadoop/stream/StreamRecordReader.java | 19 +-
 .../resources/streamSample_with_long_string.csv | 6 +
 .../streaming/CarbonAppendableStreamSink.scala | 19 +-
 .../converter/SparkDataTypeConverterImpl.java | 6 +-
 .../TestStreamingTableWithLongString.scala | 649 +++++++++++++++++++++
 .../streaming/CarbonStreamRecordWriter.java | 11 +-
 .../streaming/parser/CSVStreamParserImp.java | 5 +-
 .../streaming/parser/CarbonStreamParser.java | 3 +-
 .../streaming/parser/RowStreamParserImp.scala | 11 +-
 9 files changed, 715 insertions(+), 14 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 75e36be..1e40baa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -81,6 +81,7 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
   protected CarbonTable carbonTable;
   private CarbonColumn[] storageColumns;
   private boolean[] isRequired;
+  private boolean[] dimensionsIsVarcharTypeMap;
   private DataType[] measureDataTypes;
   private int dimensionCount;
   private int measureCount;
@@ -163,6 +164,10 @@
             .getDirectDictionaryGenerator(storageColumns[i].getDataType());
       }
     }
+    dimensionsIsVarcharTypeMap = new boolean[dimensionCount];
+    for (int i = 0; i < dimensionCount; i++) {
+      dimensionsIsVarcharTypeMap[i] = storageColumns[i].getDataType() == DataTypes.VARCHAR;
+    }
     measureDataTypes = new DataType[measureCount];
     for (int i = 0; i < measureCount; i++) {
       measureDataTypes[i] = storageColumns[dimensionCount + i].getDataType();
     }
@@ -387,7 +392,12 @@
       }
     } else {
       if (isNoDictColumn[colCount]) {
-        int v = input.readShort();
+        int v = 0;
+        if (dimensionsIsVarcharTypeMap[colCount]) {
+          v = input.readInt();
+        } else {
+          v = input.readShort();
+        }
         if (isRequired[colCount]) {
           byte[] b = input.readBytes(v);
           if (isFilterRequired[colCount]) {
@@ -561,7 +571,12 @@
         outputValues[colCount] = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       } else {
         if (isNoDictColumn[colCount]) {
-          int v = input.readShort();
+          int v = 0;
+          if (dimensionsIsVarcharTypeMap[colCount]) {
+            v = input.readInt();
+          } else {
+            v = input.readShort();
+          }
           outputValues[colCount] = input.readBytes(v);
         } else {
           outputValues[colCount] = input.readInt();
diff --git a/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
new file mode 100644
index 0000000..b010c07
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/streamSample_with_long_string.csv
@@ -0,0 +1,6 @@
+id,name,city,salary,tax,percent,birthday,register,updated,longstr,file
+100000001,batch_1,city_1,0.1,0.01,80.01,1990-01-01,2010-01-01 10:01:01,2010-01-01
10:01:01,1abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...] +100000002,batch_2,city_2,0.2,0.02,80.02,1990-01-02,2010-01-02 10:01:01,2010-01-02 10:01:01,2abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...] +100000003,batch_3,city_3,0.3,0.03,80.03,1990-01-03,2010-01-03 10:01:01,2010-01-03 10:01:01,3abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...] +100000004,batch_4,city_4,0.4,0.04,80.04,1990-01-04,2010-01-04 10:01:01,2010-01-04 10:01:01,4abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...] +100000005,batch_5,city_5,0.5,0.05,80.05,1990-01-05,2010-01-05 10:01:01,2010-01-05 10:01:01,5abcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabcabca [...] 
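The reader change above has a matching writer change later in this commit (CarbonStreamRecordWriter): a no-dictionary VARCHAR ("long string") dimension value is length-prefixed with a 4-byte int, while ordinary string dimensions keep the 2-byte short prefix that caps a value at 32,767 bytes. The following sketch is not CarbonData code; it is a self-contained illustration, using plain java.io streams and an invented class name, of the symmetric branch the writer and reader must agree on so a long string survives the round trip without desynchronizing the stream.

import java.io.*;
import java.nio.charset.StandardCharsets;

public class LengthPrefixSketch {

  // Writer side: varchar (long string) columns get a 4-byte length prefix,
  // all other no-dictionary string columns keep the 2-byte prefix.
  static void writeDimension(DataOutputStream out, byte[] value, boolean isVarchar)
      throws IOException {
    if (isVarchar) {
      out.writeInt(value.length);
    } else {
      out.writeShort(value.length);   // silently overflows past 32,767 bytes
    }
    out.write(value);
  }

  // Reader side: must branch on the same per-column flag, otherwise the
  // length is misread and every later column in the row is corrupted.
  static byte[] readDimension(DataInputStream in, boolean isVarchar) throws IOException {
    int length = isVarchar ? in.readInt() : in.readShort();
    byte[] value = new byte[length];
    in.readFully(value);
    return value;
  }

  public static void main(String[] args) throws IOException {
    // ~40,000 bytes: larger than a short length prefix can describe
    byte[] longValue = new String(new char[40000]).replace('\0', 'a')
        .getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    writeDimension(new DataOutputStream(buffer), longValue, true);
    byte[] back = readDimension(
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())), true);
    System.out.println(back.length);  // 40000
  }
}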
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 90132ff..4440e3a 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -40,6 +40,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonProperties @@ -261,6 +262,15 @@ object CarbonAppendableStreamSink { } val rowSchema = queryExecution.analyzed.schema + val isVarcharTypeMapping = { + val col2VarcharType = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + .getCreateOrderColumn(carbonLoadModel.getTableName).asScala + .map(c => c.getColName -> (c.getDataType == DataTypes.VARCHAR)).toMap + rowSchema.fieldNames.map(c => { + val r = col2VarcharType.get(c.toLowerCase) + r.isDefined && r.get + }) + } // write data file result = sparkSession.sparkContext.runJob(queryExecution.toRdd, (taskContext: TaskContext, iterator: Iterator[InternalRow]) => { @@ -272,7 +282,8 @@ object CarbonAppendableStreamSink { sparkAttemptNumber = taskContext.attemptNumber(), committer, iterator, - rowSchema + rowSchema, + isVarcharTypeMapping ) }) @@ -319,7 +330,8 @@ object CarbonAppendableStreamSink { sparkAttemptNumber: Int, committer: FileCommitProtocol, iterator: Iterator[InternalRow], - rowSchema: StructType): (TaskCommitMessage, StreamFileIndex) = { + rowSchema: StructType, + isVarcharTypeMapping: Array[Boolean]): (TaskCommitMessage, StreamFileIndex) = { val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId) val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) @@ -350,7 +362,8 @@ object CarbonAppendableStreamSink { val streamParser = Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser] - streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema) + streamParser.initialize(taskAttemptContext.getConfiguration, + rowSchema, isVarcharTypeMapping) blockIndex = StreamSegment.appendBatchData(new InputIterator(iterator, streamParser), taskAttemptContext, carbonLoadModel) diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java index 41b378d..4db1154 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java +++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java @@ -114,7 +114,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType( DataType carbonDataType) { - if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) { + if (carbonDataType == 
org.apache.carbondata.core.metadata.datatype.DataTypes.STRING + || carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) { return DataTypes.StringType; } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) { return DataTypes.ShortType; @@ -170,7 +171,8 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG - || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY) { + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BINARY + || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.VARCHAR) { fields[i] = new StructField(carbonColumn.getColName(), convertCarbonToSparkDataType(dataType), true, null); } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) { diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala new file mode 100644 index 0000000..521b241 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithLongString.scala @@ -0,0 +1,649 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.carbondata + +import java.io.{File, PrintWriter} +import java.math.BigDecimal +import java.net.{BindException, ServerSocket} +import java.sql.{Date, Timestamp} + +import scala.collection.mutable + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.streaming.parser.CarbonStreamParser + +case class StreamLongStrData(id: Integer, name: String, city: String, salary: java.lang.Float, + tax: BigDecimal, percent: java.lang.Double, birthday: String, + register: String, updated: String, longStr: String, + file: FileElement) + +class TestStreamingTableWithLongString extends QueryTest with BeforeAndAfterAll { + + private val spark = sqlContext.sparkSession + private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv" + private val csvDataDir = integrationPath + "/spark2/target/csvdata_longstr" + private val longStrValue = "abc" * 12000 + + override def beforeAll { + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty( + CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE") + sql("CREATE DATABASE streaming_longstr") + sql("USE streaming_longstr") + + dropTable() + + // 1. streaming table with long string field + // socket source + createTable(tableName = "stream_table_longstr", streaming = true, withBatchLoad = true) + + // 2. streaming table with long string field + // file source + createTable(tableName = "stream_table_longstr_file", streaming = true, withBatchLoad = true) + + // 3. 
streaming table with long string and complex field + createTableWithComplexType( + tableName = "stream_table_longstr_complex", streaming = true, withBatchLoad = true) + } + + override def afterAll { + dropTable() + sql("USE default") + sql("DROP DATABASE IF EXISTS streaming_longstr CASCADE") + new File(csvDataDir).delete() + } + + def dropTable(): Unit = { + sql("drop table if exists streaming_longstr.stream_table_longstr") + sql("drop table if exists streaming_longstr.stream_table_longstr_file") + sql("drop table if exists streaming_longstr.stream_table_longstr_complex") + } + + // input source: file + test("[CARBONDATA-3497] Support to write long string for streaming table: ingest from file source") { + val identifier = new TableIdentifier("stream_table_longstr_file", Option("streaming_longstr")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + // streaming ingest 10 rows + generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, + identifier) + thread.start() + Thread.sleep(3000) + generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir) + Thread.sleep(5000) + thread.interrupt() + checkAnswer( + sql("select count(*) from streaming_longstr.stream_table_longstr_file"), + Seq(Row(25)) + ) + + val row = sql("select * from streaming_longstr.stream_table_longstr_file order by id").head() + val exceptedRow = Row(10, "name_10", "city_10", 100000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), "10" + longStrValue) + assertResult(exceptedRow)(row) + new File(csvDataDir).delete() + } + + test("[CARBONDATA-3497] Support to write long string for streaming table") { + executeStreamingIngest( + tableName = "stream_table_longstr", + batchNums = 2, + rowNumsEachBatch = 25, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 20, + handoffSize = 51200, + autoHandoff = false + ) + + var result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(1).getString(1) == "name_2") + assert(result(1).getString(9) == ("2" + longStrValue)) + // check one row of batch loading + assert(result(50).getInt(0) == 100000001) + assert(result(50).getString(1) == "batch_1") + assert(result(50).getString(9) == ("1" + longStrValue)) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where 
id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false) + sql("alter table streaming_longstr.stream_table_longstr finish streaming") + sql("alter table streaming_longstr.stream_table_longstr compact 'streaming'") + sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false) + Thread.sleep(5000) + + result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(2).getString(1) == "name_3") + assert(result(2).getString(9) == ("3" + longStrValue)) + // check one row of batch loading + assert(result(51).getInt(0) == 100000002) + assert(result(51).getString(1) == "batch_2") + assert(result(51).getString(9) == ("2" + longStrValue)) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + sql("alter table streaming_longstr.stream_table_longstr compact 'major'") + sql("show segments for table streaming_longstr.stream_table_longstr").show(20, false) + Thread.sleep(5000) + + result = sql("select * from streaming_longstr.stream_table_longstr order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(3).getString(1) == "name_4") + assert(result(3).getString(9) == ("4" + longStrValue)) + // check one row of batch loading + assert(result(52).getInt(0) == 100000003) + assert(result(52).getString(1) == "batch_3") + assert(result(52).getString(9) == ("3" + longStrValue)) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, 
Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr where id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue)))) + } + + test("[CARBONDATA-3497] Support to write long string for streaming table: include complex column") { + executeStreamingIngest( + tableName = "stream_table_longstr_complex", + batchNums = 2, + rowNumsEachBatch = 25, + intervalOfSource = 5, + intervalOfIngest = 5, + continueSeconds = 20, + handoffSize = 51200, + autoHandoff = false + ) + + // non-filter + val result = sql("select * from streaming_longstr.stream_table_longstr_complex order by id, name").collect() + assert(result != null) + assert(result.length == 55) + // check one row of streaming data + assert(result(3).getString(1) == "name_4") + assert(result(3).getString(9) == ("4" + longStrValue)) + // check one row of batch loading + assert(result(52).getInt(0) == 100000003) + assert(result(52).getString(1) == "batch_3") + assert(result(52).getString(9) == ("3" + longStrValue)) + assert(result(52).getStruct(10).getInt(1) == 40) + + // filter + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr_complex where id = 1"), + Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 1)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr_complex where id > 49 and id < 100000002"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20)))) + + checkAnswer( + sql("select * from streaming_longstr.stream_table_longstr_complex where id between 50 and 100000001"), + Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 
50)), + Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20)))) + } + + test("[CARBONDATA-3497] Support to write long string for streaming table: StreamSQL") { + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS sink") + + var rows = sql("SHOW STREAMS").collect() + assertResult(0)(rows.length) + + val csvDataDir = integrationPath + "/spark2/target/streamSql_longstr" + // streaming ingest 10 rows + generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) + + sql( + s""" + |CREATE TABLE source( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP, + | longstr STRING + |) + |STORED AS carbondata + |TBLPROPERTIES ( + | 'streaming'='source', + | 'format'='csv', + | 'path'='$csvDataDir' + |) + """.stripMargin) + + sql( + s""" + |CREATE TABLE sink( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP, + | longstr STRING + | ) + |STORED AS carbondata + |TBLPROPERTIES('streaming'='sink', 'LONG_STRING_COLUMNS'='longstr') + """.stripMargin) + + sql( + """ + |CREATE STREAM stream123 ON TABLE sink + |STMPROPERTIES( + | 'trigger'='ProcessingTime', + | 'interval'='5 seconds') + |AS + | SELECT * + | FROM source + | WHERE id % 2 = 1 + """.stripMargin).show(false) + + Thread.sleep(200) + sql("select * from sink").show + + generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append) + Thread.sleep(7000) + + // after 2 minibatch, there should be 10 row added (filter condition: id%2=1) + checkAnswer(sql("select count(*) from sink"), Seq(Row(10))) + + val row = sql("select * from sink order by id").head() + val exceptedRow = Row(11, "name_11", "city_11", 110000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("11" + longStrValue)) + assertResult(exceptedRow)(row) + + sql("SHOW STREAMS").show(false) + + rows = sql("SHOW STREAMS").collect() + assertResult(1)(rows.length) + assertResult("stream123")(rows.head.getString(0)) + assertResult("RUNNING")(rows.head.getString(2)) + assertResult("streaming_longstr.source")(rows.head.getString(3)) + assertResult("streaming_longstr.sink")(rows.head.getString(4)) + + rows = sql("SHOW STREAMS ON TABLE sink").collect() + assertResult(1)(rows.length) + assertResult("stream123")(rows.head.getString(0)) + assertResult("RUNNING")(rows.head.getString(2)) + assertResult("streaming_longstr.source")(rows.head.getString(3)) + assertResult("streaming_longstr.sink")(rows.head.getString(4)) + + sql("DROP STREAM stream123") + sql("DROP STREAM IF EXISTS stream123") + + rows = sql("SHOW STREAMS").collect() + assertResult(0)(rows.length) + + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS sink") + new File(csvDataDir).delete() + } + + def createWriteSocketThread( + serverSocket: ServerSocket, + writeNums: Int, + rowNums: Int, + intervalSecond: Int): Thread = { + new Thread() { + override def run(): Unit = { + // wait for client to connection request and accept + val clientSocket = serverSocket.accept() + val socketWriter = new PrintWriter(clientSocket.getOutputStream()) + var index = 0 + for (_ <- 1 
to writeNums) { + // write 5 records per iteration + val stringBuilder = new StringBuilder() + for (_ <- 1 to rowNums) { + index = index + 1 + stringBuilder.append(index.toString + ",name_" + index + + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01" + + ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01," + + index.toString() + ("abc" * 12000) + + ",school_" + index + ":school_" + index + index + "$" + index) + stringBuilder.append("\n") + } + socketWriter.append(stringBuilder.toString()) + socketWriter.flush() + Thread.sleep(1000 * intervalSecond) + } + socketWriter.close() + } + } + } + + def createSocketStreamingThread( + spark: SparkSession, + port: Int, + carbonTable: CarbonTable, + tableIdentifier: TableIdentifier, + intervalSecond: Int = 2, + handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT, + autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean + ): Thread = { + new Thread() { + override def run(): Unit = { + var qry: StreamingQuery = null + try { + import spark.implicits._ + val readSocketDF = spark.readStream + .format("socket") + .option("host", "localhost") + .option("port", port) + .load().as[String] + .map(_.split(",")) + .map { fields => { + val tmp = fields(10).split("\\$") + val file = FileElement(tmp(0).split(":"), tmp(1).toInt) + StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat, + BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble, + fields(6), fields(7), fields(8), fields(9), file) + } } + + // Write data from socket stream to carbondata file + // repartition to simulate an empty partition when readSocketDF has only one row + qry = readSocketDF.repartition(2).writeStream + .format("carbondata") + .trigger(ProcessingTime(s"$intervalSecond seconds")) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) + .option("dbName", tableIdentifier.database.get) + .option("tableName", tableIdentifier.table) + .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize) + .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff) + .start() + qry.awaitTermination() + } catch { + case ex: Throwable => + LOGGER.error(ex.getMessage) + throw new Exception(ex.getMessage, ex) + } finally { + if (null != qry) { + qry.stop() + } + } + } + } + } + + /** + * start ingestion thread: write `rowNumsEachBatch` rows repeatly for `batchNums` times. 
+ */ + def executeStreamingIngest( + tableName: String, + batchNums: Int, + rowNumsEachBatch: Int, + intervalOfSource: Int, + intervalOfIngest: Int, + continueSeconds: Int, + handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT, + autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean + ): Unit = { + val identifier = new TableIdentifier(tableName, Option("streaming_longstr")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + var server: ServerSocket = null + try { + server = getServerSocket() + val thread1 = createWriteSocketThread( + serverSocket = server, + writeNums = batchNums, + rowNums = rowNumsEachBatch, + intervalSecond = intervalOfSource) + val thread2 = createSocketStreamingThread( + spark = spark, + port = server.getLocalPort, + carbonTable = carbonTable, + tableIdentifier = identifier, + intervalSecond = intervalOfIngest, + handoffSize = handoffSize, + autoHandoff = autoHandoff) + thread1.start() + thread2.start() + Thread.sleep(continueSeconds * 1000) + thread2.interrupt() + thread1.interrupt() + } finally { + if (null != server) { + server.close() + } + } + } + + def generateCSVDataFile( + spark: SparkSession, + idStart: Int, + rowNums: Int, + csvDirPath: String, + saveMode: SaveMode = SaveMode.Overwrite): Unit = { + // Create csv data frame file + val csvDataDF = { + // generate data with dimension columns (name and city) + val csvRDD = spark.sparkContext.parallelize(idStart until idStart + rowNums) + .map { id => + (id, + "name_" + id, + "city_" + id, + 10000.00 * id, + BigDecimal.valueOf(0.01), + 80.01, + "1990-01-01", + "2010-01-01 10:01:01", + "2010-01-01 10:01:01", + id.toString() + ("abc" * 12000), + "school_" + id + "\002school_" + id + id + "\001" + id) + } + spark.createDataFrame(csvRDD).toDF( + "id", "name", "city", "salary", "tax", "percent", "birthday", "register", "updated", "longstr", "file") + } + + csvDataDF.write + .option("header", "false") + .mode(saveMode) + .csv(csvDirPath) + } + + def createFileStreamingThread( + spark: SparkSession, + carbonTable: CarbonTable, + csvDataDir: String, + intervalSecond: Int, + tableIdentifier: TableIdentifier): Thread = { + new Thread() { + override def run(): Unit = { + var qry: StreamingQuery = null + try { + val readSocketDF = spark.readStream.text(csvDataDir) + + // Write data from socket stream to carbondata file + qry = readSocketDF.writeStream + .format("carbondata") + .trigger(ProcessingTime(s"${ intervalSecond } seconds")) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) + .option("dbName", tableIdentifier.database.get) + .option("tableName", tableIdentifier.table) + .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + .option(CarbonStreamParser.CARBON_STREAM_PARSER, + CarbonStreamParser.CARBON_STREAM_PARSER_CSV) + .start() + + qry.awaitTermination() + } catch { + case _: InterruptedException => + println("Done reading and writing streaming data") + } finally { + if (qry != null) { + qry.stop() + } + } + } + } + } + + def createTable(tableName: String, streaming: Boolean, withBatchLoad: Boolean): Unit = { + sql( + s""" + | CREATE TABLE streaming_longstr.$tableName( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP, + | longstr STRING + | ) + | STORED BY 'carbondata' + | 
TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" } + | 'sort_columns'='name', 'dictionary_include'='city,register', 'LONG_STRING_COLUMNS'='longstr') + | """.stripMargin) + + if (withBatchLoad) { + // batch loading 5 rows + executeBatchLoad(tableName) + } + } + + def createTableWithComplexType( + tableName: String, + streaming: Boolean, + withBatchLoad: Boolean): Unit = { + sql( + s""" + | CREATE TABLE streaming_longstr.$tableName( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | tax DECIMAL(8,2), + | percent double, + | birthday DATE, + | register TIMESTAMP, + | updated TIMESTAMP, + | longstr STRING, + | file struct<school:array<string>, age:int> + | ) + | STORED BY 'carbondata' + | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" } + | 'sort_columns'='name', 'dictionary_include'='id,name,salary,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr') + | """.stripMargin) + + if (withBatchLoad) { + // batch loading 5 rows + executeBatchLoad(tableName) + } + } + + def executeBatchLoad(tableName: String): Unit = { + sql( + s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE streaming_longstr.$tableName OPTIONS" + + "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')") + } + + def wrap(array: Array[String]) = { + new mutable.WrappedArray.ofRef(array) + } + + /** + * get a ServerSocket + * if the address was already used, it will retry to use new port number. + * + * @return ServerSocket + */ + def getServerSocket(): ServerSocket = { + var port = 7071 + var serverSocket: ServerSocket = null + var retry = false + do { + try { + retry = false + serverSocket = new ServerSocket(port) + } catch { + case ex: BindException => + retry = true + port = port + 2 + if (port >= 65535) { + throw ex + } + } + } while (retry) + serverSocket + } +} diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java index 5ef5ab9..1f642e5 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java @@ -91,6 +91,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { private boolean[] isNoDictionaryDimensionColumn; private int dimensionWithComplexCount; private int measureCount; + private boolean[] dimensionsIsVarcharTypeMap; private DataType[] measureDataTypes; private StreamBlockletWriter output = null; private String compressorName; @@ -147,6 +148,10 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { dimensionWithComplexCount = configuration.getDimensionCount(); measureCount = configuration.getMeasureCount(); dataFields = configuration.getDataFields(); + dimensionsIsVarcharTypeMap = new boolean[dimensionWithComplexCount]; + for (int i = 0; i < dimensionWithComplexCount; i++) { + dimensionsIsVarcharTypeMap[i] = dataFields[i].getColumn().getDataType() == DataTypes.VARCHAR; + } measureDataTypes = new DataType[measureCount]; for (int i = 0; i < measureCount; i++) { measureDataTypes[i] = @@ -234,7 +239,11 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { if (null != columnValue) { if (isNoDictionaryDimensionColumn[dimCount]) { byte[] col = (byte[]) columnValue; - output.writeShort(col.length); + if (dimensionsIsVarcharTypeMap[dimCount]) { + output.writeInt(col.length); + } else { + output.writeShort(col.length); + } 
       output.writeBytes(col);
       output.dimStatsCollectors[dimCount].update(col);
     } else {
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
index 00d06b6..bf2c460 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CSVStreamParserImp.java
@@ -21,6 +21,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import com.univocity.parsers.csv.CsvParser;
 import com.univocity.parsers.csv.CsvParserSettings;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -32,7 +33,9 @@ public class CSVStreamParserImp implements CarbonStreamParser {
   private CsvParser csvParser;
-  @Override public void initialize(Configuration configuration, StructType structType) {
+  @Override public void initialize(
+      Configuration configuration,
+      StructType structType, boolean[] isVarcharTypeMapping) {
     CsvParserSettings settings = CSVInputFormat.extractCsvParserSettings(configuration);
     csvParser = new CsvParser(settings);
   }
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
index 94f0307..e68117c 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java
@@ -36,7 +36,8 @@ public interface CarbonStreamParser {
   String CARBON_STREAM_PARSER_DEFAULT = CARBON_STREAM_PARSER_ROW_PARSER;
-  void initialize(Configuration configuration, StructType structType);
+  void initialize(Configuration configuration,
+      StructType structType, boolean[] isVarcharTypeMapping);
   Object[] parserRow(InternalRow value);
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index cb12bb6..16e7258 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -36,6 +36,7 @@ import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConst
 class RowStreamParserImp extends CarbonStreamParser {
   var configuration: Configuration = null
+  var isVarcharTypeMapping: Array[Boolean] = null
   var structType: StructType = null
   var encoder: ExpressionEncoder[Row] = null
@@ -44,10 +45,12 @@ class RowStreamParserImp extends CarbonStreamParser {
   var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
   var serializationNullFormat: String = null
-  override def initialize(configuration: Configuration, structType: StructType): Unit = {
+  override def initialize(configuration: Configuration,
+      structType: StructType, isVarcharTypeMapping: Array[Boolean]): Unit = {
     this.configuration = configuration
     this.structType = structType
     this.encoder = RowEncoder.apply(this.structType).resolveAndBind()
+    this.isVarcharTypeMapping = isVarcharTypeMapping
     this.timeStampFormat = new SimpleDateFormat(
       this.configuration.get(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT))
@@ -62,12 +65,12 @@
   }
   override def parserRow(value: InternalRow): Array[Object] = {
-    this.encoder.fromRow(value).toSeq.map { x => {
+    this.encoder.fromRow(value).toSeq.zipWithIndex.map { case (x, i) =>
       FieldConverter.objectToString(
         x, serializationNullFormat, complexDelimiters,
-        timeStampFormat, dateFormat)
+        timeStampFormat, dateFormat,
+        isVarcharType = i < this.isVarcharTypeMapping.length && this.isVarcharTypeMapping(i))
     } }.toArray
-  }
   override def close(): Unit = {
   }
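Taken together, the parser changes above widen the initialize() contract: the sink computes a per-field varchar flag array once from the table schema (see the CarbonAppendableStreamSink hunk earlier in this commit) and hands it to whichever parser implementation is configured, which then consults it by field position. The sketch below is not the real CarbonStreamParser API; it uses simplified stand-in types (Column, SimpleParser, both invented here) to show, under those assumptions, how such a mapping is built in row-schema order and used per index while converting a row.

import java.util.Arrays;
import java.util.List;

public class VarcharMappingSketch {

  // Minimal stand-in for a table column: a name plus a "long string" marker.
  static final class Column {
    final String name;
    final boolean isVarchar;
    Column(String name, boolean isVarchar) {
      this.name = name;
      this.isVarchar = isVarchar;
    }
  }

  // One flag per output field, in the same order as the row schema,
  // so a parser can index it with the field position.
  static boolean[] buildVarcharMapping(List<String> rowFieldNames, List<Column> tableColumns) {
    boolean[] mapping = new boolean[rowFieldNames.size()];
    for (int i = 0; i < mapping.length; i++) {
      for (Column column : tableColumns) {
        if (column.name.equalsIgnoreCase(rowFieldNames.get(i))) {
          mapping[i] = column.isVarchar;
        }
      }
    }
    return mapping;
  }

  // Toy parser mirroring the extended initialize(..., boolean[]) contract.
  static final class SimpleParser {
    private boolean[] isVarcharTypeMapping;

    void initialize(boolean[] isVarcharTypeMapping) {
      this.isVarcharTypeMapping = isVarcharTypeMapping;
    }

    String[] parserRow(Object[] row) {
      String[] converted = new String[row.length];
      for (int i = 0; i < row.length; i++) {
        boolean varchar = i < isVarcharTypeMapping.length && isVarcharTypeMapping[i];
        // A real converter would lift the short-string length limit for varchar
        // fields; here the branch is only tagged to make it visible.
        converted[i] = (varchar ? "varchar:" : "string:") + row[i];
      }
      return converted;
    }
  }

  public static void main(String[] args) {
    boolean[] mapping = buildVarcharMapping(
        Arrays.asList("id", "name", "longstr"),
        Arrays.asList(new Column("id", false), new Column("name", false),
            new Column("longstr", true)));
    SimpleParser parser = new SimpleParser();
    parser.initialize(mapping);
    System.out.println(Arrays.toString(parser.parserRow(new Object[] {1, "n", "aaa"})));
    // prints: [string:1, string:n, varchar:aaa]
  }
}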
