This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 3ebf7f6  [CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns
3ebf7f6 is described below

commit 3ebf7f61f3326bdc9f16a1665d44bea82828d3f7
Author: Zhang Zhichao <441586...@qq.com>
AuthorDate: Sat Dec 7 11:25:04 2019 +0800

    [CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns

    Problem:
    Filtering with measure columns failed on a stream table when the stream table includes complex columns.

    Solution:
    Use 'segmentProperties.getDimensions().size()' instead of 'segmentProperties.getLastDimensionColOrdinal()' when setting 'columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray' for stream data files.

    This closes #3503
---
 .../indexstore/blockletindex/BlockDataMap.java      |   8 +-
 .../scan/executor/impl/AbstractQueryExecutor.java   |   2 +-
 .../carbondata/core/scan/filter/FilterUtil.java     |  57 ++--
 .../carbondata/core/stream/StreamPruner.java        |   2 +-
 .../datamap/examples/MinMaxIndexDataMap.java        |   2 +-
 .../hadoop/stream/StreamRecordReader.java           |   4 +-
 .../carbondata/TestStreamingTableQueryFilter.scala  | 315 +++++++++++++++++++++
 7 files changed, 360 insertions(+), 30 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index e83985f..c865603 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -610,8 +610,8 @@ public class BlockDataMap extends CoarseGrainDataMap

   @Override
   public boolean isScanRequired(FilterResolverIntf filterExp) {
-    FilterExecuter filterExecuter = FilterUtil
-        .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+    FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+        filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
     DataMapRow unsafeRow = taskSummaryDMStore
         .getDataMapRow(getTaskSummarySchema(), taskSummaryDMStore.getRowCount() - 1);
     boolean isScanRequired = FilterExpressionProcessor
@@ -741,8 +741,8 @@ public class BlockDataMap extends CoarseGrainDataMap
     // Remove B-tree jump logic as start and end key prepared is not
     // correct for old store scenarios
     int entryIndex = 0;
-    FilterExecuter filterExecuter = FilterUtil
-        .getFilterExecuterTree(filterExp, getSegmentProperties(), null, getMinMaxCacheColumns());
+    FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(
+        filterExp, getSegmentProperties(), null, getMinMaxCacheColumns(), false);
     // flag to be used for deciding whether use min/max in executor pruning for BlockletDataMap
     boolean useMinMaxForPruning = useMinMaxForExecutorPruning(filterExp);
     // min and max for executor pruning
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 9688868..c891ba2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -522,7 +522,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       filterResolverIntf = queryModel.getDataMapFilter().getResolver();
       blockExecutionInfo.setFilterExecuterTree(
           FilterUtil.getFilterExecuterTree(filterResolverIntf, segmentProperties,
-              blockExecutionInfo.getComlexDimensionInfoMap()));
+              blockExecutionInfo.getComlexDimensionInfoMap(), false));
     }
     try {
       startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index a096051..679ee43 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -134,12 +134,16 @@ public final class FilterUtil {
    *
    * @param filterExpressionResolverTree
    * @param segmentProperties
+   * @param complexDimensionInfoMap
+   * @param minMaxCacheColumns
+   * @param isStreamDataFile: whether to create the filter executer tree for stream data files
    * @return FilterExecuter instance
+   *
    */
   private static FilterExecuter createFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     FilterExecuterType filterExecuterType = filterExpressionResolverTree.getFilterExecuterType();
     if (null != filterExecuterType) {
       switch (filterExecuterType) {
@@ -154,7 +158,7 @@ public final class FilterUtil {
         }
         // return true filter expression if filter column min/max is not cached in driver
         if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
-            segmentProperties, minMaxCacheColumns)) {
+            segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
           return new TrueFilterExecutor();
         }
         return getIncludeFilterExecuter(
@@ -167,15 +171,15 @@ public final class FilterUtil {
       case OR:
         return new OrFilterExecuterImpl(
             createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
-                complexDimensionInfoMap, minMaxCacheColumns),
+                complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
             createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
-                complexDimensionInfoMap, minMaxCacheColumns));
+                complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
       case AND:
         return new AndFilterExecuterImpl(
             createFilterExecuterTree(filterExpressionResolverTree.getLeft(), segmentProperties,
-                complexDimensionInfoMap, minMaxCacheColumns),
+                complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile),
             createFilterExecuterTree(filterExpressionResolverTree.getRight(), segmentProperties,
-                complexDimensionInfoMap, minMaxCacheColumns));
+                complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile));
       case ROWLEVEL_LESSTHAN:
       case ROWLEVEL_LESSTHAN_EQUALTO:
       case ROWLEVEL_GREATERTHAN_EQUALTO:
@@ -186,7 +190,7 @@ public final class FilterUtil {
         if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
             rowLevelRangeFilterResolver.getDimColEvaluatorInfoList(),
             rowLevelRangeFilterResolver.getMsrColEvalutorInfoList(), segmentProperties,
-            minMaxCacheColumns)) {
+            minMaxCacheColumns, isStreamDataFile)) {
           return new TrueFilterExecutor();
         }
         return RowLevelRangeTypeExecuterFactory
@@ -195,7 +199,7 @@ public final class FilterUtil {
       case RANGE:
         // return true filter expression if filter column min/max is not cached in driver
         if (checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(filterExpressionResolverTree,
-            segmentProperties, minMaxCacheColumns)) {
+            segmentProperties, minMaxCacheColumns, isStreamDataFile)) {
           return new TrueFilterExecutor();
         }
         return new RangeValueFilterExecuterImpl(
@@ -291,20 +295,21 @@ public final class FilterUtil {
   private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
       List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvaluatorInfoList,
-      SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns) {
+      SegmentProperties segmentProperties, List<CarbonColumn> minMaxCacheColumns,
+      boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
     if (!msrColEvaluatorInfoList.isEmpty()) {
       columnResolvedFilterInfo = msrColEvaluatorInfoList.get(0);
       replaceCurrentNodeWithTrueFilter =
           checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-              minMaxCacheColumns, true);
+              minMaxCacheColumns, true, isStreamDataFile);
     } else {
       columnResolvedFilterInfo = dimColEvaluatorInfoList.get(0);
       if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         replaceCurrentNodeWithTrueFilter =
             checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-                minMaxCacheColumns, false);
+                minMaxCacheColumns, false, isStreamDataFile);
       }
     }
     return replaceCurrentNodeWithTrueFilter;
@@ -317,24 +322,25 @@ public final class FilterUtil {
    * @param filterExpressionResolverTree
    * @param segmentProperties
    * @param minMaxCacheColumns
+   * @param isStreamDataFile
    * @return
    */
   private static boolean checkIfCurrentNodeToBeReplacedWithTrueFilterExpression(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     ColumnResolvedFilterInfo columnResolvedFilterInfo = null;
     if (null != filterExpressionResolverTree.getMsrColResolvedFilterInfo()) {
       columnResolvedFilterInfo = filterExpressionResolverTree.getMsrColResolvedFilterInfo();
       replaceCurrentNodeWithTrueFilter =
           checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-              minMaxCacheColumns, true);
+              minMaxCacheColumns, true, isStreamDataFile);
     } else {
       columnResolvedFilterInfo = filterExpressionResolverTree.getDimColResolvedFilterInfo();
       if (!columnResolvedFilterInfo.getDimension().hasEncoding(Encoding.IMPLICIT)) {
         replaceCurrentNodeWithTrueFilter =
             checkIfFilterColumnIsCachedInDriver(columnResolvedFilterInfo, segmentProperties,
-                minMaxCacheColumns, false);
+                minMaxCacheColumns, false, isStreamDataFile);
       }
     }
     return replaceCurrentNodeWithTrueFilter;
@@ -352,7 +358,7 @@ public final class FilterUtil {
    */
   private static boolean checkIfFilterColumnIsCachedInDriver(
       ColumnResolvedFilterInfo columnResolvedFilterInfo, SegmentProperties segmentProperties,
-      List<CarbonColumn> minMaxCacheColumns, boolean isMeasure) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isMeasure, boolean isStreamDataFile) {
     boolean replaceCurrentNodeWithTrueFilter = false;
     CarbonColumn columnFromCurrentBlock = null;
     if (isMeasure) {
@@ -377,8 +383,17 @@ public final class FilterUtil {
       // if columns to be cached are not specified then in that case all columns will be cached
       // and then the ordinal of column will be its index in the min/max byte array
       if (isMeasure) {
-        columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
-            segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock.getOrdinal());
+        // when reading from a stream data file, the min/max columns cache doesn't include
+        // complex columns, so 'segmentProperties.getLastDimensionColOrdinal()' cannot be
+        // used as the last dimension ordinal.
+        if (isStreamDataFile) {
+          columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+              segmentProperties.getDimensions().size() + columnFromCurrentBlock.getOrdinal());
+        } else {
+          columnResolvedFilterInfo.setColumnIndexInMinMaxByteArray(
+              segmentProperties.getLastDimensionColOrdinal() + columnFromCurrentBlock
+                  .getOrdinal());
+        }
       } else {
         columnResolvedFilterInfo
             .setColumnIndexInMinMaxByteArray(columnFromCurrentBlock.getOrdinal());
@@ -1492,9 +1507,9 @@ public final class FilterUtil {
    */
   public static FilterExecuter getFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
-      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, boolean isStreamDataFile) {
     return getFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
-        complexDimensionInfoMap, null);
+        complexDimensionInfoMap, null, isStreamDataFile);
   }

   /**
@@ -1507,9 +1522,9 @@ public final class FilterUtil {
   public static FilterExecuter getFilterExecuterTree(
       FilterResolverIntf filterExpressionResolverTree, SegmentProperties segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap,
-      List<CarbonColumn> minMaxCacheColumns) {
+      List<CarbonColumn> minMaxCacheColumns, boolean isStreamDataFile) {
     return createFilterExecuterTree(filterExpressionResolverTree, segmentProperties,
-        complexDimensionInfoMap, minMaxCacheColumns);
+        complexDimensionInfoMap, minMaxCacheColumns, isStreamDataFile);
   }

   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
index c92a8a1..e8790ee 100644
--- a/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
+++ b/core/src/main/java/org/apache/carbondata/core/stream/StreamPruner.java
@@ -72,7 +72,7 @@ public class StreamPruner {
       SegmentProperties segmentProperties =
           new SegmentProperties(listOfColumns, columnCardinality);
       filterExecuter = FilterUtil.getFilterExecuterTree(
-          filterExp, segmentProperties, null, minMaxCacheColumns);
+          filterExp, segmentProperties, null, minMaxCacheColumns, false);
     }
   }
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index d32afd9..d860229 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -135,7 +135,7 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
       }
     } else {
       FilterExecuter filterExecuter =
-          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
+          FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null, false);
       for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) {
         for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; blkltIdx++) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
index 020af65..3ea65e5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/stream/StreamRecordReader.java
@@ -236,8 +236,8 @@ public class StreamRecordReader extends RecordReader<Void, Object> {
     Map<Integer, GenericQueryType> complexDimensionInfoMap = new HashMap<>();
     FilterResolverIntf resolverIntf = model.getDataMapFilter().getResolver();
-    filter =
-        FilterUtil.getFilterExecuterTree(resolverIntf, segmentProperties, complexDimensionInfoMap);
+    filter = FilterUtil.getFilterExecuterTree(
+        resolverIntf, segmentProperties, complexDimensionInfoMap, true);
     // for row filter, we need update column index
     FilterUtil.updateIndexOfColumnExpression(resolverIntf.getFilterExpression(),
         carbonTable.getDimensionOrdinalMax());
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
new file mode 100644
index 0000000..f47e18d
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableQueryFilter.scala
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.PrintWriter
+import java.math.BigDecimal
+import java.net.{BindException, ServerSocket}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestStreamingTableQueryFilter extends QueryTest with BeforeAndAfterAll {
+
+  private val spark = sqlContext.sparkSession
+  private val dataFilePath = s"$resourcesPath/streamSample_with_long_string.csv"
+  private val longStrValue = "abc" * 12000
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+    sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+    sql("CREATE DATABASE streaming_table_filter")
+    sql("USE streaming_table_filter")
+
+    dropTable()
+    createTableWithComplexType(
+      tableName = "stream_filter", streaming = true, withBatchLoad = true)
+  }
+
+  override def afterAll {
+    dropTable()
+    sql("USE default")
+    sql("DROP DATABASE IF EXISTS streaming_table_filter CASCADE")
+  }
+
+  def dropTable(): Unit = {
+    sql("drop table if exists streaming_table_filter.stream_filter")
+  }
+
+  test("[CARBONDATA-3611] Fix failed when filter with measure columns on stream table when this stream table includes complex columns") {
+    executeStreamingIngest(
+      tableName = "stream_filter",
+      batchNums = 2,
+      rowNumsEachBatch = 25,
+      intervalOfSource = 5,
+      intervalOfIngest = 5,
+      continueSeconds = 20,
+      handoffSize = 51200,
+      autoHandoff = false
+    )
+
+    // non-filter
+    val result = sql("select * from streaming_table_filter.stream_filter order by id, name").collect()
+    assert(result != null)
+    assert(result.length == 55)
+    // check one row of streaming data
+    assert(result(3).getString(1) == "name_4")
+    assert(result(3).getString(9) == ("4" + longStrValue))
+    // check one row of batch loading
+    assert(result(52).getInt(0) == 100000003)
+    assert(result(52).getString(1) == "batch_3")
+    assert(result(52).getString(9) == ("3" + longStrValue))
+    assert(result(52).getStruct(10).getInt(1) == 40)
+
+    // filter
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id = 1"),
+      Seq(Row(1, "name_1", "city_1", 10000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 1))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id > 49 and id < 100000002"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id between 50 and 100000001"),
+      Seq(Row(50, "name_50", "city_50", 500000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("50" + longStrValue), Row(wrap(Array("school_50", "school_5050")), 50)),
+        Row(100000001, "batch_1", "city_1", 0.1, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("1" + longStrValue), Row(wrap(Array("school_1", "school_11")), 20))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where salary = 490000.0 and percent = 80.01"),
+      Seq(Row(49, "name_49", "city_49", 490000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("49" + longStrValue), Row(wrap(Array("school_49", "school_4949")), 49))))
+
+    checkAnswer(
+      sql("select * from streaming_table_filter.stream_filter where id > 20 and salary = 300000.0 and file.age > 25"),
+      Seq(Row(30, "name_30", "city_30", 300000.0, BigDecimal.valueOf(0.01), 80.01, Date.valueOf("1990-01-01"), Timestamp.valueOf("2010-01-01 10:01:01.0"), Timestamp.valueOf("2010-01-01 10:01:01.0"), ("30" + longStrValue), Row(wrap(Array("school_30", "school_3030")), 30))))
+  }
+
+  def createWriteSocketThread(
+      serverSocket: ServerSocket,
+      writeNums: Int,
+      rowNums: Int,
+      intervalSecond: Int): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to writeNums) {
+          // write `rowNums` records per iteration
+          val stringBuilder = new StringBuilder()
+          for (_ <- 1 to rowNums) {
+            index = index + 1
+            stringBuilder.append(index.toString + ",name_" + index
+              + ",city_" + index + "," + (10000.00 * index).toString + ",0.01,80.01"
+              + ",1990-01-01,2010-01-01 10:01:01,2010-01-01 10:01:01,"
+              + index.toString() + ("abc" * 12000)
+              + ",school_" + index + ":school_" + index + index + "$" + index)
+            stringBuilder.append("\n")
+          }
+          socketWriter.append(stringBuilder.toString())
+          socketWriter.flush()
+          Thread.sleep(1000 * intervalSecond)
+        }
+        socketWriter.close()
+      }
+    }
+  }
+
+  def createSocketStreamingThread(
+      spark: SparkSession,
+      port: Int,
+      carbonTable: CarbonTable,
+      tableIdentifier: TableIdentifier,
+      intervalSecond: Int = 2,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Thread = {
+    new Thread() {
+      override def run(): Unit = {
+        var qry: StreamingQuery = null
+        try {
+          import spark.implicits._
+          val readSocketDF = spark.readStream
+            .format("socket")
+            .option("host", "localhost")
+            .option("port", port)
+            .load().as[String]
+            .map(_.split(","))
+            .map { fields => {
+              val tmp = fields(10).split("\\$")
+              val file = FileElement(tmp(0).split(":"), tmp(1).toInt)
+              StreamLongStrData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat,
+                BigDecimal.valueOf(fields(4).toDouble), fields(5).toDouble,
+                fields(6), fields(7), fields(8),
+                fields(9), file)
+            } }
+
+          // Write data from socket stream to carbondata file
+          // repartition to simulate an empty partition when readSocketDF has only one row
+          qry = readSocketDF.repartition(2).writeStream
+            .format("carbondata")
+            .trigger(ProcessingTime(s"$intervalSecond seconds"))
+            .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))
+            .option("dbName", tableIdentifier.database.get)
+            .option("tableName", tableIdentifier.table)
+            .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize)
+            .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+            .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff)
+            .start()
+          qry.awaitTermination()
+        } catch {
+          case ex: Throwable =>
+            LOGGER.error(ex.getMessage)
+            throw new Exception(ex.getMessage, ex)
+        } finally {
+          if (null != qry) {
+            qry.stop()
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * start ingestion thread: write `rowNumsEachBatch` rows repeatedly for `batchNums` times.
+   */
+  def executeStreamingIngest(
+      tableName: String,
+      batchNums: Int,
+      rowNumsEachBatch: Int,
+      intervalOfSource: Int,
+      intervalOfIngest: Int,
+      continueSeconds: Int,
+      handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT,
+      autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean
+  ): Unit = {
+    val identifier = new TableIdentifier(tableName, Option("streaming_table_filter"))
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetaStore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket()
+      val thread1 = createWriteSocketThread(
+        serverSocket = server,
+        writeNums = batchNums,
+        rowNums = rowNumsEachBatch,
+        intervalSecond = intervalOfSource)
+      val thread2 = createSocketStreamingThread(
+        spark = spark,
+        port = server.getLocalPort,
+        carbonTable = carbonTable,
+        tableIdentifier = identifier,
+        intervalSecond = intervalOfIngest,
+        handoffSize = handoffSize,
+        autoHandoff = autoHandoff)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(continueSeconds * 1000)
+      thread2.interrupt()
+      thread1.interrupt()
+    } finally {
+      if (null != server) {
+        server.close()
+      }
+    }
+  }
+
+  def createTableWithComplexType(
+      tableName: String,
+      streaming: Boolean,
+      withBatchLoad: Boolean): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE streaming_table_filter.$tableName(
+         | id INT,
+         | name STRING,
+         | city STRING,
+         | salary FLOAT,
+         | tax DECIMAL(8,2),
+         | percent double,
+         | birthday DATE,
+         | register TIMESTAMP,
+         | updated TIMESTAMP,
+         | longstr STRING,
+         | file struct<school:array<string>, age:int>
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES(${if (streaming) "'streaming'='true', " else "" }
+         | 'sort_columns'='name', 'dictionary_include'='name,tax,percent,updated', 'LONG_STRING_COLUMNS'='longstr')
+         | """.stripMargin)
+
+    if (withBatchLoad) {
+      // batch loading 5 rows
+      executeBatchLoad(tableName)
+    }
+  }
+
+  def executeBatchLoad(tableName: String): Unit = {
+    sql(
+      s"LOAD DATA LOCAL INPATH '$dataFilePath' INTO TABLE streaming_table_filter.$tableName OPTIONS" +
+      "('HEADER'='true','COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+  }
+
+  def wrap(array: Array[String]) = {
+    new mutable.WrappedArray.ofRef(array)
+  }
+
+  /**
+   * get a ServerSocket
+   * if the address is already in use, it will retry with a new port number.
+   *
+   * @return ServerSocket
+   */
+  def getServerSocket(): ServerSocket = {
+    var port = 7071
+    var serverSocket: ServerSocket = null
+    var retry = false
+    do {
+      try {
+        retry = false
+        serverSocket = new ServerSocket(port)
+      } catch {
+        case ex: BindException =>
+          retry = true
+          port = port + 2
+          if (port >= 65535) {
+            throw ex
+          }
+      }
+    } while (retry)
+    serverSocket
+  }
+}
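The core of this change is the measure-slot arithmetic in checkIfFilterColumnIsCachedInDriver: columnar segments place measure min/max entries after getLastDimensionColOrdinal(), which also counts the flattened children of complex columns, while stream data files cache min/max without complex columns, so measure entries start right after getDimensions().size(). The following is a minimal standalone Java sketch of that rule, not CarbonData source; the column counts in main() are hypothetical and only illustrate the offset math.

public class MinMaxSlotSketch {

  // Slot of a measure column inside the min/max byte array, following the rule
  // this patch introduces in FilterUtil.checkIfFilterColumnIsCachedInDriver.
  static int measureIndexInMinMax(int dimensionsSize, int lastDimensionColOrdinal,
      int measureOrdinal, boolean isStreamDataFile) {
    if (isStreamDataFile) {
      // stream data files keep no complex columns in the min/max cache,
      // so measures start right after the dimension entries
      return dimensionsSize + measureOrdinal;
    }
    // columnar segments: measures start after the last dimension column ordinal,
    // which also counts the flattened children of complex columns
    return lastDimensionColOrdinal + measureOrdinal;
  }

  public static void main(String[] args) {
    // hypothetical schema: 3 dimension entries, complex children pushing the
    // last dimension column ordinal to 5, and 'salary' as the first measure
    int dimensionsSize = 3;
    int lastDimensionColOrdinal = 5;
    int salaryOrdinal = 0;
    System.out.println(measureIndexInMinMax(
        dimensionsSize, lastDimensionColOrdinal, salaryOrdinal, false)); // 5
    System.out.println(measureIndexInMinMax(
        dimensionsSize, lastDimensionColOrdinal, salaryOrdinal, true));  // 3
  }
}

In this example the old code computed slot 5 for a stream data file as well, comparing the filter against the wrong column's min/max, which is why measure filters failed only on stream tables containing complex columns.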