[CARBONDATA-2012] Add support to load pre-aggregate in one transaction

Currently, if a table (t1) has two pre-aggregate tables (p1, p2), then while loading, each pre-aggregate table is committed individually (its table status file is written) before the parent table is committed.
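Conceptually, the change replaces those independent per-table commits with a two-phase protocol: every child table first stages its status in a tablestatus_UUID file, and only after all children have staged successfully is each staged file promoted to the live tablestatus by a rename, with the parent committed last (the concrete flow is listed below). A minimal sketch of that idea in isolation follows; all names in it (TwoPhaseStatusCommit, stageChildStatus, promoteChild, rollback) are hypothetical simplifications, not CarbonData APIs — the real logic lives in SegmentStatusManager, CarbonLoaderUtil and CommitPreAggregateListener, as shown in the diff.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;
    import java.util.List;

    // Hypothetical sketch of the transaction; not the actual CarbonData API.
    public final class TwoPhaseStatusCommit {

      // Phase 1: a child load writes tablestatus_<uuid> instead of tablestatus.
      static void stageChildStatus(Path metadataDir, String uuid, String statusJson)
          throws IOException {
        Files.write(metadataDir.resolve("tablestatus_" + uuid),
            statusJson.getBytes(StandardCharsets.UTF_8));
      }

      // Phase 2: promote the staged file with a rename, keeping a backup of the
      // previous live file so a later failure can be undone.
      static void promoteChild(Path metadataDir, String uuid) throws IOException {
        Path live = metadataDir.resolve("tablestatus");
        if (Files.exists(live)) {
          Files.move(live, metadataDir.resolve("tablestatus_backup_" + uuid),
              StandardCopyOption.REPLACE_EXISTING);
        }
        Files.move(metadataDir.resolve("tablestatus_" + uuid), live,
            StandardCopyOption.REPLACE_EXISTING);
      }

      // Rollback: restore the backups for every child that was already promoted.
      static void rollback(List<Path> promotedDirs, String uuid) throws IOException {
        for (Path dir : promotedDirs) {
          Files.move(dir.resolve("tablestatus_backup_" + uuid), dir.resolve("tablestatus"),
              StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }

The commit point is a rename rather than a rewrite because a same-directory rename is atomic on most local file systems, so readers see either the old or the new status file, never a half-written one.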
After this PR the flow would be like this:

  load t1
  load p1
  load p2
  write table status for p2 with transactionID
  write table status for p1 with transactionID
  rename tablestatus_UUID to tablestatus for p2
  rename tablestatus_UUID to tablestatus for p1
  write table status for t1

This closes #1781

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d680e9cf
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d680e9cf
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d680e9cf

Branch: refs/heads/branch-1.3
Commit: d680e9cf5016475e6e9b320c27be6503e1c6e66c
Parents: c9a02fc
Author: kunal642 <[email protected]>
Authored: Mon Jan 15 14:35:56 2018 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Feb 1 14:42:05 2018 +0530

----------------------------------------------------------------------
 .../datastore/filesystem/LocalCarbonFile.java   |   2 +-
 .../statusmanager/SegmentStatusManager.java     |  29 ++-
 .../core/util/path/CarbonTablePath.java         |   8 +
 .../hadoop/api/CarbonOutputCommitter.java       |   4 +
 .../carbondata/events/AlterTableEvents.scala    |  10 +
 .../spark/rdd/AggregateDataMapCompactor.scala   |  31 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  37 +++-
 .../spark/rdd/CarbonTableCompactor.scala        |  33 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   4 +-
 .../management/CarbonLoadDataCommand.scala      |  25 ++-
 .../CreatePreAggregateTableCommand.scala        |   7 +-
 .../preaaggregate/PreAggregateListeners.scala   | 220 +++++++++++++++++--
 .../preaaggregate/PreAggregateUtil.scala        |  35 +--
 .../processing/loading/events/LoadEvents.java   |  13 ++
 .../processing/util/CarbonLoaderUtil.java       |  49 ++++-
 15 files changed, 431 insertions(+), 76 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
index 4ce78be..5df5a81 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFile.java
@@ -233,7 +233,7 @@ public class LocalCarbonFile implements CarbonFile {
   @Override public boolean renameForce(String changetoName) {
     File destFile = new File(changetoName);
-    if (destFile.exists()) {
+    if (destFile.exists() && !file.getAbsolutePath().equals(destFile.getAbsolutePath())) {
       if (destFile.delete()) {
         return file.renameTo(new File(changetoName));
       }
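The one-line LocalCarbonFile change above guards against a destructive self-rename: the old code deleted the destination before renaming onto it, so renaming a file onto its own path (as can happen when the staged and live table status paths coincide for an empty UUID) deleted the file and then had nothing left to rename. A small standalone demonstration of the failure mode, using plain java.io and hypothetical file names, independent of CarbonData:

    import java.io.File;
    import java.io.IOException;

    public class SelfRenameDemo {
      public static void main(String[] args) throws IOException {
        File f = new File("tablestatus");
        f.createNewFile();
        File dest = new File("tablestatus"); // same path as the source
        // Old logic: delete the destination before renaming onto it.
        if (dest.exists()) {
          dest.delete();                        // this just deleted the source itself
          System.out.println(f.renameTo(dest)); // false: nothing left to rename
        }
        // The fixed logic additionally checks
        // !f.getAbsolutePath().equals(dest.getAbsolutePath()) before deleting.
      }
    }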
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 6af0304..01f810e 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -178,23 +178,42 @@ public class SegmentStatusManager {
    * @return
    */
   public static LoadMetadataDetails[] readLoadMetadata(String metadataFolderPath) {
+    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.LOADMETADATA_FILENAME;
+    return readTableStatusFile(metadataFileName);
+  }
+
+  /**
+   * Reads the table status file with the specified UUID if non empty.
+   */
+  public static LoadMetadataDetails[] readLoadMetadata(String metaDataFolderPath, String uuid) {
+    String tableStatusFileName;
+    if (uuid.isEmpty()) {
+      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.LOADMETADATA_FILENAME;
+    } else {
+      tableStatusFileName = metaDataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonCommonConstants.LOADMETADATA_FILENAME + CarbonCommonConstants.UNDERSCORE + uuid;
+    }
+    return readTableStatusFile(tableStatusFileName);
+  }
+
+  public static LoadMetadataDetails[] readTableStatusFile(String tableStatusPath) {
     Gson gsonObjectToRead = new Gson();
     DataInputStream dataInputStream = null;
     BufferedReader buffReader = null;
     InputStreamReader inStream = null;
-    String metadataFileName = metadataFolderPath + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.LOADMETADATA_FILENAME;
     LoadMetadataDetails[] listOfLoadFolderDetailsArray;
     AtomicFileOperations fileOperation =
-        new AtomicFileOperationsImpl(metadataFileName, FileFactory.getFileType(metadataFileName));
+        new AtomicFileOperationsImpl(tableStatusPath, FileFactory.getFileType(tableStatusPath));
     try {
-      if (!FileFactory.isFileExist(metadataFileName, FileFactory.getFileType(metadataFileName))) {
+      if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
         return new LoadMetadataDetails[0];
       }
       dataInputStream = fileOperation.openForRead();
       inStream = new InputStreamReader(dataInputStream,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
       buffReader = new BufferedReader(inStream);
       listOfLoadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader,
           LoadMetadataDetails[].class);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 9e66657..fab6289 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -252,6 +252,14 @@ public class CarbonTablePath extends Path {
     return getMetaDataDir() + File.separator + TABLE_STATUS_FILE;
   }
 
+  public String getTableStatusFilePathWithUUID(String uuid) {
+    if (!uuid.isEmpty()) {
+      return getTableStatusFilePath() + CarbonCommonConstants.UNDERSCORE + uuid;
+    } else {
+      return getTableStatusFilePath();
+    }
+  }
+
   /**
    * Gets absolute path of data file
    *
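The naming rule that the two readLoadMetadata overloads and getTableStatusFilePathWithUUID above implement is simply "append _<uuid> to the live file name when a transaction id is present". A minimal standalone mirror of that convention (hypothetical class and method, not the CarbonData API):

    // Hypothetical mirror of CarbonTablePath.getTableStatusFilePathWithUUID and the
    // readLoadMetadata(path, uuid) overload; "tablestatus" is the live file name.
    public class TableStatusNaming {
      static String tableStatusPath(String metadataDir, String uuid) {
        String live = metadataDir + "/tablestatus";
        return uuid.isEmpty() ? live : live + "_" + uuid;
      }

      public static void main(String[] args) {
        System.out.println(tableStatusPath("/t1/Metadata", ""));     // /t1/Metadata/tablestatus
        System.out.println(tableStatusPath("/t1/Metadata", "42ab")); // /t1/Metadata/tablestatus_42ab
      }
    }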
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index f6e928d..9cca1bb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -115,8 +115,12 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     LoadEvents.LoadTablePreStatusUpdateEvent event =
         new LoadEvents.LoadTablePreStatusUpdateEvent(carbonTable.getCarbonTableIdentifier(),
             loadModel);
+    LoadEvents.LoadTablePostStatusUpdateEvent postStatusUpdateEvent =
+        new LoadEvents.LoadTablePostStatusUpdateEvent(loadModel);
     try {
       OperationListenerBus.getInstance().fireEvent(event, (OperationContext) operationContext);
+      OperationListenerBus.getInstance().fireEvent(postStatusUpdateEvent,
+          (OperationContext) operationContext);
     } catch (Exception e) {
       throw new IOException(e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 30e3f6f..ca1948a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -182,6 +182,16 @@ case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
     mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
 
 /**
+ * Compaction Event for handling post update status file operations, like committing child
+ * datamaps in one transaction
+ */
+case class AlterTableCompactionPostStatusUpdateEvent(
+    carbonTable: CarbonTable,
+    carbonMergerMapping: CarbonMergerMapping,
+    carbonLoadModel: CarbonLoadModel,
+    mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
+
+/**
  * Compaction Event for handling clean up in case of any compaction failure and abort the
  * operation, lister has to implement this event to handle failure scenarios
  *
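Both hunks above hang the new commit step on the existing event bus: the load and compaction flows fire a post-status-update event, and a registered listener performs the child-table commit. A generic sketch of that observer wiring, using hypothetical types in the spirit of (but not identical to) OperationListenerBus:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    // Hypothetical, simplified bus; the real OperationListenerBus dispatches Event
    // subtypes to OperationEventListener instances registered per event class.
    class MiniEventBus {
      private final Map<Class<?>, List<Consumer<Object>>> listeners = new HashMap<>();

      void addListener(Class<?> eventType, Consumer<Object> listener) {
        listeners.computeIfAbsent(eventType, k -> new ArrayList<>()).add(listener);
      }

      void fireEvent(Object event) {
        // A listener that throws aborts dispatch; the commit flow treats that
        // exception as a failed child-table commit.
        for (Consumer<Object> l :
            listeners.getOrDefault(event.getClass(), Collections.emptyList())) {
          l.accept(event);
        }
      }
    }

CarbonEnv registers CommitPreAggregateListener for both LoadTablePostStatusUpdateEvent and AlterTableCompactionPostStatusUpdateEvent (see the CarbonEnv hunk later in the diff), so a single listener serves both flows.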
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 5f8f389..188e776 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.OperationContext
@@ -61,6 +62,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       CarbonSession.updateSessionInfoToCurrentThread(sqlContext.sparkSession)
       val loadCommand = operationContext.getProperty(carbonTable.getTableName + "_Compaction")
         .asInstanceOf[CarbonLoadDataCommand]
+      val uuid = Option(loadCommand.operationContext.getProperty("uuid")).getOrElse("").toString
       try {
         val newInternalOptions = loadCommand.internalOptions ++
           Map("mergedSegmentName" -> mergedLoadName)
@@ -70,7 +72,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           sqlContext.sparkSession, loadCommand.logicalPlan.get))
         loadCommand.processData(sqlContext.sparkSession)
         val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath)
+          carbonTable.getMetaDataFilepath, uuid)
         val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
           case load if loadMetaDataDetails.contains(load) =>
             load.setMergedLoadName(mergedLoadName)
@@ -83,16 +85,37 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
           .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
             .getAbsoluteTableIdentifier)
         SegmentStatusManager
-          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
+          .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePathWithUUID(uuid),
             updatedLoadMetaDataDetails)
         carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
       } finally {
         // check if any other segments needs compaction on in case of MINOR_COMPACTION.
         // For example: after 8.1 creation 0.1, 4.1, 8.1 have to be merged to 0.2 if threshhold
         // allows it.
+        // Also as the load which will be fired for 2nd level compaction will read the
+        // tablestatus file and not the tablestatus_UUID therefore we have to commit the
+        // intermediate tablestatus file for 2nd level compaction to be successful.
+        // This is required because:
+        // 1. after doing 12 loads and a compaction after every 4 loads the table status file will
+        //    have 0.1, 4.1, 8, 9, 10, 11 as Success segments. While tablestatus_UUID will have
+        //    0.1, 4.1, 8.1.
+        // 2. Now for 2nd level compaction 0.1, 8.1, 4.1 have to be merged to 0.2. therefore we
+        //    need to read the tablestatus_UUID. But load flow should always read tablestatus file
+        //    because it contains the actual In-Process status for the segments.
+        // 3. If we read the tablestatus then 8, 9, 10, 11 will keep getting compacted into 8.1.
+        // 4. Therefore tablestatus file will be committed in between multiple commits.
         if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
-
-          executeCompaction()
+          if (!identifySegmentsToBeMerged().isEmpty) {
+            val carbonTablePath = CarbonStorePath
+              .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+                .getAbsoluteTableIdentifier)
+            val uuidTableStaus = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+            val tableStatus = carbonTablePath.getTableStatusFilePath
+            if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
+              FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
+            }
+            executeCompaction()
+          }
         }
         CarbonSession
           .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8212e85..3de0e70 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
@@ -62,7 +63,7 @@ import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
-import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.exception.{CarbonDataLoadingException, NoRetryException}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
@@ -491,9 +492,10 @@ object CarbonDataRDDFactory {
         }
         return
       }
+      val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
       if (loadStatus == SegmentStatus.LOAD_FAILURE) {
         // update the load entry in table status file for changing the status to marked for delete
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -508,7 +510,7 @@ object CarbonDataRDDFactory {
           status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
           carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
         // update the load entry in table status file for changing the status to marked for delete
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -532,6 +534,8 @@ object CarbonDataRDDFactory {
         }
         writeDictionary(carbonLoadModel, result, writeAll = false)
+        operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
+          carbonLoadModel.getSegmentId)
         val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
           new LoadTablePreStatusUpdateEvent(
             carbonTable.getCarbonTableIdentifier,
@@ -543,9 +547,21 @@
         carbonLoadModel,
         loadStatus,
         newEntryLoadStatus,
-        overwriteTable)
-      if (!done) {
-        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+        overwriteTable,
+        uniqueTableStatusId)
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      val commitComplete = try {
+        OperationListenerBus.getInstance()
+          .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
+        true
+      } catch {
+        case ex: Exception =>
+          LOGGER.error(ex, "Problem while committing data maps")
+          false
+      }
+      if (!done && !commitComplete) {
+        CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
@@ -731,7 +747,8 @@
       operationContext: OperationContext): Unit = {
     LOGGER.info(s"compaction need status is" +
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
-    if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
+    if (!carbonTable.isChildDataMap &&
+        CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
       LOGGER.audit(s"Compaction request received for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
@@ -805,7 +822,8 @@
       carbonLoadModel: CarbonLoadModel,
       loadStatus: SegmentStatus,
       newEntryLoadStatus: SegmentStatus,
-      overwriteTable: Boolean): Boolean = {
+      overwriteTable: Boolean,
+      uuid: String = ""): Boolean = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
       status(0)._2._1
     }
@@ -820,7 +838,7 @@
     CarbonLoaderUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
-      overwriteTable)
+      overwriteTable, uuid)
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
@@ -835,7 +853,6 @@
     done
   }
 
-
   /**
    * repartition the input data for partition table.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a0c8f65..8406d8d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -32,10 +32,12 @@ import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
 import org.apache.carbondata.spark.util.CommonUtil
@@ -245,8 +247,33 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       CarbonDataMergerUtil
         .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
           mergedLoadNumber, carbonLoadModel, compactionType)
-
-    if (!statusFileUpdation) {
+    val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
+      carbonMergerMapping,
+      carbonLoadModel,
+      mergedLoadName)
+    // Used to inform the commit listener that the commit is fired from compaction flow.
+    operationContext.setProperty("isCompaction", "true")
+    val commitComplete = try {
+      // Once main table compaction is done and 0.1, 4.1, 8.1 is created commit will happen for
+      // all the tables. The commit listener will compact the child tables until no more segments
+      // are left. But 2nd level compaction is yet to happen on the main table therefore again the
+      // compaction flow will try to commit the child tables which is wrong. This check tell the
+      // 2nd level compaction flow that the commit for datamaps is already done.
+      val isCommitDone = operationContext.getProperty("commitComplete")
+      if (isCommitDone != null) {
+        isCommitDone.toString.toBoolean
+      } else {
+        OperationListenerBus.getInstance()
+          .fireEvent(compactionLoadStatusPostEvent, operationContext)
+        true
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "Problem while committing data maps")
+        false
+    }
+    operationContext.setProperty("commitComplete", commitComplete)
+    if (!statusFileUpdation && !commitComplete) {
       LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                    s"${ carbonLoadModel.getTableName }")
       LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 870b1f3..40035ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -148,6 +148,8 @@ object CarbonEnv {
         AlterPreAggregateTableCompactionPostListener)
       .addListener(classOf[LoadMetadataEvent], LoadProcessMetaListener)
       .addListener(classOf[LoadMetadataEvent], CompactionProcessMetaListener)
+      .addListener(classOf[LoadTablePostStatusUpdateEvent], CommitPreAggregateListener)
+      .addListener(classOf[AlterTableCompactionPostStatusUpdateEvent], CommitPreAggregateListener)
   }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 226a625..8e6c20e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command.management
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -35,8 +36,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
@@ -119,6 +120,7 @@ case class CarbonLoadDataCommand(
     }
     Seq.empty
   }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
@@ -176,7 +178,18 @@ case class CarbonLoadDataCommand(
     LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
     TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
     var isUpdateTableStatusRequired = false
+    // if the table is child then extract the uuid from the operation context and the parent would
+    // already generated UUID.
+    // if parent table then generate a new UUID else use empty.
+    val uuid = if (table.isChildDataMap) {
+      Option(operationContext.getProperty("uuid")).getOrElse("").toString
+    } else if (table.hasAggregationDataMap) {
+      UUID.randomUUID().toString
+    } else {
+      ""
+    }
     try {
+      operationContext.setProperty("uuid", uuid)
       val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
         new LoadTablePreExecutionEvent(
           table.getCarbonTableIdentifier,
@@ -194,7 +207,9 @@ case class CarbonLoadDataCommand(
       DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
       // add the start entry for the new load in the table status file
       if (updateModel.isEmpty && !table.isHivePartitionTable) {
-        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
         isUpdateTableStatusRequired = true
       }
       if (isOverwriteTable) {
@@ -252,7 +267,7 @@ case class CarbonLoadDataCommand(
       case CausedBy(ex: NoRetryException) =>
         // update the load entry in table status file for changing the status to marked for delete
         if (isUpdateTableStatusRequired) {
-          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
         LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
         throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
@@ -263,7 +278,7 @@ case class CarbonLoadDataCommand(
         LOGGER.error(ex)
         // update the load entry in table status file for changing the status to marked for delete
         if (isUpdateTableStatusRequired) {
-          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel)
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
         LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
         throw ex

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index dbbf90c..3de75c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -205,11 +205,12 @@ case class CreatePreAggregateTableCommand(
       loadCommand.dataFrame = Some(PreAggregateUtil
         .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
       PreAggregateUtil.startDataLoadForDataMap(
-        parentTable,
+        TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
         segmentToLoad = "*",
         validateSegments = true,
-        sparkSession,
-        loadCommand)
+        loadCommand,
+        isOverwrite = false,
+        sparkSession)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 7b273ba..ed6be97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -17,19 +17,26 @@
 
 package org.apache.spark.sql.execution.command.preaaggregate
 
+import java.util.UUID
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.{SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 
 /**
  * below class will be used to create load command for compaction
@@ -71,9 +78,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
         childDataFrame,
         false,
         sparkSession)
+      val uuid = Option(operationContext.getProperty("uuid")).
+        getOrElse(UUID.randomUUID()).toString
+      operationContext.setProperty("uuid", uuid)
       loadCommand.processMetadata(sparkSession)
       operationContext
         .setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction", loadCommand)
+      loadCommand.operationContext = operationContext
     }
   } else if (table.isChildDataMap) {
     val childTableName = table.getTableName
@@ -95,9 +106,13 @@ object CompactionProcessMetaListener extends OperationEventListener {
       childDataFrame,
       false,
       sparkSession)
+    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
     loadCommand.processMetadata(sparkSession)
     operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
+    operationContext.setProperty("uuid", uuid)
+    loadCommand.operationContext = operationContext
   }
+
 }
 }
@@ -127,12 +142,17 @@ object LoadProcessMetaListener extends OperationEventListener {
       val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
       val parentTableName = table.getTableName
       val databaseName = table.getDatabaseName
+      // if the table is child then extract the uuid from the operation context and the parent
+      // would already generated UUID.
+      // if parent table then generate a new UUID else use empty.
+      val uuid =
+        Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
       val list = scala.collection.mutable.ListBuffer.empty[AggregationDataMapSchema]
       for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
         val childSelectQuery = if (!dataMapSchema.isTimeseriesDataMap) {
-          PreAggregateUtil.getChildQuery(dataMapSchema)
+          (PreAggregateUtil.getChildQuery(dataMapSchema), "")
         } else {
           // for timeseries rollup policy
           val tableSelectedForRollup = PreAggregateUtil.getRollupDataMapNameForTimeSeries(list,
@@ -140,18 +160,19 @@
           list += dataMapSchema
           // if non of the rollup data map is selected hit the maintable and prepare query
           if (tableSelectedForRollup.isEmpty) {
-            PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
+            (PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
               parentTableName,
-              databaseName)
+              databaseName), "")
           } else {
             // otherwise hit the select rollup datamap schema
-            PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
+            (PreAggregateUtil.createTimeseriesSelectQueryForRollup(dataMapSchema.getChildSchema,
               tableSelectedForRollup.get,
-              databaseName)
+              databaseName),
+              s"$databaseName.${tableSelectedForRollup.get.getChildSchema.getTableName}")
           }
         }
         val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-          childSelectQuery)).drop("preAggLoad")
+          childSelectQuery._1)).drop("preAggLoad")
         val isOverwrite =
           operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
         val loadCommand = PreAggregateUtil.createLoadCommandForChild(
@@ -159,7 +180,10 @@
           TableIdentifier(childTableName, Some(childDatabaseName)),
           childDataFrame,
           isOverwrite,
-          sparkSession)
+          sparkSession,
+          timeseriesParentTableName = childSelectQuery._2)
+        operationContext.setProperty("uuid", uuid)
+        loadCommand.operationContext.setProperty("uuid", uuid)
         loadCommand.processMetadata(sparkSession)
         operationContext.setProperty(dataMapSchema.getChildSchema.getTableName, loadCommand)
       }
@@ -191,25 +215,172 @@ object LoadPostAggregateListener extends OperationEventListener {
         .asInstanceOf[CarbonLoadDataCommand]
       childLoadCommand.dataFrame = Some(PreAggregateUtil
         .getDataFrame(sparkSession, childLoadCommand.logicalPlan.get))
-      val childOperationContext = new OperationContext
-      childOperationContext
-        .setProperty(dataMapSchema.getChildSchema.getTableName,
-          operationContext.getProperty(dataMapSchema.getChildSchema.getTableName))
       val isOverwrite = operationContext.getProperty("isOverwrite").asInstanceOf[Boolean]
-      childOperationContext.setProperty("isOverwrite", isOverwrite)
-      childOperationContext.setProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction",
-        operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction"))
-      childLoadCommand.operationContext = childOperationContext
+      childLoadCommand.operationContext = operationContext
+      val timeseriesParent = childLoadCommand.internalOptions.get("timeseriesParent")
+      val (parentTableIdentifier, segmentToLoad) =
+        if (timeseriesParent.isDefined && timeseriesParent.get.nonEmpty) {
+          val (parentTableDatabase, parentTableName) =
+            (timeseriesParent.get.split('.')(0), timeseriesParent.get.split('.')(1))
+          (TableIdentifier(parentTableName, Some(parentTableDatabase)),
+            operationContext.getProperty(
+              s"${parentTableDatabase}_${parentTableName}_Segment").toString)
+        } else {
+          (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
+            carbonLoadModel.getSegmentId)
+        }
       PreAggregateUtil.startDataLoadForDataMap(
-        table,
-        carbonLoadModel.getSegmentId,
+        parentTableIdentifier,
+        segmentToLoad,
         validateSegments = false,
-        sparkSession,
-        childLoadCommand)
+        childLoadCommand,
+        isOverwrite,
+        sparkSession)
+    }
+  }
+}
+
+/**
+ * This listener is used to commit all the child data aggregate tables in one transaction. If one
+ * failes all will be reverted to original state.
+ */
+object CommitPreAggregateListener extends OperationEventListener {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override protected def onEvent(event: Event,
+      operationContext: OperationContext): Unit = {
+    // The same listener is called for both compaction and load therefore getting the
+    // carbonLoadModel from the appropriate event.
+    val carbonLoadModel = event match {
+      case loadEvent: LoadTablePostStatusUpdateEvent =>
+        loadEvent.getCarbonLoadModel
+      case compactionEvent: AlterTableCompactionPostStatusUpdateEvent =>
+        compactionEvent.carbonLoadModel
+    }
+    val isCompactionFlow = Option(
+      operationContext.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
+    val dataMapSchemas =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getDataMapSchemaList
+    // extract all child LoadCommands
+    val childLoadCommands = if (!isCompactionFlow) {
+      // If not compaction flow then the key for load commands will be tableName
+      dataMapSchemas.asScala.map { dataMapSchema =>
+        operationContext.getProperty(dataMapSchema.getChildSchema.getTableName)
+          .asInstanceOf[CarbonLoadDataCommand]
+      }
+    } else {
+      // If not compaction flow then the key for load commands will be tableName_Compaction
+      dataMapSchemas.asScala.map { dataMapSchema =>
+        operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
+          .asInstanceOf[CarbonLoadDataCommand]
+      }
+    }
+    if (dataMapSchemas.size() > 0) {
+      val uuid = operationContext.getProperty("uuid").toString
+      // keep committing until one fails
+      val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
+        val childCarbonTable = childLoadCommand.table
+        val carbonTablePath =
+          new CarbonTablePath(childCarbonTable.getCarbonTableIdentifier,
+            childCarbonTable.getTablePath)
+        // Generate table status file name with UUID, forExample: tablestatus_1
+        val oldTableSchemaPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid)
+        // Generate table status file name without UUID, forExample: tablestatus
+        val newTableSchemaPath = carbonTablePath.getTableStatusFilePath
+        renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+      }
+      // if true then the commit for one of the child tables has failed
+      val commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.size()) != 0
+      if (commitFailed) {
+        LOGGER.warn("Reverting table status file to original state")
+        renamedDataMaps.foreach {
+          loadCommand =>
+            val carbonTable = loadCommand.table
+            val carbonTablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+              carbonTable.getTablePath)
+            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+            val backupTableSchemaPath = carbonTablePath.getTableStatusFilePath + "_backup_" + uuid
+            val tableSchemaPath = carbonTablePath.getTableStatusFilePath
+            markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, loadCommand)
+            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+        }
+      }
+      // after success/failure of commit delete all tablestatus files with UUID in their names.
+      // if commit failed then remove the segment directory
+      cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
+        operationContext,
+        uuid)
+      if (commitFailed) {
+        sys.error("Failed to update table status for pre-aggregate table")
+      }
+    }
+
+
+  }
+
+  private def markInProgressSegmentAsDeleted(tableStatusFile: String,
+      operationContext: OperationContext,
+      loadDataCommand: CarbonLoadDataCommand): Unit = {
+    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+    val segmentBeingLoaded =
+      operationContext.getProperty(loadDataCommand.table.getTableUniqueName + "_Segment").toString
+    val newDetails = loadMetaDataDetails.collect {
+      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+        detail
+      case others => others
+    }
+    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+  }
+
+  /**
+   * Used to rename table status files for commit operation.
+   */
+  private def renameDataMapTableStatusFiles(sourceFileName: String,
+      destinationFileName: String, uuid: String) = {
+    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+      val backUpPostFix = if (uuid.nonEmpty) {
+        "_backup_" + uuid
+      } else {
+        ""
+      }
+      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+        oldCarbonFile.renameForce(destinationFileName)
+      } else {
+        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Used to remove table status files with UUID and segment folders.
+   */
+  private def cleanUpStaleTableStatusFiles(
+      childTables: Seq[CarbonTable],
+      operationContext: OperationContext,
+      uuid: String): Unit = {
+    childTables.foreach { childTable =>
+      val carbonTablePath = new CarbonTablePath(childTable.getCarbonTableIdentifier,
+        childTable.getTablePath)
+      val metaDataDir = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath)
+      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(uuid) || file.getName.contains("backup")
+        }
+      })
+      tableStatusFiles.foreach(_.delete())
     }
   }
 }
 
 /**
@@ -226,6 +397,7 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
     val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
     val carbonTable = compactionEvent.carbonTable
     val compactionType = compactionEvent.carbonMergerMapping.campactionType
+    val carbonLoadModel = compactionEvent.carbonLoadModel
     val sparkSession = compactionEvent.sparkSession
     if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
       carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
@@ -236,6 +408,10 @@
           compactionType.toString,
           Some(System.currentTimeMillis()),
           "")
+        operationContext.setProperty(
+          dataMapSchema.getRelationIdentifier.getDatabaseName + "_" +
+          dataMapSchema.getRelationIdentifier.getTableName + "_Segment",
+          carbonLoadModel.getSegmentId)
         CarbonAlterTableCompactionCommand(alterTableModel, operationContext = operationContext)
           .run(sparkSession)
       }
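CommitPreAggregateListener above commits children with takeWhile, so the prefix of successfully renamed tables is exactly what must be reverted when a later rename fails. The same "commit until first failure, then roll back the prefix" shape in plain Java, with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // Hypothetical mirror of the takeWhile-based commit loop in
    // CommitPreAggregateListener; `commit` stands in for the rename of
    // tablestatus_<uuid> to tablestatus.
    public class PrefixCommit {
      static <T> List<T> commitUntilFailure(List<T> children, Predicate<T> commit) {
        List<T> committed = new ArrayList<>();
        for (T child : children) {
          if (!commit.test(child)) {
            break;               // stop at the first failed rename
          }
          committed.add(child);  // this prefix must be rolled back on failure
        }
        return committed;
      }
    }

Returning fewer elements than were passed in signals a failed transaction, which is exactly the renamedDataMaps.lengthCompare(dataMapSchemas.size()) != 0 check in the listener.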
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dac5d5e..1d4ebec 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -35,13 +35,16 @@ import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -581,32 +584,33 @@ object PreAggregateUtil {
    * This method will start load process on the data map
    */
  def startDataLoadForDataMap(
-      parentCarbonTable: CarbonTable,
+      parentTableIdentifier: TableIdentifier,
      segmentToLoad: String,
      validateSegments: Boolean,
-      sparkSession: SparkSession,
-      loadCommand: CarbonLoadDataCommand): Unit = {
+      loadCommand: CarbonLoadDataCommand,
+      isOverwrite: Boolean,
+      sparkSession: SparkSession): Unit = {
    CarbonSession.threadSet(
      CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-      parentCarbonTable.getDatabaseName + "." +
-      parentCarbonTable.getTableName,
+      parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+      parentTableIdentifier.table,
      segmentToLoad)
    CarbonSession.threadSet(
      CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-      parentCarbonTable.getDatabaseName + "." +
-      parentCarbonTable.getTableName, validateSegments.toString)
+      parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+      parentTableIdentifier.table, validateSegments.toString)
    CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
    try {
      loadCommand.processData(sparkSession)
    } finally {
      CarbonSession.threadUnset(
        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-        parentCarbonTable.getDatabaseName + "." +
-        parentCarbonTable.getTableName)
+        parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+        parentTableIdentifier.table)
      CarbonSession.threadUnset(
        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-        parentCarbonTable.getDatabaseName + "." +
-        parentCarbonTable.getTableName)
+        parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
+        parentTableIdentifier.table)
    }
  }
@@ -885,7 +889,8 @@ object PreAggregateUtil {
      dataMapIdentifier: TableIdentifier,
      dataFrame: DataFrame,
      isOverwrite: Boolean,
-      sparkSession: SparkSession): CarbonLoadDataCommand = {
+      sparkSession: SparkSession,
+      timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
    val headers = columns.asScala.filter { column =>
      !column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
    }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
@@ -896,7 +901,8 @@ object PreAggregateUtil {
      Map("fileheader" -> headers),
      isOverwriteTable = isOverwrite,
      dataFrame = None,
-      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
+      internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+        "timeseriesParent" -> timeseriesParentTableName),
      logicalPlan = Some(dataFrame.queryExecution.logical))
    loadCommand
  }
@@ -904,4 +910,5 @@ object PreAggregateUtil {
  def getDataFrame(sparkSession: SparkSession, child: LogicalPlan): DataFrame = {
    Dataset.ofRows(sparkSession, child)
  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 78964e7..190c72c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -147,6 +147,19 @@ public class LoadEvents {
       return carbonTable;
     }
   }
+
+  public static class LoadTablePostStatusUpdateEvent extends Event {
+    private CarbonLoadModel carbonLoadModel;
+
+    public LoadTablePostStatusUpdateEvent(CarbonLoadModel carbonLoadModel) {
+      this.carbonLoadModel = carbonLoadModel;
+    }
+
+    public CarbonLoadModel getCarbonLoadModel() {
+      return carbonLoadModel;
+    }
+  }
+
   /**
    * Class for handling clean up in case of any failure and abort the operation.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d680e9cf/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 12fc5c1..3a83427 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -150,6 +150,22 @@ public final class CarbonLoaderUtil {
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException {
+    return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
+  }
+
+  /**
+   * This API will write the load level metadata for the loadmanagement module inorder to
+   * manage the load and query execution management smoothly.
+   *
+   * @param newMetaEntry
+   * @param loadModel
+   * @param uuid
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
+      throws IOException {
     boolean status = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -159,7 +175,12 @@ public final class CarbonLoaderUtil {
     if (!FileFactory.isFileExist(metadataPath, fileType)) {
       FileFactory.mkdirs(metadataPath, fileType);
     }
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    String tableStatusPath;
+    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
+      tableStatusPath = carbonTablePath.getTableStatusFilePathWithUUID(uuid);
+    } else {
+      tableStatusPath = carbonTablePath.getTableStatusFilePath();
+    }
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     int retryCount = CarbonLockUtil
@@ -314,7 +335,6 @@ public final class CarbonLoaderUtil {
         new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
 
     try {
-
       dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
           Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
@@ -367,7 +387,7 @@ public final class CarbonLoaderUtil {
 
   public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
-      boolean insertOverwrite) throws IOException {
+      boolean insertOverwrite, String uuid) throws IOException {
     LoadMetadataDetails newLoadMetaEntry = new LoadMetadataDetails();
     SegmentStatus status = SegmentStatus.INSERT_IN_PROGRESS;
     if (insertOverwrite) {
@@ -381,18 +401,23 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
-    boolean entryAdded =
-        CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
+    boolean entryAdded = CarbonLoaderUtil
+        .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
     if (!entryAdded) {
       throw new IOException("Dataload failed due to failure in table status updation for "
           + model.getTableName());
     }
   }
 
+  public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
+      boolean insertOverwrite) throws IOException {
+    readAndUpdateLoadProgressInTableMeta(model, insertOverwrite, "");
+  }
+
   /**
    * This method will update the load failure entry in the table status file
    */
-  public static void updateTableStatusForFailure(CarbonLoadModel model)
+  public static void updateTableStatusForFailure(CarbonLoadModel model, String uuid)
       throws IOException {
     // in case if failure the load status should be "Marked for delete" so that it will be taken
     // care during clean up
@@ -404,14 +429,22 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
-    boolean entryAdded =
-        CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false);
+    boolean entryAdded = CarbonLoaderUtil.recordNewLoadMetadata(
+        loadMetaEntry, model, false, false, uuid);
     if (!entryAdded) {
       throw new IOException(
           "Failed to update failure entry in table status for " + model.getTableName());
     }
   }
 
+  /**
+   * This method will update the load failure entry in the table status file with empty uuid.
+   */
+  public static void updateTableStatusForFailure(CarbonLoadModel model)
+      throws IOException {
+    updateTableStatusForFailure(model, "");
+  }
+
   public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier)
       throws IOException {
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =
