Repository: carbondata Updated Branches: refs/heads/master e43be5e74 -> 7e0803fec
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala index a987127..15ae30f 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala @@ -35,7 +35,6 @@ import org.apache.carbondata.events._ import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.MergeResultImpl -import org.apache.carbondata.spark.util.CommonUtil /** * This class is used to perform compaction on carbon table. @@ -201,6 +200,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, if (finalMergeStatus) { val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName) + var segmentFilesForIUDCompact = new util.ArrayList[Segment]() var segmentFileName: String = null if (carbonTable.isHivePartitionTable) { val readPath = @@ -220,6 +220,23 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, carbonLoadModel.getTablePath) } segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT + } else { + // Get the segment files each updated segment in case of IUD compaction + if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { + val segmentFilesList = loadsToMerge.asScala.map{seg => + val file = SegmentFileStore.writeSegmentFile( + carbonTable.getTablePath, + seg.getLoadName, + carbonLoadModel.getFactTimeStamp.toString) + new Segment(seg.getLoadName, file) + }.filter(_.getSegmentFileName != null).asJava + segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList) + } else { + segmentFileName = SegmentFileStore.writeSegmentFile( + carbonTable.getTablePath, + mergedLoadNumber, + carbonLoadModel.getFactTimeStamp.toString) + } } // trigger event for compaction val alterTableCompactionPreStatusUpdateEvent = @@ -238,11 +255,12 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel, CarbonDataMergerUtil .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge, carbonTable.getMetadataPath, - carbonLoadModel)) || + carbonLoadModel, + segmentFilesForIUDCompact)) || CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus( loadsToMerge, - carbonTable.getMetadataPath, - mergedLoadNumber, + carbonTable.getMetadataPath, + mergedLoadNumber, carbonLoadModel, compactionType, segmentFileName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala index 2d19fd4..b27a150 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala @@ -51,14 +51,14 @@ case class CarbonCountStar( val rowCount = CarbonUpdateUtil.getRowCount( tableInputFormat.getBlockRowCount( job, - absoluteTableIdentifier, + carbonTable, CarbonFilters.getPartitions( Seq.empty, sparkSession, TableIdentifier( carbonTable.getTableName, Some(carbonTable.getDatabaseName))).map(_.asJava).orNull), - absoluteTableIdentifier) + carbonTable) val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]]) val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray) val row = if (outUnsafeRows) unsafeProjection(value) else value http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index 8eaeab1..0c6d2ba 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -105,12 +105,12 @@ object DeleteExecution { val blockMappingVO = carbonInputFormat.getBlockRowCount( job, - absoluteTableIdentifier, + carbonTable, CarbonFilters.getPartitions( Seq.empty, sparkSession, TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull) - val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier) + val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable) CarbonUpdateUtil .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala index f88e767..8c88d0e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala @@ -73,7 +73,7 @@ object HorizontalCompaction { // SegmentUpdateStatusManager reads the Table Status File and Table Update Status // file and save the content in segmentDetails and updateDetails respectively. val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager( - absTableIdentifier) + carbonTable) if (isUpdateOperation) { @@ -199,7 +199,7 @@ object HorizontalCompaction { .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length) val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName, - absTableIdentifier, + carbonTable, updateStatusDetails, timestamp) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index b583c6a..25a0d8e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -122,7 +122,9 @@ case class CarbonAlterTableAddHivePartitionCommand( CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false) val newMetaEntry = loadModel.getCurrentLoadMetadataDetail val segmentFileName = - loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT + SegmentFileStore.genSegmentFileName( + loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) + + CarbonTablePath.SEGMENT_EXT newMetaEntry.setSegmentFile(segmentFileName) val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath) CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index 9c2835e..756bc97 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -210,9 +210,7 @@ case class CarbonAlterTableDropPartitionCommand( for (thread <- threadArray) { thread.join() } - val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, - carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier) + val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable) refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 1bdf414..929de0a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -231,9 +231,7 @@ case class CarbonAlterTableSplitPartitionCommand( threadArray.foreach { thread => thread.join() } - val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, - carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName) - val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier) + val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable) refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava) } catch { case e: Exception => http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 47da9a5..7123b93 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -96,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { private def driverSideCountStar(logicalRelation: LogicalRelation): Boolean = { val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] val segmentUpdateStatusManager = new SegmentUpdateStatusManager( - relation.carbonRelation.metaData.carbonTable.getAbsoluteTableIdentifier) + relation.carbonRelation.metaData.carbonTable) val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata() if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) { false http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index ea5eb42..5bc85f8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -141,7 +141,7 @@ public final class CarbonDataMergerUtil { */ public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus( List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath, - CarbonLoadModel carbonLoadModel) { + CarbonLoadModel carbonLoadModel, List<Segment> segmentFilesToBeUpdated) { boolean status = false; boolean updateLockStatus = false; @@ -171,7 +171,7 @@ public final class CarbonDataMergerUtil { carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(identifier); + new SegmentUpdateStatusManager(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); @@ -230,6 +230,13 @@ public final class CarbonDataMergerUtil { loadDetail .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); } + // Update segement file name to status file + int segmentFileIndex = + segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName())); + if (segmentFileIndex > -1) { + loadDetail.setSegmentFile( + segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName()); + } } } @@ -1135,18 +1142,17 @@ public final class CarbonDataMergerUtil { * * @param seg * @param blockName - * @param absoluteTableIdentifier * @param segmentUpdateDetails * @param timestamp * @return * @throws IOException */ public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg, - String blockName, AbsoluteTableIdentifier absoluteTableIdentifier, + String blockName, CarbonTable table, SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException { SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + new SegmentUpdateStatusManager(table); List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 65827b0..aabe91a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -47,7 +47,6 @@ import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; -import org.apache.carbondata.core.metadata.SegmentFileStore; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; @@ -334,60 +333,6 @@ public final class CarbonLoaderUtil { return status; } - /** - * This API will update the segmentFile of a passed segment. - * - * @return boolean which determines whether status update is done or not. - * @throws IOException - */ - private static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile) - throws IOException { - boolean status = false; - String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath); - String metadataPath = CarbonTablePath.getMetadataPath(tablePath); - AbsoluteTableIdentifier absoluteTableIdentifier = - AbsoluteTableIdentifier.from(tablePath, null, null); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); - int retryCount = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT); - int maxTimeout = CarbonLockUtil - .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, - CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT); - try { - if (carbonLock.lockWithRetries(retryCount, maxTimeout)) { - LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation"); - LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(metadataPath); - - for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) { - // if the segments is in the list of marked for delete then update the status. - if (segmentId.equals(detail.getLoadName())) { - detail.setSegmentFile(segmentFile); - break; - } - } - - SegmentStatusManager - .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); - status = true; - } else { - LOGGER.error( - "Not able to acquire the lock for Table status updation for table path " + tablePath); - } - ; - } finally { - if (carbonLock.unlock()) { - LOGGER.info("Table unlocked successfully after table status updation" + tablePath); - } else { - LOGGER.error( - "Unable to unlock Table lock for table" + tablePath + " during table status updation"); - } - } - return status; - } - private static void addToStaleFolders(AbsoluteTableIdentifier identifier, List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException { String path = CarbonTablePath.getSegmentPath( @@ -1102,26 +1047,15 @@ public final class CarbonLoaderUtil { /** * Merge index files with in the segment of partitioned table * @param segmentId - * @param tablePath + * @param table * @return * @throws IOException */ - public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath) + public static String mergeIndexFilesinPartitionedSegment(String segmentId, CarbonTable table) throws IOException { - CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus = - new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath); - String uniqueId = ""; - if (segmentIndexFIleMergeStatus != null) { - uniqueId = System.currentTimeMillis() + ""; - String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT; - String path = - CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR - + newSegmentFileName; - SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path); - updateSegmentFile(tablePath, segmentId, newSegmentFileName); - deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted()); - } - return uniqueId; + String tablePath = table.getTablePath(); + return new CarbonIndexFileMergeWriter(table) + .mergeCarbonIndexFilesOfSegment(segmentId, tablePath); } private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
