Repository: carbondata
Updated Branches:
  refs/heads/master 4c692d185 -> 560bfbe77
[CARBONDATA-2919] Support ingest from Kafka in StreamSQL

This closes #2695

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/560bfbe7
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/560bfbe7
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/560bfbe7

Branch: refs/heads/master
Commit: 560bfbe778a29bad285a6388686d888a3e877e33
Parents: 4c692d1
Author: Jacky Li <[email protected]>
Authored: Tue Sep 11 12:17:55 2018 +0800
Committer: QiangCai <[email protected]>
Committed: Thu Sep 13 16:42:20 2018 +0800

----------------------------------------------------------------------
 .../core/locks/AbstractCarbonLock.java          |  10 +
 .../carbondata/core/locks/HdfsFileLock.java     |   4 -
 .../carbondata/core/locks/ICarbonLock.java      |   5 +
 .../carbondata/core/locks/LocalFileLock.java    |   5 -
 .../carbondata/core/locks/S3FileLock.java       |   4 -
 .../core/metadata/schema/table/CarbonTable.java |   8 +
 .../carbondata/core/util/SessionParams.java     |   2 +
 datamap/mv/plan/pom.xml                         |   6 +
 examples/spark2/pom.xml                         |   4 +
 .../carbondata/examples/StreamSQLExample.scala  | 107 ++++++++++
 .../examples/StructuredStreamingExample.scala   |   1 +
 .../org/apache/carbondata/spark/util/Util.java  |  11 +
 .../org/apache/carbondata/api/CarbonStore.scala |  43 ++--
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |   2 +
 .../streaming/CarbonAppendableStreamSink.scala  |   1 +
 .../datasources/CarbonSparkDataSourceUtil.scala |  16 +-
 integration/spark2/pom.xml                      |   6 +
 .../carbondata/spark/util/CarbonSparkUtil.scala |  10 +
 .../carbondata/stream/StreamJobManager.scala    |  43 ++--
 .../CarbonAlterTableCompactionCommand.scala     |   5 +-
 .../management/CarbonShowLoadsCommand.scala     |  10 +-
 .../stream/CarbonCreateStreamCommand.scala      | 213 ++++++++++++++++---
 .../TestStreamingTableOperation.scala           |  21 +-
 pom.xml                                         |   6 +
 .../loading/parser/impl/RowParserImpl.java      |   3 +
 .../streaming/segment/StreamSegment.java        |   2 +-
 26 files changed, 451 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
index 4aa0a18..37f77b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java
@@ -30,8 +30,18 @@ public abstract class AbstractCarbonLock implements ICarbonLock {
 
   private int retryTimeout;
 
+  /**
+   * lockFilePath is the location of the lock file.
+   */
+  protected String lockFilePath;
+
   public abstract boolean lock();
 
+  @Override
+  public String getLockFilePath() {
+    return this.lockFilePath;
+  }
+
   /**
    * API for enabling the locking of file with retries.
   */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index ade4212..bc65ece 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -33,10 +33,6 @@ public class HdfsFileLock extends AbstractCarbonLock {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(HdfsFileLock.class.getName());
 
-  /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
 
   /**
    * lockFileDir is the directory of the lock file.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
index e964f0c..ab20a5e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java
@@ -51,4 +51,9 @@ public interface ICarbonLock {
    */
   boolean releaseLockManually(String lockFile);
 
+  /**
+   * Return the path to the lock file
+   * @return lock file path
+   */
+  String getLockFilePath();
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 1148ae2..5e3033e 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -36,11 +36,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
  */
 public class LocalFileLock extends AbstractCarbonLock {
   /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
-
-  /**
    * lockFileDir is the directory of the lock file.
    */
   private String lockFileDir;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
index 464becb..10bab28 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/S3FileLock.java
@@ -34,10 +34,6 @@ public class S3FileLock extends AbstractCarbonLock {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(S3FileLock.class.getName());
 
-  /**
-   * lockFilePath is the location of the lock file.
-   */
-  private String lockFilePath;
 
   /**
    * lockFileDir is the directory of the lock file.
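
The lock changes above hoist the lockFilePath field into AbstractCarbonLock and expose it through the new ICarbonLock.getLockFilePath() API, so callers can now locate the lock file on disk. A minimal sketch of a caller, assuming a CarbonTable instance and a LogService named LOGGER are already in scope; it mirrors the force-release logic that CarbonAlterTableCompactionCommand adds later in this diff:

  import org.apache.carbondata.core.datastore.impl.FileFactory
  import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}

  // Sketch only: locate and delete the streaming lock file on disk,
  // forcing a running streaming ingest job to close before compaction.
  val streamingLock = CarbonLockFactory.getCarbonLockObj(
    carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
    LockUsage.STREAMING_LOCK)
  // getLockFilePath() is the accessor introduced by this commit
  if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) {
    LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath)
  }
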
http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 21f24d6..c606063 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -1186,6 +1186,14 @@ public class CarbonTable implements Serializable { } /** + * Return the format value defined in table properties + * @return String as per table properties, null if not defined + */ + public String getFormat() { + return getTableInfo().getFactTable().getTableProperties().get("format"); + } + + /** * Method to get the list of cached columns of the table. * This method need to be used for Describe formatted like scenario where columns need to be * displayed in the column create order http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index 2abf0e1..931e106 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -36,6 +36,7 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_ import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE; import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SEARCH_MODE_ENABLE; import static org.apache.carbondata.core.constants.CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD; +import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE; import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_OFFHEAP_SORT; import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION; import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT; @@ -162,6 +163,7 @@ public class SessionParams implements Serializable, Cloneable { case CARBON_SEARCH_MODE_ENABLE: case ENABLE_VECTOR_READER: case ENABLE_UNSAFE_IN_QUERY_EXECUTION: + case ENABLE_AUTO_LOAD_MERGE: isValid = CarbonUtil.validateBoolean(value); if (!isValid) { throw new InvalidConfigurationException("Invalid value " + value + " for key " + key); http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/datamap/mv/plan/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml index ff6976d..982724d 100644 --- a/datamap/mv/plan/pom.xml +++ b/datamap/mv/plan/pom.xml @@ -48,6 +48,12 @@ <groupId>org.apache.spark</groupId> <artifactId>spark-core_${scala.binary.version}</artifactId> <version>${spark.version}</version> + <exclusions> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/pom.xml 
---------------------------------------------------------------------- diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml index 91cb20f..bd497c5 100644 --- a/examples/spark2/pom.xml +++ b/examples/spark2/pom.xml @@ -55,6 +55,10 @@ </dependency> <dependency> <groupId>org.apache.spark</groupId> + <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala new file mode 100644 index 0000000..58f51bd --- /dev/null +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StreamSQLExample.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.examples + +import java.io.File +import java.net.ServerSocket + +import org.apache.carbondata.examples.util.ExampleUtils + +// scalastyle:off println +object StreamSQLExample { + def main(args: Array[String]) { + + // setup paths + val rootPath = new File(this.getClass.getResource("/").getPath + + "../../../..").getCanonicalPath + + val spark = ExampleUtils.createCarbonSession("StructuredStreamingExample", 4) + + val requireCreateTable = true + + if (requireCreateTable) { + // drop table if exists previously + spark.sql(s"DROP TABLE IF EXISTS sink") + spark.sql("DROP TABLE IF EXISTS source") + + // Create target carbon table and populate with initial data + spark.sql( + s""" + | CREATE TABLE sink( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | file struct<school:array<string>, age:int> + | ) + | STORED AS carbondata + | TBLPROPERTIES( + | 'streaming'='true', 'sort_columns'='') + """.stripMargin) + } + + spark.sql( + """ + | CREATE TABLE source ( + | id INT, + | name STRING, + | city STRING, + | salary FLOAT, + | file struct<school:array<string>, age:int> + | ) + | STORED AS carbondata + | TBLPROPERTIES( + | 'streaming'='source', + | 'format'='socket', + | 'host'='localhost', + | 'port'='7071') + """.stripMargin) + + val serverSocket = new ServerSocket(7071) + + // start ingest streaming job + spark.sql( + s""" + | CREATE STREAM ingest ON TABLE sink + | STMPROPERTIES( + | 'trigger' = 'ProcessingTime', + | 'interval' = '3 seconds') + | AS SELECT * FROM source + """.stripMargin) + + // start writing data into the socket + import StructuredStreamingExample.{showTableCount, writeSocket} + val thread1 = writeSocket(serverSocket) + val thread2 = showTableCount(spark, "sink") + + System.out.println("type enter to interrupt streaming") + System.in.read() + thread1.interrupt() + thread2.interrupt() + serverSocket.close() + + // stop streaming job + spark.sql("DROP STREAM ingest").show + + spark.stop() + System.out.println("streaming finished") + } + +} + +// scalastyle:on println http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala index f88d8ee..31de668 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/StructuredStreamingExample.scala @@ -130,6 +130,7 @@ object StructuredStreamingExample { override def run(): Unit = { for (_ <- 0 to 1000) { spark.sql(s"select count(*) from $tableName").show(truncate = false) + spark.sql(s"show segments for table $tableName").show Thread.sleep(1000 * 3) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java index b4c8250..832a1b2 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java +++ 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.hadoop.CarbonInputSplit; @@ -83,6 +84,16 @@ public class Util { } } + public static StructType convertToSparkSchema(CarbonTable table) { + List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName()); + ColumnSchema[] schema = new ColumnSchema[columns.size()]; + int i = 0; + for (CarbonColumn column : columns) { + schema[i++] = column.getColumnSchema(); + } + return convertToSparkSchema(table, schema); + } + public static StructType convertToSparkSchema(CarbonTable table, ColumnSchema[] carbonColumns) { List<StructField> fields = new ArrayList<>(carbonColumns.length); for (int i = 0; i < carbonColumns.length; i++) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index 02c8607..3864b5d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -37,21 +37,23 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.streaming.segment.StreamSegment object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) def showSegments( limit: Option[String], - tableFolderPath: String, + tablePath: String, showHistory: Boolean): Seq[Row] = { + val metaFolder = CarbonTablePath.getMetadataPath(tablePath) val loadMetadataDetailsArray = if (showHistory) { - SegmentStatusManager.readLoadMetadata(tableFolderPath) ++ - SegmentStatusManager.readLoadHistoryMetadata(tableFolderPath) + SegmentStatusManager.readLoadMetadata(metaFolder) ++ + SegmentStatusManager.readLoadHistoryMetadata(metaFolder) } else { - SegmentStatusManager.readLoadMetadata(tableFolderPath) + SegmentStatusManager.readLoadMetadata(metaFolder) } if (loadMetadataDetailsArray.nonEmpty) { @@ -84,18 +86,31 @@ object CarbonStore { val startTime = if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { - null + "NA" } else { - new java.sql.Timestamp(load.getLoadStartTime) + new java.sql.Timestamp(load.getLoadStartTime).toString } val endTime = if (load.getLoadEndTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) { - null + "NA" } else { - new java.sql.Timestamp(load.getLoadEndTime) + new java.sql.Timestamp(load.getLoadEndTime).toString } + val 
(dataSize, indexSize) = if (load.getFileFormat == FileFormat.ROW_V1) { + // for streaming segment, we should get the actual size from the index file + // since it is continuously inserting data + val segmentDir = CarbonTablePath.getSegmentPath(tablePath, load.getLoadName) + val indexPath = CarbonTablePath.getCarbonStreamIndexFilePath(segmentDir) + val indices = StreamSegment.readIndexFile(indexPath, FileFactory.getFileType(indexPath)) + (indices.asScala.map(_.getFile_size).sum, FileFactory.getCarbonFile(indexPath).getSize) + } else { + // for batch segment, we can get the data size from table status file directly + (if (load.getDataSize == null) 0L else load.getDataSize.toLong, + if (load.getIndexSize == null) 0L else load.getIndexSize.toLong) + } + if (showHistory) { Row( load.getLoadName, @@ -104,9 +119,9 @@ object CarbonStore { endTime, mergedTo, load.getFileFormat.toString, - load.getVisibility(), - Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat), - Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat)) + load.getVisibility, + Strings.formatSize(dataSize.toFloat), + Strings.formatSize(indexSize.toFloat)) } else { Row( load.getLoadName, @@ -115,8 +130,8 @@ object CarbonStore { endTime, mergedTo, load.getFileFormat.toString, - Strings.formatSize(if (load.getDataSize == null) 0 else load.getDataSize.toFloat), - Strings.formatSize(if (load.getIndexSize == null) 0 else load.getIndexSize.toFloat)) + Strings.formatSize(dataSize.toFloat), + Strings.formatSize(indexSize.toFloat)) } }.toSeq } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index ab6e320..57b2e44 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -410,6 +410,8 @@ object StreamHandoffRDD { } else { newSegment.get.setSegmentStatus(SegmentStatus.SUCCESS) newSegment.get.setLoadEndTime(System.currentTimeMillis()) + CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newSegment.get, loadModel.getSegmentId, + loadModel.getCarbonDataLoadSchema.getCarbonTable) } // update streaming segment to compacted status http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 5762906..6d93b34 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -43,6 +43,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import 
org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.spark.rdd.StreamHandoffRDD http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala index 00a5139..73c07b4 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.types._ import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField} +import org.apache.carbondata.core.metadata.datatype.{ArrayType => CarbonArrayType, DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, MapType => CarbonMapType, StructField => CarbonStructField, StructType => CarbonStructType} import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} import org.apache.carbondata.core.scan.expression.conditional._ import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} @@ -39,6 +39,20 @@ object CarbonSparkDataSourceUtil { DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision, dataType.asInstanceOf[CarbonDecimalType].getScale) } else { + if (CarbonDataTypes.isStructType(dataType)) { + val struct = dataType.asInstanceOf[CarbonStructType] + return StructType(struct.getFields.asScala.map(x => + StructField(x.getFieldName, convertCarbonToSparkDataType(x.getDataType))) + ) + } else if (CarbonDataTypes.isArrayType(dataType)) { + val array = dataType.asInstanceOf[CarbonArrayType] + return ArrayType(convertCarbonToSparkDataType(array.getElementType)) + } else if (CarbonDataTypes.isMapType(dataType)) { + val map = dataType.asInstanceOf[CarbonMapType] + return MapType( + convertCarbonToSparkDataType(map.getKeyType), + convertCarbonToSparkDataType(map.getValueType)) + } dataType match { case CarbonDataTypes.STRING => StringType case CarbonDataTypes.SHORT => ShortType http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index 5af4fbe..8c8fd28 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -109,6 +109,12 @@ </exclusions> </dependency> <dependency> + 
<groupId>org.apache.spark</groupId> + <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>${spark.deps.scope}</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala index a0c0545..b2687d0 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala @@ -107,4 +107,14 @@ object CarbonSparkUtil { }) fields.mkString(",") } + + /** + * add escape prefix for delimiter + */ + def delimiterConverter4Udf(delimiter: String): String = delimiter match { + case "|" | "*" | "." | ":" | "^" | "\\" | "$" | "+" | "?" | "(" | ")" | "{" | "}" | "[" | "]" => + "\\\\" + delimiter + case _ => + delimiter + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala index f0d7bf5..23323d4 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala @@ -21,7 +21,7 @@ import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{CarbonEnv, DataFrame, SparkSession} import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.types.{StructField, StructType} @@ -29,7 +29,9 @@ import org.apache.spark.sql.types.{StructField, StructType} import org.apache.carbondata.common.exceptions.NoSuchStreamException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.spark.StreamingOption import org.apache.carbondata.streaming.CarbonStreamException @@ -46,25 +48,29 @@ object StreamJobManager { private def validateSourceTable(source: CarbonTable): Unit = { if (!source.isStreamingSource) { - throw new MalformedCarbonCommandException(s"Table ${source.getTableName} is not " + - "streaming source table " + - "('streaming' tblproperty is not 'source')") + throw new MalformedCarbonCommandException( + s"Table ${source.getTableName} is not streaming source table " + + "('streaming' tblproperty is not 'source')") } } - private def validateSinkTable(querySchema: StructType, sink: CarbonTable): Unit = { + private def 
validateSinkTable(validateQuerySchema: Boolean, + querySchema: StructType, sink: CarbonTable): Unit = { if (!sink.isStreamingSink) { - throw new MalformedCarbonCommandException(s"Table ${sink.getTableName} is not " + - "streaming sink table " + - "('streaming' tblproperty is not 'sink' or 'true')") + throw new MalformedCarbonCommandException( + s"Table ${sink.getTableName} is not streaming sink table " + + "('streaming' tblproperty is not 'sink' or 'true')") } - val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column => - StructField(column.getColName, - CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType)) - } - if (!querySchema.equals(StructType(fields))) { - throw new MalformedCarbonCommandException(s"Schema of table ${sink.getTableName} " + - s"does not match query output") + if (validateQuerySchema) { + val fields = sink.getCreateOrderColumn(sink.getTableName).asScala.map { column => + StructField( + column.getColName, + CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(column.getDataType)) + } + if (!querySchema.equals(StructType(fields))) { + throw new MalformedCarbonCommandException( + s"Schema of table ${sink.getTableName} does not match query output") + } } } @@ -102,7 +108,12 @@ object StreamJobManager { } validateSourceTable(sourceTable) - validateSinkTable(streamDf.schema, sinkTable) + + // for kafka and socket stream source, the source table schema is one string column only + // so we do not validate the query schema against the sink table schema + val isKafka = Option(sourceTable.getFormat).contains("kafka") + val isSocket = Option(sourceTable.getFormat).contains("socket") + validateSinkTable(!(isKafka || isSocket), streamDf.schema, sinkTable) // start a new thread to run the streaming ingest job, the job will be running // until user stops it by STOP STREAM JOB http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index a13dfdc..8b6dabd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -330,10 +330,13 @@ case class CarbonAlterTableCompactionCommand( ): Unit = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - // 1. acquire lock of streaming.lock + // 1. delete the lock of streaming.lock, forcing the stream to be closed val streamingLock = CarbonLockFactory.getCarbonLockObj( carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier, LockUsage.STREAMING_LOCK) + if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) { + LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath) + } try { if (streamingLock.lockWithRetries()) { // 2. 
convert segment status from "streaming" to "streaming finish" http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala index 3d57a99..3f68cc4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala @@ -37,8 +37,8 @@ case class CarbonShowLoadsCommand( if (showHistory) { Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(), AttributeReference("Status", StringType, nullable = false)(), - AttributeReference("Load Start Time", TimestampType, nullable = false)(), - AttributeReference("Load End Time", TimestampType, nullable = true)(), + AttributeReference("Load Start Time", StringType, nullable = false)(), + AttributeReference("Load End Time", StringType, nullable = true)(), AttributeReference("Merged To", StringType, nullable = false)(), AttributeReference("File Format", StringType, nullable = false)(), AttributeReference("Visibility", StringType, nullable = false)(), @@ -47,8 +47,8 @@ case class CarbonShowLoadsCommand( } else { Seq(AttributeReference("SegmentSequenceId", StringType, nullable = false)(), AttributeReference("Status", StringType, nullable = false)(), - AttributeReference("Load Start Time", TimestampType, nullable = false)(), - AttributeReference("Load End Time", TimestampType, nullable = true)(), + AttributeReference("Load Start Time", StringType, nullable = false)(), + AttributeReference("Load End Time", StringType, nullable = true)(), AttributeReference("Merged To", StringType, nullable = false)(), AttributeReference("File Format", StringType, nullable = false)(), AttributeReference("Data Size", StringType, nullable = false)(), @@ -64,7 +64,7 @@ case class CarbonShowLoadsCommand( } CarbonStore.showSegments( limit, - carbonTable.getMetadataPath, + carbonTable.getTablePath, showHistory ) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala index 1e1ab44..94e063b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala @@ -17,11 +17,14 @@ package org.apache.spark.sql.execution.command.stream +import java.util + import scala.collection.JavaConverters._ +import scala.collection.mutable import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, 
Expression, ExprId, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} import org.apache.spark.sql.execution.command.DataCommand import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.streaming.StreamingRelation @@ -29,8 +32,9 @@ import org.apache.spark.sql.types.{StringType, StructType} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.spark.StreamingOption -import org.apache.carbondata.spark.util.Util +import org.apache.carbondata.spark.util.{CarbonSparkUtil, Util} import org.apache.carbondata.stream.StreamJobManager /** @@ -51,29 +55,57 @@ case class CarbonCreateStreamCommand( AttributeReference("Status", StringType, nullable = false)()) override def processData(sparkSession: SparkSession): Seq[Row] = { - val df = sparkSession.sql(query) - var sourceTable: CarbonTable = null - - // find the streaming source table in the query - // and replace it with StreamingRelation - val streamLp = df.logicalPlan transform { + val inputQuery = sparkSession.sql(query) + val sourceTableSeq = inputQuery.logicalPlan collect { case r: LogicalRelation if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource => - val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r) - if (sourceTable != null && sourceTable.getTableName != source.getTableName) { - throw new MalformedCarbonCommandException( - "Stream query on more than one stream source table is not supported") - } - sourceTable = source - streamingRelation - case plan: LogicalPlan => plan + r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + } + if (sourceTableSeq.isEmpty) { + throw new MalformedCarbonCommandException( + "Must specify stream source table in the stream query") + } + if (sourceTableSeq.size > 1) { + throw new MalformedCarbonCommandException( + "Stream query on more than one stream source table is not supported") + } + val sourceTable = sourceTableSeq.head + + val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties + val format = sourceTable.getFormat + if (format == null) { + throw new MalformedCarbonCommandException("Streaming from carbon file is not supported") + } + val updatedQuery = if (format.equals("kafka")) { + shouldHaveProperty(tblProperty, "kafka.bootstrap.servers", sourceTable) + shouldHaveProperty(tblProperty, "subscribe", sourceTable) + createPlan(sparkSession, inputQuery, sourceTable, "kafka", tblProperty) + } else if (format.equals("socket")) { + shouldHaveProperty(tblProperty, "host", sourceTable) + shouldHaveProperty(tblProperty, "port", sourceTable) + createPlan(sparkSession, inputQuery, sourceTable, "socket", tblProperty) + } else { + // Replace the logical relation with a streaming relation created + // from the stream source table + inputQuery.logicalPlan transform { + case r: LogicalRelation + if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource + => prepareStreamingRelation(sparkSession, r) + case plan: LogicalPlan => plan + } } if (sourceTable == null) { throw new MalformedCarbonCommandException("Must specify stream source table in the query") } + // add CSV row parser if user does 
not specify + val newMap = mutable.Map[String, String]() + optionMap.foreach(x => newMap(x._1) = x._2) + newMap(CSVInputFormat.DELIMITER) = tblProperty.asScala.getOrElse("delimiter", ",") + // start the streaming job val jobId = StreamJobManager.startStream( sparkSession = sparkSession, @@ -82,33 +114,140 @@ case class CarbonCreateStreamCommand( sourceTable = sourceTable, sinkTable = CarbonEnv.getCarbonTable(sinkDbName, sinkTableName)(sparkSession), query = query, - streamDf = Dataset.ofRows(sparkSession, streamLp), - options = new StreamingOption(optionMap) + streamDf = Dataset.ofRows(sparkSession, updatedQuery), + options = new StreamingOption(newMap.toMap) ) Seq(Row(streamName, jobId, "RUNNING")) } + /** + * Create a new plan for the stream query on kafka and Socket source table. + * This is required because we need to convert the schema of the data stored in kafka + * The returned logical plan contains the complete plan tree of original plan with + * logical relation replaced with a streaming relation. + * + * @param sparkSession spark session + * @param inputQuery stream query from user + * @param sourceTable source table (kafka table) + * @param sourceName source name, kafka or socket + * @param tblProperty table property of source table + * @return a new logical plan + */ + private def createPlan( + sparkSession: SparkSession, + inputQuery: DataFrame, + sourceTable: CarbonTable, + sourceName: String, + tblProperty: util.Map[String, String]): LogicalPlan = { + // We follow 3 steps to generate new plan + // 1. replace the logical relation in stream query with streaming relation + // 2. collect the new ExprId generated + // 3. update the stream query plan with the new ExprId generated, to make the plan consistent + + // exprList is used for UDF to extract the data from the 'value' column in kafka + val columnNames = Util.convertToSparkSchema(sourceTable).fieldNames + val exprList = columnNames.zipWithIndex.map { + case (columnName, i) => + s"case when size(_values) > $i then _values[$i] else null end AS $columnName" + } + + val delimiter = tblProperty.asScala.getOrElse("delimiter", ",") + val aliasMap = new util.HashMap[String, ExprId]() + val updatedQuery = inputQuery.logicalPlan transform { + case r: LogicalRelation + if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource => + // for kafka stream source, get the 'value' column and split it by using UDF + val kafkaPlan = sparkSession.readStream + .format(sourceName) + .options(tblProperty) + .load() + .selectExpr("CAST(value as string) as _value") + .selectExpr( + s"split(_value, '${CarbonSparkUtil.delimiterConverter4Udf(delimiter)}') as _values") + .selectExpr(exprList: _*) + .logicalPlan + + // collect the newly generated ExprId + kafkaPlan collect { + case p@Project(projectList, child) => + projectList.map { expr => + aliasMap.put(expr.name, expr.exprId) + } + p + } + kafkaPlan + case plan: LogicalPlan => plan + } + + // transform the stream plan to replace all attribute with the collected ExprId + val transFormedPlan = updatedQuery transform { + case p@Project(projectList: Seq[NamedExpression], child) => + val newProjectList = projectList.map { expr => + val newExpr = expr transform { + case attribute: Attribute => + val exprId: ExprId = aliasMap.get(attribute.name) + if (exprId != null) { + if (exprId.id != attribute.exprId.id) { + AttributeReference( + attribute.name, attribute.dataType, attribute.nullable, + 
attribute.metadata)(exprId, attribute.qualifier) + } else { + attribute + } + } else { + attribute + } + } + newExpr.asInstanceOf[NamedExpression] + } + Project(newProjectList, child) + case f@Filter(condition: Expression, child) => + val newCondition = condition transform { + case attribute: Attribute => + val exprId: ExprId = aliasMap.get(attribute.name) + if (exprId != null) { + if (exprId.id != attribute.exprId.id) { + AttributeReference( + attribute.name, attribute.dataType, attribute.nullable, + attribute.metadata)(exprId, attribute.qualifier) + } else { + attribute + } + } else { + attribute + } + } + Filter(newCondition, child) + } + transFormedPlan + } + + /** + * Create a streaming relation from the input logical relation (source table) + * + * @param sparkSession spark session + * @param logicalRelation source table to convert + * @return sourceTable and its streaming relation + */ private def prepareStreamingRelation( sparkSession: SparkSession, - r: LogicalRelation): (CarbonTable, StreamingRelation) = { - val sourceTable = r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + logicalRelation: LogicalRelation): StreamingRelation = { + val sourceTable = logicalRelation.relation + .asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable val tblProperty = sourceTable.getTableInfo.getFactTable.getTableProperties - val format = tblProperty.get("format") + val format = sourceTable.getFormat if (format == null) { throw new MalformedCarbonCommandException("Streaming from carbon file is not supported") } - val streamReader = sparkSession.readStream + val streamReader = sparkSession + .readStream .schema(getSparkSchema(sourceTable)) .format(format) - val dataFrame = format match { + val dataFrame: DataFrame = format match { case "csv" | "text" | "json" | "parquet" => - if (!tblProperty.containsKey("path")) { - throw new MalformedCarbonCommandException( - s"'path' tblproperty should be provided for '$format' format") - } + shouldHaveProperty(tblProperty, "path", sourceTable) streamReader.load(tblProperty.get("path")) - case "kafka" | "socket" => - streamReader.load() case other => throw new MalformedCarbonCommandException(s"Streaming from $format is not supported") } @@ -116,8 +255,18 @@ case class CarbonCreateStreamCommand( // Since SparkSQL analyzer will match the UUID in attribute, // create a new StreamRelation and re-use the same attribute from LogicalRelation - (sourceTable, - StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, r.output)) + StreamingRelation(streamRelation.dataSource, streamRelation.sourceName, logicalRelation.output) + } + + private def shouldHaveProperty( + tblProperty: java.util.Map[String, String], + propertyName: String, + sourceTable: CarbonTable) : Unit = { + if (!tblProperty.containsKey(propertyName)) { + throw new MalformedCarbonCommandException( + s"tblproperty '$propertyName' should be provided for stream source " + + s"${sourceTable.getDatabaseName}.${sourceTable.getTableName}") + } } private def getSparkSchema(sourceTable: CarbonTable): StructType = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 31c9597..baf4664 100644 --- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -1770,28 +1770,18 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { |CREATE STREAM stream123 ON TABLE sink |STMPROPERTIES( | 'trigger'='ProcessingTime', - | 'interval'='1 seconds') - |AS - | SELECT * - | FROM source - | WHERE id % 2 = 1 - """.stripMargin).show(false) - sql( - """ - |CREATE STREAM IF NOT EXISTS stream123 ON TABLE sink - |STMPROPERTIES( - | 'trigger'='ProcessingTime', - | 'interval'='1 seconds') + | 'interval'='5 seconds') |AS | SELECT * | FROM source | WHERE id % 2 = 1 """.stripMargin).show(false) + Thread.sleep(200) sql("select * from sink").show generateCSVDataFile(spark, idStart = 30, rowNums = 10, csvDataDir, SaveMode.Append) - Thread.sleep(5000) + Thread.sleep(7000) // after 2 minibatch, there should be 10 row added (filter condition: id%2=1) checkAnswer(sql("select count(*) from sink"), Seq(Row(10))) @@ -2098,6 +2088,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } test("StreamSQL: start stream on non-stream table") { + sql("DROP TABLE IF EXISTS source") + sql("DROP TABLE IF EXISTS sink") + sql( s""" |CREATE TABLE notsource( @@ -2143,7 +2136,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { | WHERE id % 2 = 1 """.stripMargin).show(false) } - assert(ex.getMessage.contains("Must specify stream source table in the query")) + assert(ex.getMessage.contains("Must specify stream source table in the stream query")) sql("DROP TABLE sink") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d55c21a..a6679fc 100644 --- a/pom.xml +++ b/pom.xml @@ -218,6 +218,12 @@ <scope>${spark.deps.scope}</scope> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>${spark.deps.scope}</scope> + </dependency> + <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-compiler</artifactId> <version>${scala.version}</version> http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java index 6f7c398..00d8420 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/RowParserImpl.java @@ -78,6 +78,9 @@ public class RowParserImpl implements RowParser { @Override public Object[] parseRow(Object[] row) { + if (row == null) { + return new String[numberOfColumns]; + } // If number of columns are less in a row then create new array with same size of header. 
if (row.length < numberOfColumns) { String[] temp = new String[numberOfColumns]; http://git-wip-us.apache.org/repos/asf/carbondata/blob/560bfbe7/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 89f00c9..744915d 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -419,7 +419,7 @@ public class StreamSegment { * @return the list of BlockIndex in the index file * @throws IOException */ - private static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType) + public static List<BlockIndex> readIndexFile(String indexPath, FileFactory.FileType fileType) throws IOException { List<BlockIndex> blockIndexList = new ArrayList<>(); CarbonFile index = FileFactory.getCarbonFile(indexPath, fileType);
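
Taken together, the changes above enable StreamSQL ingestion from Kafka: the source table holds no data and only describes the field schema and connection properties, and CarbonCreateStreamCommand rewrites the stream query to cast each Kafka record's 'value' to a string, split it on the configured delimiter, and map the fields positionally onto the source columns. A minimal end-to-end sketch, assuming a broker at localhost:9092 and a topic named "test" (both hypothetical) and an existing streaming sink table named "sink"; 'kafka.bootstrap.servers' and 'subscribe' are mandatory per the shouldHaveProperty checks, while 'delimiter' defaults to ',':

  import org.apache.spark.sql.SparkSession

  // `spark` is assumed to be a CarbonSession-enabled SparkSession
  def startKafkaIngest(spark: SparkSession): Unit = {
    // stream source table: describes the CSV payload carried in each
    // Kafka record's 'value' column, plus the Kafka connection properties
    spark.sql(
      """
        | CREATE TABLE source(id INT, name STRING, salary FLOAT)
        | STORED AS carbondata
        | TBLPROPERTIES(
        |   'streaming'='source',
        |   'format'='kafka',
        |   'kafka.bootstrap.servers'='localhost:9092',
        |   'subscribe'='test',
        |   'delimiter'=',')
      """.stripMargin)

    // start the ingest job; it keeps running until DROP STREAM is issued
    spark.sql(
      """
        | CREATE STREAM job123 ON TABLE sink
        | STMPROPERTIES(
        |   'trigger'='ProcessingTime',
        |   'interval'='3 seconds')
        | AS SELECT * FROM source
      """.stripMargin)
  }

Because the kafka and socket sources materialize as a single string column, StreamJobManager skips the query-vs-sink schema validation for these two formats; the job is stopped with DROP STREAM job123.
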
