[CARBONDATA-1977][PARTITION] Fix aggregation table loading after loading of partition table.
Aggregate tables were not being loaded for partition tables because the load events were not fired during the partition table load. This closes #1757 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/829e7aa4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/829e7aa4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/829e7aa4 Branch: refs/heads/branch-1.3 Commit: 829e7aa4e28e065d5162bd0ec48b4edfa4f6dc3d Parents: cdf2c02 Author: ravipesala <[email protected]> Authored: Wed Jan 3 17:02:33 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Sat Jan 6 02:19:09 2018 +0800 ---------------------------------------------------------------------- .../carbondata/events/OperationContext.java | 5 +- .../hadoop/api/CarbonOutputCommitter.java | 20 +++ .../hadoop/api/CarbonTableOutputFormat.java | 5 + .../preaggregate/TestPreAggregateLoad.scala | 22 +++ .../apache/carbondata/events/LoadEvents.scala | 60 -------- .../spark/rdd/CarbonDataRDDFactory.scala | 7 +- .../org/apache/spark/sql/CarbonSession.scala | 1 + .../management/CarbonLoadDataCommand.scala | 21 ++- .../preaaggregate/PreAggregateListeners.scala | 8 +- .../datasources/CarbonFileFormat.scala | 4 + .../spark/sql/optimizer/CarbonFilters.scala | 43 +++++- .../processing/loading/events/LoadEvents.java | 152 +++++++++++++++++++ 12 files changed, 272 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/core/src/main/java/org/apache/carbondata/events/OperationContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/events/OperationContext.java b/core/src/main/java/org/apache/carbondata/events/OperationContext.java index f6fe676..0fcd438 100644 --- a/core/src/main/java/org/apache/carbondata/events/OperationContext.java +++ 
b/core/src/main/java/org/apache/carbondata/events/OperationContext.java @@ -16,6 +16,7 @@ */ package org.apache.carbondata.events; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; @@ -23,7 +24,9 @@ import java.util.Map; * One OperationContext per one operation. * OperationContext active till operation execution completes */ -public class OperationContext { +public class OperationContext implements Serializable { + + private static final long serialVersionUID = -8808813829717624986L; private Map<String, Object> operationProperties = new HashMap<String, Object>(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index d0a5fd9..525249a 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -32,6 +32,10 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter; +import org.apache.carbondata.events.OperationContext; +import org.apache.carbondata.events.OperationListenerBus; +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil; +import org.apache.carbondata.processing.loading.events.LoadEvents; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonLoaderUtil; @@ -94,6 +98,22 @@ public class CarbonOutputCommitter extends FileOutputCommitter { long segmentSize = CarbonLoaderUtil 
.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable); if (segmentSize > 0) { + String operationContextStr = + context.getConfiguration().get( + CarbonTableOutputFormat.OPERATION_CONTEXT, + null); + if (operationContextStr != null) { + OperationContext operationContext = + (OperationContext) ObjectSerializationUtil.convertStringToObject(operationContextStr); + LoadEvents.LoadTablePreStatusUpdateEvent event = + new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(), + loadModel); + try { + OperationListenerBus.getInstance().fireEvent(event, operationContext); + } catch (Exception e) { + throw new IOException(e); + } + } CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet); mergeCarbonIndexFiles(segmentPath); String updateTime = http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 897c929..2c72b39 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -103,6 +103,11 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Stri public static final String SEGMENTS_TO_BE_DELETED = "mapreduce.carbontable.segments.to.be.removed"; + /** + * It is used only to fire events in case of any child tables to be loaded. 
+ */ + public static final String OPERATION_CONTEXT = "mapreduce.carbontable.operation.context"; + private static final Log LOG = LogFactory.getLog(CarbonTableOutputFormat.class); private CarbonOutputCommitter committer; http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala index ff1c330..d794f32 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala @@ -216,4 +216,26 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, originalBadRecordsAction) } + test("test partition load into main table with pre-aggregate table") { + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, city string, age int) partitioned by(name string) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + createAllAggregateTables("maintable") + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + checkAnswer(sql(s"select * from maintable_preagg_avg"), + Seq(Row(1, 31, 1), Row(2, 27, 1), Row(3, 70, 2), Row(4, 55, 2))) + checkAnswer(sql(s"select * from maintable_preagg_count"), + Seq(Row(1, 1), 
Row(2, 1), Row(3, 2), Row(4, 2))) + checkAnswer(sql(s"select * from maintable_preagg_min"), + Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 26))) + checkAnswer(sql(s"select * from maintable_preagg_max"), + Seq(Row(1, 31), Row(2, 27), Row(3, 35), Row(4, 29))) + sql("drop table if exists maintable") + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala deleted file mode 100644 index 022ad72..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.events - -import org.apache.spark.sql.SparkSession - -import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.processing.loading.model.CarbonLoadModel - -/** - * Class for handling operations before start of a load process. 
- * Example usage: For validation purpose - */ -case class LoadTablePreExecutionEvent(sparkSession: SparkSession, - carbonTableIdentifier: CarbonTableIdentifier, - carbonLoadModel: CarbonLoadModel, - factPath: String, - isDataFrameDefined: Boolean, - optionsFinal: scala.collection.mutable.Map[String, String], - // userProvidedOptions are needed if we need only the load options given by user - userProvidedOptions: Map[String, String], - isOverWriteTable: Boolean) extends Event with LoadEventInfo - -/** - * Class for handling operations after data load completion and before final - * commit of load operation. Example usage: For loading pre-aggregate tables - */ -case class LoadTablePostExecutionEvent(sparkSession: SparkSession, - carbonTableIdentifier: CarbonTableIdentifier, - carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo - -/** - * Event for handling operations after data load completion and before final - * commit of load operation. Example usage: For loading pre-aggregate tables - */ -case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession, - carbonTableIdentifier: CarbonTableIdentifier, - carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo - -/** - * Class for handling clean up in case of any failure and abort the operation. 
- */ -case class LoadTableAbortExecutionEvent(sparkSession: SparkSession, - carbonTableIdentifier: CarbonTableIdentifier, - carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index c6a1178..7982071 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -58,10 +58,11 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import 
org.apache.carbondata.processing.loading.sort.SortScopeOptions @@ -527,11 +528,9 @@ object CarbonDataRDDFactory { writeDictionary(carbonLoadModel, result, writeAll = false) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = - LoadTablePreStatusUpdateEvent( - sqlContext.sparkSession, + new LoadTablePreStatusUpdateEvent( carbonTable.getCarbonTableIdentifier, carbonLoadModel) - operationContext.setProperty("isOverwrite", overwriteTable) OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) val done = updateTableStatus( http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index b4e11c1..7ee3434 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -35,6 +35,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, ThreadLocalSessionInfo} import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 01bb5b3..0c6879c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -62,12 +62,14 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} -import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} +import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException import org.apache.carbondata.format +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable} +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.{BadRecordFoundException, NoRetryException} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.util.CarbonLoaderUtil @@ -167,14 +169,15 @@ case class CarbonLoadDataCommand( try { val operationContext = new OperationContext val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = - LoadTablePreExecutionEvent(sparkSession, + new LoadTablePreExecutionEvent( table.getCarbonTableIdentifier, 
carbonLoadModel, factPath, dataFrame.isDefined, - optionsFinal, - options, + optionsFinal.asJava, + options.asJava, isOverwriteTable) + operationContext.setProperty("isOverwrite", isOverwriteTable) OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") @@ -232,7 +235,7 @@ case class CarbonLoadDataCommand( operationContext) } val loadTablePostExecutionEvent: LoadTablePostExecutionEvent = - new LoadTablePostExecutionEvent(sparkSession, + new LoadTablePostExecutionEvent( table.getCarbonTableIdentifier, carbonLoadModel) OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext) @@ -614,7 +617,8 @@ case class CarbonLoadDataCommand( sizeInBytes, isOverwriteTable, carbonLoadModel, - sparkSession) + sparkSession, + operationContext) val convertedPlan = CarbonReflectionUtils.getInsertIntoCommand( table = convertRelation, @@ -679,7 +683,8 @@ case class CarbonLoadDataCommand( sizeInBytes: Long, overWrite: Boolean, loadModel: CarbonLoadModel, - sparkSession: SparkSession): LogicalRelation = { + sparkSession: SparkSession, + operationContext: OperationContext): LogicalRelation = { val table = loadModel.getCarbonDataLoadSchema.getCarbonTable val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType))) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions @@ -701,6 +706,7 @@ case class CarbonLoadDataCommand( val dataSchema = StructType(metastoreSchema .filterNot(field => partitionSchema.contains(field.name))) + val operationContextStr = ObjectSerializationUtil.convertObjectToString(operationContext) val options = new mutable.HashMap[String, String]() options ++= catalogTable.storage.properties options += (("overwrite", overWriteLocal.toString)) @@ -708,6 +714,7 @@ case class CarbonLoadDataCommand( options 
+= (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) options += (("staticpartition", partition.nonEmpty.toString)) + options += (("operationcontext", operationContextStr)) options ++= this.options if (updateModel.isDefined) { options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 5e232f6..ea5cfed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.CarbonEnv +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.AlterTableModel import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand @@ -28,6 +29,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ +import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent} object LoadPostAggregateListener extends OperationEventListener { /** @@ -37,8 +39,8 @@ 
object LoadPostAggregateListener extends OperationEventListener { */ override def onEvent(event: Event, operationContext: OperationContext): Unit = { val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent] - val sparkSession = loadEvent.sparkSession - val carbonLoadModel = loadEvent.carbonLoadModel + val sparkSession = SparkSession.getActiveSession.get + val carbonLoadModel = loadEvent.getCarbonLoadModel val table = CarbonEnv.getCarbonTable(Option(carbonLoadModel.getDatabaseName), carbonLoadModel.getTableName)(sparkSession) if (CarbonUtil.hasAggregationDataMap(table)) { @@ -126,7 +128,7 @@ object LoadPreAggregateTablePreListener extends OperationEventListener { */ override def onEvent(event: Event, operationContext: OperationContext): Unit = { val loadEvent = event.asInstanceOf[LoadTablePreExecutionEvent] - val carbonLoadModel = loadEvent.carbonLoadModel + val carbonLoadModel = loadEvent.getCarbonLoadModel val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val isInternalLoadCall = carbonLoadModel.isAggLoadRequest if (table.isChildDataMap && !isInternalLoadCall) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 15deae1..4b368de 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -124,6 +124,10 @@ with Serializable { if (segemntsTobeDeleted.isDefined) { conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get) } + val operationContextStr = 
options.get("operationcontext") + if (operationContextStr.isDefined) { + conf.set(CarbonTableOutputFormat.OPERATION_CONTEXT, operationContextStr.get) + } CarbonTableOutputFormat.setLoadModel(conf, model) new OutputWriterFactory { http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 09546cd..5027a66 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.optimizer +import scala.collection.JavaConverters._ + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.CastExpressionOptimization @@ -26,12 +28,14 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.PartitionMapFileStore import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} import org.apache.carbondata.core.scan.expression.conditional._ import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import 
org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -418,6 +422,11 @@ object CarbonFilters { def getPartitions(partitionFilters: Seq[Expression], sparkSession: SparkSession, identifier: TableIdentifier): Seq[String] = { + // first try to read partitions in case if the trigger comes from the aggregation table load. + val partitionsForAggTable = getPartitionsForAggTable(sparkSession, identifier) + if (partitionsForAggTable.isDefined) { + return partitionsForAggTable.get + } val partitions = { try { if (CarbonProperties.getInstance(). @@ -448,5 +457,37 @@ object CarbonFilters { }.toSet.toSeq } + /** + * In case of loading aggregate tables it needs to be get only from the main table load in + * progress segment. So we should read from the partition map file of that segment. + */ + def getPartitionsForAggTable(sparkSession: SparkSession, + identifier: TableIdentifier): Option[Seq[String]] = { + // when validate segments is disabled then only read from partitionmap + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (carbonSessionInfo != null) { + val validateSegments = carbonSessionInfo.getSessionParams + .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + + CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + "." + + identifier.table, "true").toBoolean + if (!validateSegments) { + val segmentNumbersFromProperty = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + + CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + + "." 
+ identifier.table) + val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession) + val segmentPath = + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNumbersFromProperty) + val partitionMapper = new PartitionMapFileStore() + partitionMapper.readAllPartitionsOfSegment(segmentPath) + Some(partitionMapper.getPartitionMap.asScala.map(_._2).flatMap(_.asScala).toSet.toSeq) + } else { + None + } + } else { + None + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/829e7aa4/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java new file mode 100644 index 0000000..b00a67e --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.processing.loading.events; + +import java.util.Map; + +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.events.Event; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +public class LoadEvents { + /** + * Class for handling operations before start of a load process. + * Example usage: For validation purpose + */ + public static class LoadTablePreExecutionEvent extends Event { + private CarbonTableIdentifier carbonTableIdentifier; + private CarbonLoadModel carbonLoadModel; + private String factPath; + private boolean isDataFrameDefined; + private Map<String, String> optionsFinal; + // userProvidedOptions are needed if we need only the load options given by user + private Map<String, String> userProvidedOptions; + private boolean isOverWriteTable; + + public LoadTablePreExecutionEvent(CarbonTableIdentifier carbonTableIdentifier, + CarbonLoadModel carbonLoadModel, String factPath, boolean isDataFrameDefined, + Map<String, String> optionsFinal, Map<String, String> userProvidedOptions, + boolean isOverWriteTable) { + this.carbonTableIdentifier = carbonTableIdentifier; + this.carbonLoadModel = carbonLoadModel; + this.factPath = factPath; + this.isDataFrameDefined = isDataFrameDefined; + this.optionsFinal = optionsFinal; + this.userProvidedOptions = userProvidedOptions; + this.isOverWriteTable = isOverWriteTable; + } + + public CarbonTableIdentifier getCarbonTableIdentifier() { + return carbonTableIdentifier; + } + + public CarbonLoadModel getCarbonLoadModel() { + return carbonLoadModel; + } + + public String getFactPath() { + return factPath; + } + + public boolean isDataFrameDefined() { + return isDataFrameDefined; + } + + public Map<String, String> getOptionsFinal() { + return optionsFinal; + } + + public Map<String, String> getUserProvidedOptions() { + return userProvidedOptions; + } + + public boolean isOverWriteTable() { + return isOverWriteTable; + } + } + + 
/** + * Class for handling operations after data load completion and before final + * commit of load operation. Example usage: For loading pre-aggregate tables + */ + + public static class LoadTablePostExecutionEvent extends Event { + private CarbonTableIdentifier carbonTableIdentifier; + private CarbonLoadModel carbonLoadModel; + + public LoadTablePostExecutionEvent(CarbonTableIdentifier carbonTableIdentifier, + CarbonLoadModel carbonLoadModel) { + this.carbonTableIdentifier = carbonTableIdentifier; + this.carbonLoadModel = carbonLoadModel; + } + + public CarbonTableIdentifier getCarbonTableIdentifier() { + return carbonTableIdentifier; + } + + public CarbonLoadModel getCarbonLoadModel() { + return carbonLoadModel; + } + } + + /** + * Event for handling operations after data load completion and before final + * commit of load operation. Example usage: For loading pre-aggregate tables + */ + + public static class LoadTablePreStatusUpdateEvent extends Event { + private CarbonLoadModel carbonLoadModel; + private CarbonTableIdentifier carbonTableIdentifier; + + public LoadTablePreStatusUpdateEvent(CarbonTableIdentifier carbonTableIdentifier, + CarbonLoadModel carbonLoadModel) { + this.carbonTableIdentifier = carbonTableIdentifier; + this.carbonLoadModel = carbonLoadModel; + } + + public CarbonLoadModel getCarbonLoadModel() { + return carbonLoadModel; + } + + public CarbonTableIdentifier getCarbonTableIdentifier() { + return carbonTableIdentifier; + } + } + + /** + * Class for handling clean up in case of any failure and abort the operation. 
+ */ + + public static class LoadTableAbortExecutionEvent extends Event { + private CarbonTableIdentifier carbonTableIdentifier; + private CarbonLoadModel carbonLoadModel; + public LoadTableAbortExecutionEvent(CarbonTableIdentifier carbonTableIdentifier, + CarbonLoadModel carbonLoadModel) { + this.carbonTableIdentifier = carbonTableIdentifier; + this.carbonLoadModel = carbonLoadModel; + } + + public CarbonTableIdentifier getCarbonTableIdentifier() { + return carbonTableIdentifier; + } + + public CarbonLoadModel getCarbonLoadModel() { + return carbonLoadModel; + } + } +}
