Repository: carbondata
Updated Branches:
  refs/heads/master f2fb06806 -> 9db662a2d
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala deleted file mode 100644 index 2bc3eaf..0000000 --- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/lucene/LuceneDataMapRefreshRDD.scala +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.datamap.lucene - -import java.text.SimpleDateFormat -import java.util - -import scala.collection.JavaConverters._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.spark.{CarbonInputMetrics, Partition, SparkContext, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql.SparkSession - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.Segment -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} -import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableInfo} -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.TaskMetricsMap -import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader} -import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat} -import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport -import org.apache.carbondata.spark.{RefreshResult, RefreshResultImpl} -import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, CarbonSparkPartition} -import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl - -object LuceneDataMapRefreshRDD { - - def refreshDataMap( - sparkSession: SparkSession, - carbonTable: CarbonTable, - schema: DataMapSchema - ): Unit = { - val tableIdentifier = carbonTable.getAbsoluteTableIdentifier - val segmentStatusManager = new SegmentStatusManager(tableIdentifier) - val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments() - val validSegments = validAndInvalidSegments.getValidSegments() - val 
indexedCarbonColumns = - LuceneDataMapFactoryBase.validateAndGetIndexedColumns(schema, carbonTable) - - // loop all segments to rebuild DataMap - val tableInfo = carbonTable.getTableInfo() - validSegments.asScala.foreach { segment => - val dataMapStorePath = LuceneDataMapWriter.genDataMapStorePath( - tableIdentifier.getTablePath(), - segment.getSegmentNo(), - schema.getDataMapName) - // if lucene datamap folder is exists, not require to build lucene datamap again - refreshOneSegment(sparkSession, tableInfo, dataMapStorePath, schema.getDataMapName, - indexedCarbonColumns, segment.getSegmentNo()); - } - } - - def refreshOneSegment( - sparkSession: SparkSession, - tableInfo: TableInfo, - dataMapStorePath: String, - dataMapName: String, - indexColumns: java.util.List[String], - segmentId: String): Unit = { - - if (!FileFactory.isFileExist(dataMapStorePath)) { - if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) { - try { - val status = new LuceneDataMapRefreshRDD[String, Boolean]( - sparkSession.sparkContext, - new RefreshResultImpl(), - tableInfo, - dataMapStorePath, - dataMapName, - indexColumns.asScala.toArray, - segmentId - ).collect() - - status.find(_._2 == false).foreach { task => - throw new Exception( - s"Task Failed to refresh datamap $dataMapName on segment_$segmentId") - } - } catch { - case ex => - // process failure - FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath)) - throw new Exception( - s"Failed to refresh datamap $dataMapName on segment_$segmentId", ex) - } - } - } - } - -} - -class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] { - override def initialize(carbonColumns: Array[CarbonColumn], - carbonTable: CarbonTable): Unit = { - } - - override def readRow(data: Array[Object]): Array[Object] = { - for (i <- 0 until dataTypes.length) { - if (dataTypes(i) == DataTypes.STRING) { - data(i) = data(i).toString - } - } - data - } - - override def close(): Unit = { - } -} - -class LuceneDataMapRefreshRDD[K, V]( - sc: SparkContext, - result: RefreshResult[K, V], - @transient tableInfo: TableInfo, - dataMapStorePath: String, - dataMapName: String, - indexColumns: Array[String], - segmentId: String -) extends CarbonRDDWithTableInfo[(K, V)](sc, Nil, tableInfo.serialize()) { - - private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") - - private val jobTrackerId: String = { - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - formatter.format(new util.Date()) - } - - override def internalCompute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - var status = false - val inputMetrics = new CarbonInputMetrics - TaskMetricsMap.getInstance().registerThreadCallback() - val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value - inputMetrics.initBytesReadCallback(context, inputSplit) - - val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) - val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) - val format = createInputFormat(attemptContext) - - val taskName = CarbonTablePath.getUniqueTaskName(inputSplit.getAllSplits.get(0).getBlockPath) - - val tableInfo = getTableInfo - val identifier = tableInfo.getOrCreateAbsoluteTableIdentifier() - - val columns = tableInfo.getFactTable.getListOfColumns.asScala - val dataTypes = indexColumns.map { columnName => - 
columns.find(_.getColumnName.equals(columnName)).get.getDataType - } - - val indexPath = LuceneDataMapWriter.genDataMapStorePathOnTaskId(identifier.getTablePath, - segmentId, dataMapName, taskName) - - val model = format.createQueryModel(inputSplit, attemptContext) - // one query id per table - model.setQueryId(queryId) - model.setVectorReader(false) - model.setForcedDetailRawQuery(false) - model.setRequiredRowId(true) - var reader: CarbonRecordReader[Array[Object]] = null - var indexBuilder: LuceneIndexRefreshBuilder = null - try { - reader = new CarbonRecordReader(model, new OriginalReadSupport(dataTypes), inputMetrics) - reader.initialize(inputSplit, attemptContext) - - indexBuilder = new LuceneIndexRefreshBuilder(indexPath, indexColumns, dataTypes) - indexBuilder.initialize() - - while (reader.nextKeyValue()) { - indexBuilder.addDocument(reader.getCurrentValue) - } - - indexBuilder.finish() - - status = true - } finally { - if (reader != null) { - try { - reader.close() - } catch { - case ex => - LOGGER.error(ex, "Failed to close reader") - } - } - - if (indexBuilder != null) { - try { - indexBuilder.close() - } catch { - case ex => - LOGGER.error(ex, "Failed to close index writer") - } - } - } - - new Iterator[(K, V)] { - - var finished = false - - override def hasNext: Boolean = { - !finished - } - - override def next(): (K, V) = { - finished = true - result.getKey(split.index.toString, status) - } - } - } - - - private def createInputFormat( - attemptContext: TaskAttemptContextImpl) = { - val format = new CarbonTableInputFormat[Object] - val tableInfo1 = getTableInfo - val conf = attemptContext.getConfiguration - CarbonInputFormat.setTableInfo(conf, tableInfo1) - CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName) - CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName) - CarbonInputFormat.setDataTypeConverter(conf, classOf[SparkDataTypeConverterImpl]) - - val identifier = tableInfo1.getOrCreateAbsoluteTableIdentifier() - CarbonInputFormat.setTablePath( - conf, - identifier.appendWithLocalPrefix(identifier.getTablePath)) - - CarbonInputFormat.setSegmentsToAccess( - conf, - Segment.toSegmentList(Array(segmentId), null)) - - CarbonInputFormat.setColumnProjection( - conf, - new CarbonProjection(indexColumns)) - format - } - - override protected def getPartitions = { - val conf = new Configuration() - val jobConf = new JobConf(conf) - SparkHadoopUtil.get.addCredentials(jobConf) - val job = Job.getInstance(jobConf) - job.getConfiguration.set("query.id", queryId) - - val format = new CarbonTableInputFormat[Object] - - CarbonInputFormat.setSegmentsToAccess( - job.getConfiguration, - Segment.toSegmentList(Array(segmentId), null)) - - CarbonInputFormat.setTableInfo( - job.getConfiguration, - tableInfo) - CarbonInputFormat.setTablePath( - job.getConfiguration, - tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath) - CarbonInputFormat.setDatabaseName( - job.getConfiguration, - tableInfo.getDatabaseName) - CarbonInputFormat.setTableName( - job.getConfiguration, - tableInfo.getFactTable.getTableName) - - format - .getSplits(job) - .asScala - .map(_.asInstanceOf[CarbonInputSplit]) - .groupBy(_.taskId) - .map { group => - new CarbonMultiBlockSplit( - group._2.asJava, - group._2.flatMap(_.getLocations).toArray) - } - .zipWithIndex - .map { split => - new CarbonSparkPartition(id, split._2, split._1) - } - .toArray - } -} 
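The getPartitions implementation above is the heart of the removed RDD: it groups a segment's CarbonInputSplits by taskId, so each Spark partition rebuilds the Lucene index for exactly one carbondata shard. A minimal sketch of that grouping, with FakeSplit standing in for CarbonInputSplit (illustrative, not the CarbonData API):

    // Group splits by task id so one Spark partition handles one shard,
    // mirroring the groupBy/zipWithIndex logic of getPartitions above.
    case class FakeSplit(taskId: String, location: String)

    def groupByTask(splits: Seq[FakeSplit]): Array[(Int, Seq[FakeSplit])] =
      splits
        .groupBy(_.taskId)                         // one group per shard
        .values
        .zipWithIndex                              // partition index for Spark
        .map { case (group, idx) => (idx, group) }
        .toArray

    // Two splits of task t0 collapse into one partition; t1 gets its own.
    val parts = groupByTask(Seq(
      FakeSplit("t0", "host1"), FakeSplit("t0", "host2"), FakeSplit("t1", "host1")))

The same pattern presumably carries over to the generalized IndexDataMapRefreshRDD that replaces this Lucene-specific class (see the import change in CarbonDataMapRefreshCommand below).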
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 3fe2c00..34a4013 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command.datamap import scala.collection.JavaConverters._ +import scala.collection.mutable import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier @@ -24,11 +25,11 @@ import org.apache.spark.sql.execution.command._ import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager, IndexDataMapProvider} +import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} -import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider} /** * Below command class will be used to create datamap on table @@ -37,7 +38,7 @@ import org.apache.carbondata.datamap.DataMapManager case class CarbonCreateDataMapCommand( dataMapName: String, tableIdentifier: Option[TableIdentifier], - dmClassName: String, + dmProviderName: String, dmProperties: Map[String, String], queryString: Option[String], ifNotExistsSet: Boolean = false) @@ -68,41 +69,39 @@ case class CarbonCreateDataMapCommand( } } - dataMapSchema = new DataMapSchema(dataMapName, dmClassName) - // TODO: move this if logic inside lucene module - if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getShortName) || - dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.getClassName)) { - val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala - if (datamaps.nonEmpty) { - datamaps.foreach(datamap => { - val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns").trim - .toLowerCase.split(",").toSet - val existingColumns = dmProperties("text_columns").trim.toLowerCase().split(",").toSet - val duplicateDMColumn = dmColumns.intersect(existingColumns) - if (duplicateDMColumn.nonEmpty) { - throw new MalformedDataMapCommandException( - s"Create lucene datamap $dataMapName failed, datamap already exists on column(s) " + - s"$duplicateDMColumn") - } - }) - } - } if (mainTable != null && mainTable.isStreamingTable && - !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString) - || dataMapSchema.getProviderName - .equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) { - throw new MalformedCarbonCommandException(s"Streaming table does not support creating ${ - dataMapSchema.getProviderName - } datamap") + 
!(dmProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString) + || dmProviderName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.toString))) { + throw new MalformedCarbonCommandException(s"Streaming table does not support creating " + + s"$dmProviderName datamap") } + + dataMapSchema = new DataMapSchema(dataMapName, dmProviderName) dataMapSchema.setProperties(new java.util.HashMap[String, String]( dmProperties.map(x => (x._1.trim, x._2.trim)).asJava)) - dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema, sparkSession) - dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull) - // TODO Currently this feature is only available for index datamaps - if (dataMapProvider.isInstanceOf[IndexDataMapProvider]) { - DataMapStatusManager.disableDataMap(dataMapName) + + dataMapProvider = DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession) + + // If it is index datamap, check whether the column has datamap created already + dataMapProvider match { + case provider: IndexDataMapProvider => + val datamaps = DataMapStoreManager.getInstance.getAllDataMap(mainTable).asScala + val existingIndexColumn = mutable.Set[String]() + datamaps.foreach { datamap => + datamap.getDataMapSchema.getIndexColumns.foreach(existingIndexColumn.add) + } + + provider.getIndexedColumns.asScala.foreach { column => + if (existingIndexColumn.contains(column.getColName)) { + throw new MalformedDataMapCommandException(String.format( + "column '%s' already has datamap created", column.getColName)) + } + } + dataMapProvider.initMeta(queryString.orNull) + DataMapStatusManager.disableDataMap(dataMapName) + case _ => + dataMapProvider.initMeta(queryString.orNull) } val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) LOGGER.audit(s"DataMap $dataMapName successfully added") @@ -111,11 +110,11 @@ case class CarbonCreateDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { if (dataMapProvider != null) { - dataMapProvider.initData(mainTable) - if (mainTable != null && mainTable.isAutoRefreshDataMap) { - if (!DataMapClassProvider.LUCENE.getShortName.equals(dataMapSchema.getProviderName)) { - dataMapProvider.rebuild(mainTable, dataMapSchema) - } + dataMapProvider.initData() + if (mainTable != null && + mainTable.isAutoRefreshDataMap && + !dataMapSchema.isIndexDataMap) { + dataMapProvider.rebuild() } } Seq.empty @@ -123,7 +122,7 @@ case class CarbonCreateDataMapCommand( override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { if (dataMapProvider != null) { - dataMapProvider.freeMeta(mainTable, dataMapSchema) + dataMapProvider.cleanMeta() } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala index 1c39e85..4f3b7bc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRefreshCommand.scala @@ -21,11 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import 
org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.DataCommand -import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager} import org.apache.carbondata.core.datamap.status.DataMapStatusManager -import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider -import org.apache.carbondata.datamap.DataMapManager -import org.apache.carbondata.datamap.lucene.LuceneDataMapRefreshRDD +import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRefreshRDD} /** * Refresh the datamaps through sync with main table data. After sync with parent table's it enables @@ -37,6 +35,7 @@ case class CarbonDataMapRefreshCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val schema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) + val table = tableIdentifier match { case Some(identifier) => CarbonEnv.getCarbonTable(identifier)(sparkSession) @@ -46,13 +45,9 @@ case class CarbonDataMapRefreshCommand( schema.getRelationIdentifier.getTableName )(sparkSession) } - // Sync the datamap with parent table - if (DataMapClassProvider.LUCENE.getShortName.endsWith(schema.getProviderName)) { - LuceneDataMapRefreshRDD.refreshDataMap(sparkSession, table, schema) - } else { - val provider = DataMapManager.get().getDataMapProvider(schema, sparkSession) - provider.rebuild(table, schema) - } + val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession) + provider.rebuild() + // After sync success enable the datamap. DataMapStatusManager.enableDataMap(dataMapName) Seq.empty http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 19e6500..cf5a4ae 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -140,9 +140,10 @@ case class CarbonDropDataMapCommand( dbName, tableName))(sparkSession) if (dataMapProvider == null) { - dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession) + dataMapProvider = + DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession) } - dataMapProvider.freeMeta(mainTable, dataMapSchema) + dataMapProvider.cleanMeta() // fires the event after dropping datamap from main table schema val dropDataMapPostEvent = @@ -181,15 +182,16 @@ case class CarbonDropDataMapCommand( Seq.empty } - private def dropDataMapFromSystemFolder(sparkSession: SparkSession) = { + private def dropDataMapFromSystemFolder(sparkSession: SparkSession): Unit = { try { if (dataMapSchema == null) { dataMapSchema = DataMapStoreManager.getInstance().getDataMapSchema(dataMapName) } if (dataMapSchema != null) { - dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema, sparkSession) + dataMapProvider = + DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession) DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName) - 
dataMapProvider.freeMeta(mainTable, dataMapSchema) + dataMapProvider.cleanMeta() } } catch { case e: Exception => @@ -202,7 +204,7 @@ case class CarbonDropDataMapCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { // delete the table folder if (dataMapProvider != null) { - dataMapProvider.freeData(mainTable, dataMapSchema) + dataMapProvider.cleanData() } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 540fdd2..62da7ed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -147,7 +147,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { /** * The syntax of datamap creation is as follows. - * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapClassName' + * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapProviderName' * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName */ protected lazy val createDataMap: Parser[LogicalPlan] = @@ -155,10 +155,11 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { opt(ontable) ~ (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~ (AS ~> restInput).? <~ opt(";") ^^ { - case ifnotexists ~ dmname ~ tableIdent ~ className ~ dmprops ~ query => + case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ dmprops ~ query => val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String] - CarbonCreateDataMapCommand(dmname, tableIdent, className, map, query, ifnotexists.isDefined) + CarbonCreateDataMapCommand(dmname, tableIdent, dmProviderName, map, query, + ifnotexists.isDefined) } protected lazy val ontable: Parser[TableIdentifier] = http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala index 33de06f..a8e7b6c 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapSuite.scala @@ -25,6 +25,8 @@ import scala.util.Random import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.util.CarbonProperties + class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { val inputFile = s"$resourcesPath/bloom_datamap_input.csv" val normalTable = "carbon_normal" @@ -33,12 +35,38 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { val lineNum = 500000 override protected def beforeAll(): Unit = { + new File(CarbonProperties.getInstance().getSystemFolderLocation).delete() createFile(inputFile, line = lineNum, start = 0) sql(s"DROP TABLE IF 
EXISTS $normalTable") sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } - test("test bloom datamap") { + private def checkQuery = { + checkAnswer( + sql(s"select * from $bloomDMSampleTable where id = 1"), + sql(s"select * from $normalTable where id = 1")) + checkAnswer( + sql(s"select * from $bloomDMSampleTable where id = 999"), + sql(s"select * from $normalTable where id = 999")) + checkAnswer( + sql(s"select * from $bloomDMSampleTable where city = 'city_1'"), + sql(s"select * from $normalTable where city = 'city_1'")) + checkAnswer( + sql(s"select * from $bloomDMSampleTable where city = 'city_999'"), + sql(s"select * from $normalTable where city = 'city_999'")) + checkAnswer( + sql(s"select count(distinct id), count(distinct name), count(distinct city)," + + s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"), + sql(s"select count(distinct id), count(distinct name), count(distinct city)," + + s" count(distinct s1), count(distinct s2) from $normalTable")) + checkAnswer( + sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + + s" from $bloomDMSampleTable"), + sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + + s" from $normalTable")) + } + + test("test create bloom datamap on table with existing data") { sql( s""" | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, @@ -54,44 +82,72 @@ class BloomCoarseGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { sql( s""" | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable - | USING '${classOf[BloomCoarseGrainDataMapFactory].getName}' - | DMProperties('BLOOM_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') """.stripMargin) + // load two segments + (1 to 2).foreach { i => + sql( + s""" + | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + } + + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkQuery + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") + } + + test("test create bloom datamap and refresh datamap") { sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable - | OPTIONS('header'='false') - """.stripMargin) + | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) sql( s""" - | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable - | OPTIONS('header'='false') - """.stripMargin) - - sql(s"show datamap on table $bloomDMSampleTable").show(false) - sql(s"select * from $bloomDMSampleTable where city = 'city_5'").show(false) - sql(s"select * from $bloomDMSampleTable limit 5").show(false) - - checkExistence(sql(s"show datamap on table $bloomDMSampleTable"), true, dataMapName) -// checkAnswer(sql(s"show datamap on table $bloomDMSampleTable"), -// Row(dataMapName, classOf[BloomCoarseGrainDataMapFactory].getName, "(NA)")) - checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 1"), - sql(s"select * from $normalTable where id = 1")) - checkAnswer(sql(s"select * from $bloomDMSampleTable where id = 
999"), - sql(s"select * from $normalTable where id = 999")) - checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_1'"), - sql(s"select * from $normalTable where city = 'city_1'")) - checkAnswer(sql(s"select * from $bloomDMSampleTable where city = 'city_999'"), - sql(s"select * from $normalTable where city = 'city_999'")) - checkAnswer(sql(s"select count(distinct id), count(distinct name), count(distinct city)," + - s" count(distinct s1), count(distinct s2) from $bloomDMSampleTable"), - sql(s"select count(distinct id), count(distinct name), count(distinct city)," + - s" count(distinct s1), count(distinct s2) from $normalTable")) - checkAnswer(sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + - s" from $bloomDMSampleTable"), - sql(s"select min(id), max(id), min(name), max(name), min(city), max(city)" + - s" from $normalTable")) + | CREATE TABLE $bloomDMSampleTable(id INT, name STRING, city STRING, age INT, + | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING) + | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128') + | """.stripMargin) + + // load two segments + (1 to 2).foreach { i => + sql( + s""" + | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable + | OPTIONS('header'='false') + """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $bloomDMSampleTable + | OPTIONS('header'='false') + """.stripMargin) + } + + sql( + s""" + | CREATE DATAMAP $dataMapName ON TABLE $bloomDMSampleTable + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='city,id', 'BLOOM_SIZE'='640000') + """.stripMargin) + + sql(s"REFRESH DATAMAP $dataMapName ON TABLE $bloomDMSampleTable") + sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable").show(false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $bloomDMSampleTable"), true, dataMapName) + checkQuery + sql(s"DROP TABLE IF EXISTS $normalTable") + sql(s"DROP TABLE IF EXISTS $bloomDMSampleTable") } // todo: will add more tests on bloom datamap, such as exception, delete datamap, show profiler http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java new file mode 100644 index 0000000..6d062f1 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.datamap; + +public class DataMapWriterException extends RuntimeException { + public DataMapWriterException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index a9e30d3..9c3d5d6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.page.ColumnPage; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.processing.store.TablePage; /** @@ -44,14 +45,13 @@ public class DataMapWriterListener { private static final LogService LOG = LogServiceFactory.getLogService( DataMapWriterListener.class.getCanonicalName()); - // list indexed column name -> list of data map writer - private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>(); + // list indexed column -> list of data map writer + private Map<List<CarbonColumn>, List<DataMapWriter>> registry = new ConcurrentHashMap<>(); /** * register all datamap writer for specified table and segment */ - public void registerAllWriter(CarbonTable carbonTable, String segmentId, - String dataWritePath) { + public void registerAllWriter(CarbonTable carbonTable, String segmentId, String taskNo) { List<TableDataMap> tableIndices; try { tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable); @@ -62,7 +62,7 @@ public class DataMapWriterListener { if (tableIndices != null) { for (TableDataMap tableDataMap : tableIndices) { DataMapFactory factory = tableDataMap.getDataMapFactory(); - register(factory, segmentId, dataWritePath); + register(factory, segmentId, taskNo); } } } @@ -70,7 +70,7 @@ public class DataMapWriterListener { /** * Register a DataMapWriter */ - private void register(DataMapFactory factory, String segmentId, String dataWritePath) { + private void register(DataMapFactory factory, String segmentId, String taskNo) { assert (factory != null); assert (segmentId != null); DataMapMeta meta = factory.getMeta(); @@ -78,9 +78,15 @@ public class DataMapWriterListener { // if data map does not have meta, no need to register return; } - List<String> columns = factory.getMeta().getIndexedColumns(); + List<CarbonColumn> columns = factory.getMeta().getIndexedColumns(); List<DataMapWriter> writers = registry.get(columns); - DataMapWriter writer = factory.createWriter(new Segment(segmentId, null, null), dataWritePath); + DataMapWriter writer = null; + try { + writer = factory.createWriter(new Segment(segmentId), taskNo); + } catch (IOException e) { + LOG.error("Failed to create DataMapWriter: " + e.getMessage()); + throw new DataMapWriterException(e); + } if (writers != null) { writers.add(writer); } else { @@ -91,10 +97,10 @@ public class DataMapWriterListener { LOG.info("DataMapWriter " + writer + " added"); } 
- public void onBlockStart(String blockId, String blockPath, String taskName) throws IOException { + public void onBlockStart(String blockId) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { - writer.onBlockStart(blockId, taskName); + writer.onBlockStart(blockId); } } } @@ -107,7 +113,7 @@ public class DataMapWriterListener { } } - public void onBlockletStart(int blockletId) { + public void onBlockletStart(int blockletId) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { writer.onBlockletStart(blockletId); @@ -115,7 +121,7 @@ public class DataMapWriterListener { } } - public void onBlockletEnd(int blockletId) { + public void onBlockletEnd(int blockletId) throws IOException { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { writer.onBlockletEnd(blockletId); @@ -130,16 +136,18 @@ public class DataMapWriterListener { * @param tablePage page data */ public void onPageAdded(int blockletId, int pageId, TablePage tablePage) throws IOException { - Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet(); - for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) { - List<String> indexedColumns = entry.getKey(); + Set<Map.Entry<List<CarbonColumn>, List<DataMapWriter>>> entries = registry.entrySet(); + for (Map.Entry<List<CarbonColumn>, List<DataMapWriter>> entry : entries) { + List<CarbonColumn> indexedColumns = entry.getKey(); ColumnPage[] pages = new ColumnPage[indexedColumns.size()]; for (int i = 0; i < indexedColumns.size(); i++) { - pages[i] = tablePage.getColumnPage(indexedColumns.get(i)); + pages[i] = tablePage.getColumnPage(indexedColumns.get(i).getColName()); } List<DataMapWriter> writers = entry.getValue(); + int pageSize = pages[0].getPageSize(); + for (DataMapWriter writer : writers) { - writer.onPageAdded(blockletId, pageId, pages); + writer.onPageAdded(blockletId, pageId, pageSize, pages); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index ea75cd2..edf67a7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -154,7 +154,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces return null; } - private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) { + private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException { String[] storeLocation = getStoreLocation(tableIdentifier); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel( configuration, storeLocation, 0, iteratorIndex); @@ -302,7 +302,12 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } @Override public void run() { - doExecute(this.iterator, iteratorIndex); + try { + doExecute(this.iterator, iteratorIndex); + } catch 
(IOException e) { + LOGGER.error(e); + throw new RuntimeException(e); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 2ec85fc..a725936 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -22,7 +22,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Random; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.TableSpec; @@ -233,10 +232,8 @@ public class CarbonFactDataHandlerModel { CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel(); carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(configuration.getSchemaUpdatedTimeStamp()); - carbonFactDataHandlerModel.setDatabaseName( - identifier.getDatabaseName()); - carbonFactDataHandlerModel - .setTableName(identifier.getTableName()); + carbonFactDataHandlerModel.setDatabaseName(identifier.getDatabaseName()); + carbonFactDataHandlerModel.setTableName(identifier.getTableName()); carbonFactDataHandlerModel.setMeasureCount(measureCount); carbonFactDataHandlerModel.setStoreLocation(storeLocation); carbonFactDataHandlerModel.setDimLens(dimLens); @@ -262,8 +259,14 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration); DataMapWriterListener listener = new DataMapWriterListener(); - listener.registerAllWriter(configuration.getTableSpec().getCarbonTable(), - configuration.getSegmentId(), storeLocation[new Random().nextInt(storeLocation.length)]); + listener.registerAllWriter( + configuration.getTableSpec().getCarbonTable(), + configuration.getSegmentId(), + CarbonTablePath.getShardName( + carbonDataFileAttributes.getTaskId(), + bucketId, + 0, + String.valueOf(carbonDataFileAttributes.getFactTimeStamp()))); carbonFactDataHandlerModel.dataMapWriterlistener = listener; carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount(); @@ -328,7 +331,12 @@ public class CarbonFactDataHandlerModel { listener.registerAllWriter( loadModel.getCarbonDataLoadSchema().getCarbonTable(), loadModel.getSegmentId(), - tempStoreLocation[new Random().nextInt(tempStoreLocation.length)]); + CarbonTablePath.getShardName( + CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()), + carbonFactDataHandlerModel.getBucketId(), + carbonFactDataHandlerModel.getTaskExtension(), + String.valueOf(loadModel.getFactTimeStamp()))); + carbonFactDataHandlerModel.dataMapWriterlistener = listener; return carbonFactDataHandlerModel; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9db662a2/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index e5c90e0..6e557cd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -250,8 +250,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { private void notifyDataMapBlockStart() { if (listener != null) { try { - String taskName = CarbonTablePath.getUniqueTaskName(carbonDataFileName); - listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath, taskName); + listener.onBlockStart(carbonDataFileName); } catch (IOException e) { throw new CarbonDataWriterException("Problem while writing datamap", e); }
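With this patch, the fact data writer notifies datamap writers with just the block id, and DataMapWriterListener fans the event out to every writer in its registry (now keyed by CarbonColumn lists instead of column-name lists). A simplified Scala sketch of that fan-out; the trait and class names are illustrative stand-ins, not the CarbonData API:

    trait IndexWriter {
      def onBlockStart(blockId: String): Unit
    }

    class WriterListener(registry: Map[List[String], List[IndexWriter]]) {
      // Deliver the block-start event to every registered datamap writer,
      // mirroring DataMapWriterListener.onBlockStart after this change.
      def onBlockStart(blockId: String): Unit =
        for (writers <- registry.values; writer <- writers) writer.onBlockStart(blockId)
    }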

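The duplicate-index guard added to CarbonCreateDataMapCommand above boils down to a set-membership test: gather every column already covered by an index datamap on the table, then reject a new datamap that re-indexes any of them. A hedged sketch under that reading (method and exception names are illustrative):

    // Reject a new index datamap if any requested column is already indexed.
    def validateIndexColumns(existing: Seq[Set[String]], requested: Seq[String]): Unit = {
      val covered = existing.flatten.toSet
      requested.find(covered.contains).foreach { col =>
        throw new IllegalArgumentException(s"column '$col' already has datamap created")
      }
    }

    validateIndexColumns(Seq(Set("city", "id")), Seq("name"))  // passes
    validateIndexColumns(Seq(Set("city", "id")), Seq("id"))    // throws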