Repository: carbondata
Updated Branches:
  refs/heads/master ceac8abf6 -> 860e144d4
http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index be5287f..f64299c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapProvider
+import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -69,6 +69,29 @@ case class CarbonCreateDataMapCommand(
     }
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    // TODO: move this if logic inside lucene module
+    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.toString)) {
+      val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala
+      if (datamaps.nonEmpty) {
+        datamaps.foreach(datamap => {
+          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns")
+          val existingColumns = dmProperties("text_columns")
+
+          def getAllSubString(columns: String): Set[String] = {
+            columns.inits.flatMap(_.tails).toSet
+          }
+
+          val existingClmSets = getAllSubString(existingColumns)
+          val dmColumnsSets = getAllSubString(dmColumns)
+          val duplicateDMColumn = existingClmSets.intersect(dmColumnsSets).maxBy(_.length)
+          if (!duplicateDMColumn.isEmpty) {
+            throw new MalformedDataMapCommandException(
+              s"Create lucene datamap $dataMapName failed, datamap already exists on column(s) " +
+              s"$duplicateDMColumn")
+          }
+        })
+      }
+    }
 
     if (mainTable != null &&
         mainTable.isStreamingTable &&
         !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)
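The duplicate check added above reduces to a longest-common-substring test on the raw "text_columns" property strings: inits enumerates every prefix, tails of each prefix enumerates every substring, and the longest element of the intersection of the two substring sets is the longest common substring. A minimal standalone sketch of that logic (the object and method names here are illustrative, not part of the commit):

// Standalone sketch of the check added in CarbonCreateDataMapCommand.
object LuceneColumnCheck {

  // All substrings of `columns`: every suffix (`tails`) of every prefix (`inits`).
  def getAllSubString(columns: String): Set[String] =
    columns.inits.flatMap(_.tails).toSet

  // Longest common substring of two text_columns property values. "" is a
  // substring of every string, so the intersection is never empty and
  // maxBy cannot throw.
  def longestOverlap(existing: String, requested: String): String =
    getAllSubString(existing).intersect(getAllSubString(requested)).maxBy(_.length)

  def main(args: Array[String]): Unit = {
    println(longestOverlap("name,city", "name,id"))    // prints "name," -> creation rejected
    println(longestOverlap("city", "zone").isEmpty)    // prints true    -> no overlap, allowed
  }
}

Creation only fails when the longest overlap is non-empty. Note that the comparison is purely textual over the comma-separated list, so any shared character sequence counts as an overlap, not just a whole column name.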
org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 
 /**
  * Show the datamaps on the table
+ *
  * @param tableIdentifier
  */
 case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
@@ -44,20 +46,22 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]()
     tableIdentifier match {
       case Some(table) =>
         Checker.validateTableExists(table.database, table.table, sparkSession)
         val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession)
         if (carbonTable.hasDataMapSchema) {
-          val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
-          convertToRow(schemaList)
-        } else {
-          convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable))
+          dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
+        }
+        val indexSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+        if (!indexSchemas.isEmpty) {
+          dataMapSchemaList.addAll(indexSchemas)
         }
+        convertToRow(dataMapSchemaList)
       case _ =>
         convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas)
     }
-
   }
 
   private def convertToRow(schemaList: util.List[DataMapSchema]) = {
@@ -65,9 +69,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
       schemaList.asScala.map { s =>
         var table = "(NA)"
         val relationIdentifier = s.getRelationIdentifier
-        if (relationIdentifier != null) {
           table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
-        }
         Row(s.getDataMapName, s.getProviderName, table)
       }
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ca9a6a1..1b087bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
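The show-command rewrite above replaces an either/or branch with accumulation: schemas embedded in the table info and index datamap schemas tracked by DataMapStoreManager are now both listed. A minimal sketch of the pattern, with hypothetical stub types standing in for the Carbon classes:

// Sketch only; SchemaStub stands in for DataMapSchema, and the two input
// lists stand in for the table-info schemas and the store-manager schemas.
import java.util
import scala.collection.JavaConverters._

object ShowDataMapsSketch {
  case class SchemaStub(dataMapName: String, providerName: String)

  def listDataMaps(tableLevel: util.List[SchemaStub],
      indexLevel: util.List[SchemaStub]): Seq[SchemaStub] = {
    val merged = new util.ArrayList[SchemaStub]()
    merged.addAll(tableLevel)      // schemas embedded in the table info (e.g. preaggregate)
    if (!indexLevel.isEmpty) {
      merged.addAll(indexLevel)    // index datamap schemas from the store manager (e.g. lucene)
    }
    merged.asScala
  }

  def main(args: Array[String]): Unit = {
    val table = util.Arrays.asList(SchemaStub("agg", "preaggregate"))
    val index = util.Arrays.asList(SchemaStub("text_dm", "lucene"))
    println(listDataMaps(table, index))   // both datamaps are reported
  }
}

Under the old branching, a table carrying both a preaggregate datamap (stored in the table info) and a lucene datamap (tracked by the store manager) would have listed only the former.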
http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 458bc8d..07cdf7c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -110,9 +110,10 @@ case class CarbonDropTableCommand(
           dropCommand
         }
         childDropCommands.foreach(_.processMetadata(sparkSession))
-      } else {
-        val schemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
-        childDropDataMapCommands = schemas.asScala.map{ schema =>
+      }
+      val indexDatamapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+      if (!indexDatamapSchemas.isEmpty) {
+        childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
             ifExistsSet,
             Some(TableIdentifier(tableName, Some(dbName))),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ad3ad2e..c58d02d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
-import org.apache.spark.sql.execution.command.datamap.CarbonDataMapRefreshCommand
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
@@ -37,7 +36,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.merger.CompactionType
 
 /**
  * Carbon strategies for ddl commands
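The CarbonDropTableCommand change applies the same accumulate-instead-of-branch fix to table drop: index datamap drop commands are now built unconditionally, not only when the table has no child datamap schemas. A control-flow sketch under assumed stand-in names (not the command class itself):

object DropTableSketch {
  // Before: the index branch ran only in the `else`, so a table that also
  // had child (preaggregate) datamaps never built drop commands for its
  // index datamaps. After: both kinds of drop commands are always collected.
  def planDrops(childSchemas: Seq[String], indexSchemas: Seq[String]): Seq[String] = {
    val childDrops = childSchemas.map(name => s"drop child datamap $name")
    val indexDrops = indexSchemas.map(name => s"drop index datamap $name")
    childDrops ++ indexDrops
  }
}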
http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 2e39c91..14950eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -85,10 +85,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath) throws IOException {
+  public void onBlockStart(String blockId, String blockPath, long taskId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId);
+        writer.onBlockStart(blockId, taskId);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 5783fe5..94ade87 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -250,7 +250,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath);
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath,
+            model.getCarbonDataFileAttributes().getTaskId());
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }
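Together, the two Java changes thread the id of the task writing the carbondata file from AbstractFactDataWriter through DataMapWriterListener to every registered DataMapWriter. A Scala sketch of the widened contract, with stub types standing in for the processing-module classes (names assumed, not the carbondata-processing API):

trait DataMapWriterStub {
  def onBlockStart(blockId: String, taskId: Long): Unit
}

final class WriterListenerStub(writers: Seq[DataMapWriterStub]) {
  // Fan the block-start event out to every registered writer, forwarding
  // the task id so per-task writers (e.g. a lucene index writer) can tie
  // their output to the task that produced the block.
  def onBlockStart(blockId: String, blockPath: String, taskId: Long): Unit =
    writers.foreach(_.onBlockStart(blockId, taskId))
}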