http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java new file mode 100644 index 0000000..ee667c2 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -0,0 +1,1385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.merger; + +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.locks.CarbonLockFactory; +import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.mutate.CarbonUpdateUtil; +import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; +import org.apache.carbondata.core.mutate.SegmentUpdateDetails; +import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; +import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonStorePath; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; +import org.apache.carbondata.processing.model.CarbonLoadModel; + +/** + * utility class for load merging. 
+ */ +public final class CarbonDataMergerUtil { + private static final LogService LOGGER = + LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName()); + + private CarbonDataMergerUtil() { + + } + + /** + * Returns the size of all the carbondata files present in the segment. + * @param carbonFile + * @return + */ + private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) { + long factSize = 0; + + // carbon data file case. + CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() { + + @Override public boolean accept(CarbonFile file) { + return CarbonTablePath.isCarbonDataFile(file.getName()); + } + }); + + for (CarbonFile fact : factFile) { + factSize += fact.getSize(); + } + + return factSize; + } + + /** + * To check whether the merge property is enabled or not. + * + * @return + */ + + public static boolean checkIfAutoLoadMergingRequired() { + // load merge is not supported as per new store format + // moving the load merge check in early to avoid unnecessary load listing causing IOException + // check whether carbons segment merging operation is enabled or not. + // default will be false. + String isLoadMergeEnabled = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, + CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE); + if (isLoadMergeEnabled.equalsIgnoreCase("false")) { + return false; + } + return true; + } + + /** + * Form the Name of the New Merge Folder + * + * @param segmentsToBeMergedList + * @return + */ + public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) { + String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName(); + if (firstSegmentName.contains(".")) { + String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf(".")); + String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1); + int fraction = Integer.parseInt(afterDecimal) + 1; + String mergedSegmentName = beforeDecimal + "." + fraction; + return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName; + } else { + String mergeName = firstSegmentName + "." + 1; + return CarbonCommonConstants.LOAD_FOLDER + mergeName; + } + + } + + + /** + * Form the Name of the New Merge Folder + * + * @param segmentToBeMerged + * @return + */ + public static String getMergedLoadName(final String segmentToBeMerged) { + String firstSegmentName = segmentToBeMerged; + if (firstSegmentName.contains(".")) { + String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf(".")); + String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1); + int fraction = Integer.parseInt(afterDecimal) + 1; + String mergedSegmentName = beforeDecimal + "." + fraction; + return mergedSegmentName; + } else { + String mergeName = firstSegmentName + "." + 1; + return mergeName; + } + + } + + /** + * Update Both Segment Update Status and Table Status for the case of IUD Delete + * delta compaction. 
+ * + * @param loadsToMerge + * @param metaDataFilepath + * @param carbonLoadModel + * @return + */ + public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus( + List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath, + CarbonLoadModel carbonLoadModel) { + + boolean status = false; + boolean updateLockStatus = false; + boolean tableLockStatus = false; + + String timestamp = carbonLoadModel.getFactTimeStamp(); + + List<String> updatedDeltaFilesList = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + // This routine updateLoadMetadataIUDUpdateDeltaMergeStatus is supposed to update + // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with + // the Table Status Metadata file (for Update Block Compaction) it has to update the + // Table Update Status Metadata File (for the corresponding Delete Delta File). + // As IUD_UPDDEL_DELTA_COMPACTION writes into the same segment: + // A) Table Update Status Metadata File (Block Level) + // * For each block being compacted, mark 'Compacted' as the status. + // B) Table Status Metadata file (Segment Level) + // * loadStatus won't be changed to 'Compacted' + // * UpdateDeltaStartTime and UpdateDeltaEndTime will both be set to the current + // timestamp (which is being passed from the driver) + // First the Table Update Status Metadata File should be updated, as we need to get + // the updated blocks for the segment from the Table Status Metadata Update Delta Start and + // End Timestamp. + + // Table Update Status Metadata Update. + AbsoluteTableIdentifier absoluteTableIdentifier = + carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + SegmentUpdateStatusManager segmentUpdateStatusManager = + new SegmentUpdateStatusManager(absoluteTableIdentifier); + + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + + ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); + ICarbonLock statusLock = segmentStatusManager.getTableStatusLock(); + + // Update the Compacted Blocks with Compacted Status.
+ try { + updatedDeltaFilesList = segmentUpdateStatusManager + .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName().toString()); + } catch (Exception e) { + LOGGER.error("Error while getting the Update Delta Blocks."); + status = false; + return status; + } + + if (updatedDeltaFilesList.size() > 0) { + try { + updateLockStatus = updateLock.lockWithRetries(); + tableLockStatus = statusLock.lockWithRetries(); + + List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size()); + + for (String compactedBlocks : updatedDeltaFilesList) { + // Extract the block name (without extension) from the full file path. + String fullBlock = compactedBlocks; + int endIndex = fullBlock.lastIndexOf(File.separator); + String blkNoExt = fullBlock.substring(endIndex + 1, fullBlock.lastIndexOf("-")); + blockNames.add(blkNoExt); + } + + if (updateLockStatus && tableLockStatus) { + + SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager + .readLoadMetadata(); + + for (String compactedBlocks : blockNames) { + // Check if the compacted block name matches an existing entry in the old details. + for (int i = 0; i < updateLists.length; i++) { + if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks) + && !CarbonCommonConstants.COMPACTED.equalsIgnoreCase(updateLists[i].getStatus()) + && !CarbonCommonConstants.MARKED_FOR_DELETE + .equalsIgnoreCase(updateLists[i].getStatus())) { + updateLists[i].setStatus(CarbonCommonConstants.COMPACTED); + } + } + } + + LoadMetadataDetails[] loadDetails = + segmentStatusManager.readLoadMetadata(metaDataFilepath); + + for (LoadMetadataDetails loadDetail : loadDetails) { + if (loadsToMerge.contains(loadDetail)) { + loadDetail.setUpdateDeltaStartTimestamp(timestamp); + loadDetail.setUpdateDeltaEndTimestamp(timestamp); + if (loadDetail.getLoadName().equalsIgnoreCase("0")) { + loadDetail + .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); + } + } + } + + try { + segmentUpdateStatusManager + .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); + segmentStatusManager + .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); + status = true; + } catch (IOException e) { + LOGGER.error( + "Error while writing metadata. The metadata file path is " + carbonTablePath + .getMetadataDirectoryPath()); + status = false; + } + } else { + LOGGER.error("Not able to acquire the lock."); + status = false; + } + } catch (Exception e) { + LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath + .getMetadataDirectoryPath()); + status = false; + + } finally { + if (updateLockStatus) { + if (updateLock.unlock()) { + LOGGER.info("Unlock the segment update lock successfully."); + } else { + LOGGER.error("Not able to unlock the segment update lock."); + } + } + if (tableLockStatus) { + if (statusLock.unlock()) { + LOGGER.info("Unlock the table status lock successfully."); + } else { + LOGGER.error("Not able to unlock the table status lock."); + } + } + } + } + return status; + } + + /** + * Method to update table status in case of IUD update delta compaction.
+ * @param loadsToMerge + * @param metaDataFilepath + * @param MergedLoadName + * @param carbonLoadModel + * @param compactionType + * @return + */ + public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge, + String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel, + long mergeLoadStartTime, CompactionType compactionType) { + + boolean tableStatusUpdationStatus = false; + AbsoluteTableIdentifier absoluteTableIdentifier = + carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + + try { + if (carbonLock.lockWithRetries()) { + LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "." + + carbonLoadModel.getTableName() + " for table status updation "); + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String statusFilePath = carbonTablePath.getTableStatusFilePath(); + + LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); + + String mergedLoadNumber = MergedLoadName.substring( + MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) + + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length()); + + long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime(); + for (LoadMetadataDetails loadDetail : loadDetails) { + // check if this segment is merged. + if (loadsToMerge.contains(loadDetail)) { + // if the compacted load is deleted after the start of the compaction process, + // then need to discard the compaction process and treat it as failed compaction. + if (loadDetail.getLoadStatus() + .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) { + LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName() + + " is deleted after the compaction is started."); + return false; + } + loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED); + loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp); + loadDetail.setMergedLoadName(mergedLoadNumber); + } + } + + // create entry for merged one. + LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails(); + loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId()); + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS); + long loadEnddate = CarbonUpdateUtil.readCurrentTime(); + loadMetadataDetails.setLoadEndTime(loadEnddate); + loadMetadataDetails.setLoadName(mergedLoadNumber); + loadMetadataDetails.setLoadStartTime(mergeLoadStartTime); + loadMetadataDetails.setPartitionCount("0"); + // if this is a major compaction then set the segment as major compaction. 
+ if (compactionType == CompactionType.MAJOR_COMPACTION) { + loadMetadataDetails.setMajorCompacted("true"); + } + + List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails)); + + // put the merged folder entry + updatedDetailsList.add(loadMetadataDetails); + + try { + SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath, + updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()])); + tableStatusUpdationStatus = true; + } catch (IOException e) { + LOGGER.error("Error while writing metadata"); + } + } else { + LOGGER.error( + "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "." + + carbonLoadModel.getTableName() + "for table status updation"); + } + } finally { + if (carbonLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel + .getDatabaseName() + "." + carbonLoadModel.getTableName()); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "." + + carbonLoadModel.getTableName() + " during table status updation"); + } + } + return tableStatusUpdationStatus; + } + + /** + * To identify which all segments can be merged. + * + * @param storeLocation + * @param carbonLoadModel + * @param compactionSize + * @return + */ + public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation, + CarbonLoadModel carbonLoadModel, long compactionSize, + List<LoadMetadataDetails> segments, CompactionType compactionType) { + + List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments); + + sortSegments(sortedSegments); + + // Check for segments which are qualified for IUD compaction. + if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) { + + List<LoadMetadataDetails> listOfSegmentsToBeMerged = + identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel); + + return listOfSegmentsToBeMerged; + } + + // check preserve property and preserve the configured number of latest loads. + + List<LoadMetadataDetails> listOfSegmentsAfterPreserve = + checkPreserveSegmentsPropertyReturnRemaining(sortedSegments); + + // filter the segments if the compaction based on days is configured. + + List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval = + identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve); + List<LoadMetadataDetails> listOfSegmentsToBeMerged; + // identify the segments to merge based on the Size of the segments across partition. + if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) { + + listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize, + listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation); + } else { + + listOfSegmentsToBeMerged = + identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval); + } + + return listOfSegmentsToBeMerged; + } + + /** + * Sorting of the segments. + * @param segments + */ + public static void sortSegments(List segments) { + // sort the segment details. 
+ Collections.sort(segments, new Comparator<LoadMetadataDetails>() { + @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) { + double seg1Id = Double.parseDouble(seg1.getLoadName()); + double seg2Id = Double.parseDouble(seg2.getLoadName()); + if (seg1Id - seg2Id < 0) { + return -1; + } + if (seg1Id - seg2Id > 0) { + return 1; + } + return 0; + } + }); + } + + /** + * This method will return the list of loads which are loaded at the same interval. + * This property is configurable. + * + * @param listOfSegmentsBelowThresholdSize + * @return + */ + private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate( + List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) { + + List<LoadMetadataDetails> loadsOfSameDate = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + long numberOfDaysAllowedToMerge = 0; + try { + numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, + CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)); + if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) { + LOGGER.error( + "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT + + " is incorrect." + + " Correct value should be in range of 0 -100. Taking the default value."); + numberOfDaysAllowedToMerge = + Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT); + } + + } catch (NumberFormatException e) { + numberOfDaysAllowedToMerge = + Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT); + } + // if true then process loads according to the load date. + if (numberOfDaysAllowedToMerge > 0) { + + // filter loads based on the loaded date + boolean first = true; + Date segDate1 = null; + SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); + for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) { + + if (first) { + segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf); + first = false; + continue; + } + long segmentDate = segment.getLoadStartTime(); + Date segDate2 = null; + try { + segDate2 = sdf.parse(sdf.format(segmentDate)); + } catch (ParseException e) { + LOGGER.error("Error while parsing segment start time" + e.getMessage()); + } + + if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) { + loadsOfSameDate.add(segment); + } + // if the load is beyond merged date. + // then reset everything and continue search for loads. + else if (loadsOfSameDate.size() < 2) { + loadsOfSameDate.clear(); + // need to add the next segment as first and to check further + segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf); + } else { // case where a load is beyond merge date and there is at least 2 loads to merge. 
+ break; + } + } + } else { + return listOfSegmentsBelowThresholdSize; + } + + return loadsOfSameDate; + } + + /** + * @param loadsOfSameDate + * @param segment + * @return + */ + private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate, + LoadMetadataDetails segment, SimpleDateFormat sdf) { + long baselineLoadStartTime = segment.getLoadStartTime(); + Date segDate1 = null; + try { + segDate1 = sdf.parse(sdf.format(baselineLoadStartTime)); + } catch (ParseException e) { + LOGGER.error("Error while parsing segment start time" + e.getMessage()); + } + loadsOfSameDate.add(segment); + return segDate1; + } + + /** + * Method to check whether the two load dates lie within the configured date range. + * + * @param segDate1 + * @param segDate2 + * @return + */ + private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2, + long numberOfDaysAllowedToMerge) { + if (segDate1 == null || segDate2 == null) { + return false; + } + // take the 1st date and add the configured days. + Calendar cal1 = Calendar.getInstance(); + cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate()); + Calendar cal2 = Calendar.getInstance(); + cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate()); + + long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis(); + + if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) { + return true; + } + return false; + } + + /** + * Identify the segments to be merged based on the Size in case of Major compaction. + * + * @param compactionSize + * @param listOfSegmentsAfterPreserve + * @param carbonLoadModel + * @param storeLocation + * @return + */ + private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize( + long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve, + CarbonLoadModel carbonLoadModel, String storeLocation) { + + List<LoadMetadataDetails> segmentsToBeMerged = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + CarbonTableIdentifier tableIdentifier = + carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(); + + + // total length + long totalLength = 0; + + // check the size of each segment and sum it up across partitions + for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) { + + String segId = segment.getLoadName(); + // variable to store one segment size across partition. + long sizeOfOneSegmentAcrossPartition = + getSizeOfSegment(storeLocation, tableIdentifier, segId); + + // if the size of a segment is greater than the major compaction size, then ignore it. + if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) { + // if already 2 segments have been found for merging then stop scan here and merge. + if (segmentsToBeMerged.size() > 1) { + break; + } else { // if only one segment is found then remove the earlier one in list. + // reset the total length to 0. + segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + totalLength = 0; + continue; + } + } + + totalLength += sizeOfOneSegmentAcrossPartition; + + // keep accumulating segments while the total size stays within the major compaction size. + if (totalLength < (compactionSize * 1024 * 1024)) { + segmentsToBeMerged.add(segment); + } else { // if already 2 segments have been found for merging then stop scan here and merge. + if (segmentsToBeMerged.size() > 1) { + break; + } else { // if only one segment is found then remove the earlier one in list and put this.
+ // reset the total length to the current identified segment. + segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + segmentsToBeMerged.add(segment); + totalLength = sizeOfOneSegmentAcrossPartition; + } + } + + } + + return segmentsToBeMerged; + } + + /** + * For calculating the size of the specified segment + * @param storeLocation + * @param tableIdentifier + * @param segId + * @return + */ + private static long getSizeOfSegment(String storeLocation, + CarbonTableIdentifier tableIdentifier, String segId) { + String loadPath = getStoreLocation(storeLocation, tableIdentifier, segId); + CarbonFile segmentFolder = + FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); + return getSizeOfFactFileInLoad(segmentFolder); + } + + /** + * This method will get the store location for the given path, segment id and partition id + * + * @param storePath + * @param carbonTableIdentifier + * @param segmentId + * @return + */ + private static String getStoreLocation(String storePath, + CarbonTableIdentifier carbonTableIdentifier, String segmentId) { + CarbonTablePath carbonTablePath = + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier); + return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + } + + + /** + * Identify the segments to be merged based on the segment count + * + * @param listOfSegmentsAfterPreserve + * @return + */ + private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount( + List<LoadMetadataDetails> listOfSegmentsAfterPreserve) { + + List<LoadMetadataDetails> mergedSegments = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + List<LoadMetadataDetails> unMergedSegments = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + + int[] noOfSegmentLevelsCount = + CarbonProperties.getInstance().getCompactionSegmentLevelCount(); + + int level1Size = 0; + int level2Size = 0; + int size = noOfSegmentLevelsCount.length; + + if (size >= 2) { + level1Size = noOfSegmentLevelsCount[0]; + level2Size = noOfSegmentLevelsCount[1]; + } else if (size == 1) { + level1Size = noOfSegmentLevelsCount[0]; + } + + int unMergeCounter = 0; + int mergeCounter = 0; + + // iterate over the segments and classify them as merged or unmerged. + for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) { + + String segName = segment.getLoadName(); + + // if a segment is already merged 2 levels then its name will end with .2 + // need to exclude those segments from minor compaction. + // if a segment is major compacted then it should not be considered for minor compaction. + if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || ( + segment.isMajorCompacted() != null && segment.isMajorCompacted() + .equalsIgnoreCase("true"))) { + continue; + } + + // check if the segment is merged or not + + if (!isMergedSegment(segName)) { + //if it is an unmerged segment then increment counter + unMergeCounter++; + unMergedSegments.add(segment); + if (unMergeCounter == (level1Size)) { + return unMergedSegments; + } + } else { + mergeCounter++; + mergedSegments.add(segment); + if (mergeCounter == (level2Size)) { + return mergedSegments; + } + } + } + return new ArrayList<>(0); + } + + /** + * To check if the segment is merged or not.
+ * @param segName + * @return + */ + private static boolean isMergedSegment(String segName) { + if (segName.contains(".")) { + return true; + } + return false; + } + + /** + * checks number of loads to be preserved and returns remaining valid segments + * + * @param segments + * @return + */ + private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining( + List<LoadMetadataDetails> segments) { + // check whether the preserving of the segments from merging is enabled or not. + // get the number of loads to be preserved. + int numberOfSegmentsToBePreserved = + CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved(); + // get the number of valid segments and retain the latest loads from merging. + return CarbonDataMergerUtil + .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved); + } + + /** + * Retain the number of segments which are to be preserved and return the remaining list of + * segments. + * + * @param loadMetadataDetails + * @param numberOfSegToBeRetained + * @return + */ + private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining( + List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) { + + List<LoadMetadataDetails> validList = + new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + for (LoadMetadataDetails segment : loadMetadataDetails) { + if (isSegmentValid(segment)) { + validList.add(segment); + } + } + + // check if valid list is big enough for removing the number of seg to be retained. + // last element + int removingIndex = validList.size() - 1; + + for (int i = validList.size(); i > 0; i--) { + if (numberOfSegToBeRetained == 0) { + break; + } + // remove last segment + validList.remove(removingIndex--); + numberOfSegToBeRetained--; + } + return validList; + + } + + /** + * This will give the compaction sizes configured based on compaction type. + * + * @param compactionType + * @return + */ + public static long getCompactionSize(CompactionType compactionType) { + + long compactionSize = 0; + switch (compactionType) { + case MAJOR_COMPACTION: + compactionSize = CarbonProperties.getInstance().getMajorCompactionSize(); + break; + default: // this case can not come. + } + return compactionSize; + } + + /** + * For getting the comma separated valid segments for merging. + * + * @param loadMetadataDetails + * @return + */ + public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) { + StringBuilder builder = new StringBuilder(); + for (LoadMetadataDetails segment : loadMetadataDetails) { + //check if this load is an already merged load. + if (null != segment.getMergedLoadName()) { + builder.append(segment.getMergedLoadName()).append(","); + } else { + builder.append(segment.getLoadName()).append(","); + } + } + builder.deleteCharAt(builder.length() - 1); + return builder.toString(); + } + + /** + * This method returns the valid segments attached to the table Identifier. 
+ * + * @param absoluteTableIdentifier + * @return + */ + public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier) + throws IOException { + + SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null; + try { + validAndInvalidSegments = + new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); + } catch (IOException e) { + LOGGER.error("Error while getting valid segment list for a table identifier"); + throw new IOException(); + } + return validAndInvalidSegments.getValidSegments(); + } + + + /** + * Filters out the newly added segments from the list, retaining only the segments + * up to and including the given last segment. + */ + public static List<LoadMetadataDetails> filterOutNewlyAddedSegments( + List<LoadMetadataDetails> segments, + LoadMetadataDetails lastSeg) { + + // take complete list of segments. + List<LoadMetadataDetails> list = new ArrayList<>(segments); + // sort list + CarbonDataMergerUtil.sortSegments(list); + + // first filter out newly added segments. + return list.subList(0, list.indexOf(lastSeg) + 1); + + } + + /** + * Method to identify the segments qualified for merging in case of IUD compaction. + * + * @param carbonLoadModel + * @param compactionType + * @return + */ + private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD( + List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) { + + List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size()); + + AbsoluteTableIdentifier absoluteTableIdentifier = + carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); + + int numberUpdateDeltaFilesThreshold = + CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); + for (LoadMetadataDetails seg : segments) { + if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(), + absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(), + numberUpdateDeltaFilesThreshold)) { + validSegments.add(seg); + } + } + return validSegments; + } + + private static boolean isSegmentValid(LoadMetadataDetails seg) { + return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) + || seg.getLoadStatus() + .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg + .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE); + } + + /** + * Gets the list of segments that qualify for IUD compaction. + * @param Segments + * @param absoluteTableIdentifier + * @param compactionTypeIUD + * @return + */ + public static List<String> getSegListIUDCompactionQualified(List<String> Segments, + AbsoluteTableIdentifier absoluteTableIdentifier, + SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) { + + List<String> validSegments = new ArrayList<>(); + + if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) { + int numberDeleteDeltaFilesThreshold = + CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction(); + List<String> deleteSegments = new ArrayList<>(); + for (String seg : Segments) { + if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager, + numberDeleteDeltaFilesThreshold)) { + deleteSegments.add(seg); + } + } + if (deleteSegments.size() > 0) { + // This code block appends the block names selected for merge to the segment name, + // instead of taking only the segment name. This helps to parallelize work better + // per block in case of delete horizontal compaction.
+ for (String segName : deleteSegments) { + List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager, + numberDeleteDeltaFilesThreshold); + if (tempSegments != null) { + for (String tempSeg : tempSegments) { + validSegments.add(tempSeg); + } + } + } + } + } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) { + int numberUpdateDeltaFilesThreshold = + CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction(); + for (String seg : Segments) { + if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager, + numberUpdateDeltaFilesThreshold)) { + validSegments.add(seg); + } + } + } + return validSegments; + } + + /** + * Check if the blockname of the segment belongs to the Valid Update Delta List or not. + * @param seg + * @param blkName + * @param segmentUpdateStatusManager + * @return + */ + public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName, + SegmentUpdateStatusManager segmentUpdateStatusManager) { + + List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg); + + String fullBlock = blkName; + String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR); + String blockName = FileParts[FileParts.length - 1]; + + for (String str : list) { + if (str.contains(blockName)) { + return true; + } + } + return false; + } + + /** + * This method traverses Update Delta Files inside the seg and return true + * if UpdateDelta Files are more than IUD Compaction threshold. + * + * @param seg + * @param absoluteTableIdentifier + * @param segmentUpdateStatusManager + * @param numberDeltaFilesThreshold + * @return + */ + public static Boolean checkUpdateDeltaFilesInSeg(String seg, + AbsoluteTableIdentifier absoluteTableIdentifier, + SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { + + CarbonFile[] updateDeltaFiles = null; + Set<String> uniqueBlocks = new HashSet<String>(); + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg); + CarbonFile segDir = + FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); + CarbonFile[] allSegmentFiles = segDir.listFiles(); + + updateDeltaFiles = segmentUpdateStatusManager + .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, + false, allSegmentFiles); + + if (updateDeltaFiles == null) { + return false; + } + + // The update Delta files may have Spill over blocks. Will consider multiple spill over + // blocks as one. Currently updateDeltaFiles array contains Update Delta Block name which + // lies within UpdateDelta Start TimeStamp and End TimeStamp. In order to eliminate + // Spill Over Blocks will choose files with unique taskID. + for (CarbonFile blocks : updateDeltaFiles) { + // Get Task ID and the Timestamp from the Block name for e.g. 
+ // part-0-3-1481084721319.carbondata => "3-1481084721319" + String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName()); + String timestamp = + CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName()); + String taskAndTimeStamp = task + "-" + timestamp; + uniqueBlocks.add(taskAndTimeStamp); + } + if (uniqueBlocks.size() > numberDeltaFilesThreshold) { + return true; + } else { + return false; + } + } + + /** + * Checks whether the passed segment qualifies for IUD delete delta compaction, i.e. + * whether the number of delete delta files present in the segment is more than + * numberDeltaFilesThreshold. + * + * @param seg + * @param segmentUpdateStatusManager + * @param numberDeltaFilesThreshold + * @return + */ + private static boolean checkDeleteDeltaFilesInSeg(String seg, + SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { + + Set<String> uniqueBlocks = new HashSet<String>(); + List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); + + for (final String blockName : blockNameList) { + + CarbonFile[] deleteDeltaFiles = + segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); + + // The Delete Delta files may have Spill over blocks. Will consider multiple spill over + // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which + // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate + // Spill Over Blocks will choose files with unique taskID. + for (CarbonFile blocks : deleteDeltaFiles) { + // Get Task ID and the Timestamp from the Block name for e.g. + // part-0-3-1481084721319.carbondata => "3-1481084721319" + String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName()); + String timestamp = + CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName()); + String taskAndTimeStamp = task + "-" + timestamp; + uniqueBlocks.add(taskAndTimeStamp); + } + + if (uniqueBlocks.size() > numberDeltaFilesThreshold) { + return true; + } + } + return false; + } + + /** + * Returns the list of seg/blockName entries in the passed segment whose delete delta + * file count exceeds numberDeltaFilesThreshold. + * @param seg + * @param segmentUpdateStatusManager + * @param numberDeltaFilesThreshold + * @return + */ + private static List<String> getDeleteDeltaFilesInSeg(String seg, + SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { + + List<String> blockLists = new ArrayList<>(); + List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg); + + for (final String blockName : blockNameList) { + + CarbonFile[] deleteDeltaFiles = + segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); + + if (deleteDeltaFiles.length > numberDeltaFilesThreshold) { + blockLists.add(seg + "/" + blockName); + } + } + return blockLists; + } + + /** + * Returns true if horizontal compaction is enabled. + * @return + */ + public static boolean isHorizontalCompactionEnabled() { + if ((CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled, + CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) { + return true; + } else { + return false; + } + } + + /** + * Method to compact Delete Delta files in case of IUD compaction.
+ * + * @param seg + * @param blockName + * @param absoluteTableIdentifier + * @param segmentUpdateDetails + * @param timestamp + * @return + * @throws IOException + */ + public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg, + String blockName, AbsoluteTableIdentifier absoluteTableIdentifier, + SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException { + + SegmentUpdateStatusManager segmentUpdateStatusManager = + new SegmentUpdateStatusManager(absoluteTableIdentifier); + + List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1); + + // set the update status. + segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails); + + CarbonFile[] deleteDeltaFiles = + segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); + + String destFileName = + blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT; + String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath() + + CarbonCommonConstants.FILE_SEPARATOR + destFileName; + + List<String> deleteFilePathList = new ArrayList<String>(); + for (CarbonFile cFile : deleteDeltaFiles) { + deleteFilePathList.add(cFile.getCanonicalPath()); + } + + CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult(); + blockDetails.setBlockName(blockName); + blockDetails.setSegmentName(seg); + blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString()); + blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString()); + + try { + if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) { + blockDetails.setCompactionStatus(true); + } else { + blockDetails.setCompactionStatus(false); + } + resultList.add(blockDetails); + } catch (IOException e) { + LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is " + + fullBlockFilePath); + throw new IOException(); + } + return resultList; + } + + /** + * This method compacts the delete delta files. + * @param deleteDeltaFiles + * @param blockName + * @param fullBlockFilePath + * @return + */ + public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles, + String blockName, String fullBlockFilePath) throws IOException { + + DeleteDeltaBlockDetails deleteDeltaBlockDetails = null; + CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); + try { + deleteDeltaBlockDetails = + dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName); + } catch (Exception e) { + String blockFilePath = fullBlockFilePath + .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); + LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath); + throw new IOException(); + } + CarbonDeleteDeltaWriterImpl carbonDeleteWriter = + new CarbonDeleteDeltaWriterImpl(fullBlockFilePath, + FileFactory.getFileType(fullBlockFilePath)); + try { + carbonDeleteWriter.write(deleteDeltaBlockDetails); + } catch (IOException e) { + LOGGER.error("Error while writing compacted delete delta file " + fullBlockFilePath); + throw new IOException(); + } + return true; + } + + public static Boolean updateStatusFile( + List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table, + String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) { + + List<SegmentUpdateDetails> segmentUpdateDetails = + new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size()); + + + // Check the list output.
+ for (CarbonDataMergerUtilResult carbonDataMergerUtilResult : updateDataMergerDetailsList) { + if (carbonDataMergerUtilResult.getCompactionStatus()) { + SegmentUpdateDetails tempSegmentUpdateDetails = new SegmentUpdateDetails(); + tempSegmentUpdateDetails.setSegmentName(carbonDataMergerUtilResult.getSegmentName()); + tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName()); + + for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager + .getUpdateStatusDetails()) { + if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName()) + && origDetails.getSegmentName() + .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) { + + tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock()); + tempSegmentUpdateDetails.setStatus(origDetails.getStatus()); + break; + } + } + + tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp( + carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp()); + tempSegmentUpdateDetails + .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp()); + + segmentUpdateDetails.add(tempSegmentUpdateDetails); + } else return false; + } + + CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true); + + // Update the Table Status. + String metaDataFilepath = table.getMetaDataFilepath(); + AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); + + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + String tableStatusPath = carbonTablePath.getTableStatusFilePath(); + + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + + boolean lockStatus = false; + + try { + lockStatus = carbonLock.lockWithRetries(); + if (lockStatus) { + LOGGER.info( + "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + + " for table status updation"); + + LoadMetadataDetails[] listOfLoadFolderDetailsArray = + segmentStatusManager.readLoadMetadata(metaDataFilepath); + + for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { + if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { + loadMetadata.setUpdateStatusFileName( + CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); + } + } + try { + segmentStatusManager + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + } catch (IOException e) { + return false; + } + } else { + LOGGER.error("Not able to acquire the lock for Table status updation for table " + table + .getDatabaseName() + "." + table.getFactTableName()); + } + } finally { + if (lockStatus) { + if (carbonLock.unlock()) { + LOGGER.info( + "Table unlocked successfully after table status updation" + table.getDatabaseName() + + "." + table.getFactTableName()); + } else { + LOGGER.error( + "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table + .getFactTableName() + " during table status updation"); + } + } + } + return true; + } + + /** + * This will update the property of segments as major compacted. 
+ * @param model + * @param changedSegDetails + */ + public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model, + List<LoadMetadataDetails> changedSegDetails, + List<LoadMetadataDetails> preservedSegment) throws Exception { + + String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath(); + AbsoluteTableIdentifier absoluteTableIdentifier = + model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath); + List<LoadMetadataDetails> originalList = Arrays.asList(details); + for (LoadMetadataDetails segment : changedSegDetails) { + if (preservedSegment.contains(segment)) { + continue; + } + originalList.get(originalList.indexOf(segment)).setMajorCompacted("true"); + + } + + + ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj( + model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(), + LockUsage.TABLE_STATUS_LOCK); + + try { + if (carbonTableStatusLock.lockWithRetries()) { + LOGGER.info( + "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName() + + " for table status updation "); + CarbonTablePath carbonTablePath = CarbonStorePath + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); + + segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), + originalList.toArray(new LoadMetadataDetails[originalList.size()])); + } else { + LOGGER.error( + "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model + .getTableName() + "for table status updation"); + throw new Exception("Failed to update the MajorCompactionStatus."); + } + } catch (IOException e) { + LOGGER.error("Error while writing metadata"); + throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage()); + } finally { + if (carbonTableStatusLock.unlock()) { + LOGGER.info( + "Table unlocked successfully after table status updation" + model.getDatabaseName() + + "." + model.getTableName()); + } else { + LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model + .getTableName() + " during table status updation"); + } + } + + } +}
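As an aside on the merged-segment naming convention implemented by getMergedLoadName above: a first merge of segments starting at load "2" produces "2.1", and re-merging segments starting at "2.1" produces "2.2" (the List overload additionally prepends CarbonCommonConstants.LOAD_FOLDER). A minimal standalone sketch of that scheme follows; the class and method names here are illustrative only and are not part of this patch:

    public final class MergedLoadNameDemo {
      // Mirrors the logic of CarbonDataMergerUtil.getMergedLoadName(String): bump the
      // fraction if the segment was merged before, otherwise start at ".1".
      static String mergedLoadName(String segmentToBeMerged) {
        int dot = segmentToBeMerged.indexOf('.');
        if (dot >= 0) {
          int fraction = Integer.parseInt(segmentToBeMerged.substring(dot + 1)) + 1;
          return segmentToBeMerged.substring(0, dot) + "." + fraction;
        }
        return segmentToBeMerged + ".1";
      }

      public static void main(String[] args) {
        System.out.println(mergedLoadName("2"));   // prints 2.1
        System.out.println(mergedLoadName("2.1")); // prints 2.2
      }
    }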
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java new file mode 100644 index 0000000..aa3d801 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.merger; + +import org.apache.carbondata.core.mutate.SegmentUpdateDetails; + +public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails { + private boolean compactionStatus; + + public boolean getCompactionStatus() { + return compactionStatus; + } + + public void setCompactionStatus(Boolean status) { + compactionStatus = status; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java new file mode 100644 index 0000000..ebf3683 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -0,0 +1,401 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.processing.merger; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; +import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; +import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; +import org.apache.carbondata.processing.store.CarbonFactHandler; +import org.apache.carbondata.processing.store.CarbonFactHandlerFactory; +import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger; +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * This class will process the query result and convert the data + * into a format compatible with data load + */ +public class CompactionResultSortProcessor extends AbstractResultProcessor { + + /** + * LOGGER + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName()); + /** + * carbon load model that contains all the required information for load + */ + private CarbonLoadModel carbonLoadModel; + /** + * carbon table + */ + private CarbonTable carbonTable; + /** + * sortDataRows instance for sorting each row read and writing it to a sort temp file + */ + private SortDataRows sortDataRows; + /** + * final merger for merge sort + */ + private SingleThreadFinalSortFilesMerger finalMerger; + /** + * data handler VO object + */ + private CarbonFactHandler dataHandler; + /** + * segment properties for getting dimension cardinality and other required information of a block + */ + private SegmentProperties segmentProperties; + /** + * compaction type to decide whether taskID needs to be extracted from carbondata files + */ + private CompactionType compactionType; + /** + * boolean mapping for no dictionary columns in schema + */ + private boolean[] noDictionaryColMapping; + /** + * agg type defined for measures + */ + private char[] aggType; + /** + * segment id + */ + private String segmentId; + /** + * temp store location to be used during data load + */ + private String tempStoreLocation; + /** + * table name + */ + private String tableName; + /** + * no dictionary column count in schema + */ + private int noDictionaryCount; + /** + * total count of measures in schema + */ + private int measureCount; + /** + * dimension count excluding complex dimensions and no dictionary columns + */ + private int
dimensionColumnCount; + /** + * whether the allocated tasks has any record + */ + private boolean isRecordFound; + + /** + * @param carbonLoadModel + * @param carbonTable + * @param segmentProperties + * @param compactionType + * @param tableName + */ + public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable, + SegmentProperties segmentProperties, CompactionType compactionType, String tableName) { + this.carbonLoadModel = carbonLoadModel; + this.carbonTable = carbonTable; + this.segmentProperties = segmentProperties; + this.segmentId = carbonLoadModel.getSegmentId(); + this.compactionType = compactionType; + this.tableName = tableName; + } + + /** + * This method will iterate over the query result and convert it into a format compatible + * for data loading + * + * @param resultIteratorList + */ + public boolean execute(List<RawResultIterator> resultIteratorList) { + boolean isCompactionSuccess = false; + try { + initTempStoreLocation(); + initSortDataRows(); + initAggType(); + processResult(resultIteratorList); + // After delete command, if no records are fetched from one split, + // below steps are not required to be initialized. + if (isRecordFound) { + initializeFinalThreadMergerForMergeSort(); + initDataHandler(); + readAndLoadDataFromSortTempFiles(); + } + isCompactionSuccess = true; + } catch (Exception e) { + LOGGER.error(e, "Compaction failed: " + e.getMessage()); + } finally { + // clear temp files and folders created during compaction + deleteTempStoreLocation(); + } + return isCompactionSuccess; + } + + /** + * This method will clean up the local folders and files created during compaction process + */ + private void deleteTempStoreLocation() { + if (null != tempStoreLocation) { + try { + CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) }); + } catch (IOException | InterruptedException e) { + LOGGER.error("Problem deleting local folders during compaction: " + e.getMessage()); + } + } + } + + /** + * This method will iterate over the query result and perform row sorting operation + * + * @param resultIteratorList + */ + private void processResult(List<RawResultIterator> resultIteratorList) + throws Exception { + for (RawResultIterator resultIterator : resultIteratorList) { + while (resultIterator.hasNext()) { + addRowForSorting(prepareRowObjectForSorting(resultIterator.next())); + isRecordFound = true; + } + } + try { + sortDataRows.startSorting(); + } catch (CarbonSortKeyAndGroupByException e) { + LOGGER.error(e); + throw new Exception("Problem loading data during compaction: " + e.getMessage()); + } + } + + /** + * This method will prepare the data from raw object that will take part in sorting + * + * @param row + * @return + */ + private Object[] prepareRowObjectForSorting(Object[] row) { + ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0]; + // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount]; + List<CarbonDimension> dimensions = segmentProperties.getDimensions(); + Object[] preparedRow = new Object[dimensions.size() + measureCount]; + // convert the dictionary from MDKey to surrogate key + byte[] dictionaryKey = wrapper.getDictionaryKey(); + long[] keyArray = segmentProperties.getDimensionKeyGenerator().getKeyArray(dictionaryKey); + Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount]; + for (int i = 0; i < keyArray.length; i++) { + dictionaryValues[i] = Long.valueOf(keyArray[i]).intValue(); + } + int noDictionaryIndex = 0; + int dictionaryIndex = 0; + for (int i = 
+      CarbonDimension dims = dimensions.get(i);
+      if (dims.hasEncoding(Encoding.DICTIONARY)) {
+        // dictionary dimension
+        preparedRow[i] = dictionaryValues[dictionaryIndex++];
+      } else {
+        // no dictionary dimension
+        preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+      }
+    }
+    // fill all the measures
+    // measures will always start from the 1st index in the row object array
+    int measureIndexInRow = 1;
+    for (int i = 0; i < measureCount; i++) {
+      preparedRow[dimensionColumnCount + i] =
+          getConvertedMeasureValue(row[measureIndexInRow++], aggType[i]);
+    }
+    return preparedRow;
+  }
+
+  /**
+   * This method will convert a spark decimal to the java big decimal type
+   *
+   * @param value
+   * @param aggType
+   * @return
+   */
+  private Object getConvertedMeasureValue(Object value, char aggType) {
+    switch (aggType) {
+      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+        return ((org.apache.spark.sql.types.Decimal) value).toJavaBigDecimal();
+      default:
+        return value;
+    }
+  }
+
+  /**
+   * This method will read the sort temp files, perform a merge sort and add the rows
+   * to the store for data loading
+   */
+  private void readAndLoadDataFromSortTempFiles() throws Exception {
+    try {
+      finalMerger.startFinalMerge();
+      while (finalMerger.hasNext()) {
+        Object[] rowRead = finalMerger.next();
+        CarbonRow row = new CarbonRow(rowRead);
+        // convert the row from surrogate keys back to an MDKey
+        Object[] outputRow = CarbonDataProcessorUtil
+            .convertToMDKeyAndFillRow(row, segmentProperties, measureCount, noDictionaryCount,
+                segmentProperties.getComplexDimensions().size());
+        dataHandler.addDataToStore(outputRow);
+      }
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      throw new Exception("Problem loading data during compaction: " + e.getMessage());
+    } finally {
+      if (null != dataHandler) {
+        try {
+          dataHandler.closeHandler();
+        } catch (CarbonDataWriterException e) {
+          LOGGER.error(e);
+          throw new Exception("Problem loading data during compaction: " + e.getMessage());
+        }
+      }
+    }
+  }
+
+  /**
+   * add a row to the sort buffer which will be written to a sort temp file after sorting
+   *
+   * @param row
+   */
+  private void addRowForSorting(Object[] row) throws Exception {
+    try {
+      sortDataRows.addRow(row);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception("Row addition for sorting failed during compaction: " + e.getMessage());
+    }
+  }
+
+  /**
+   * create an instance of sort data rows
+   */
+  private void initSortDataRows() throws Exception {
+    measureCount = carbonTable.getMeasureByTableName(tableName).size();
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    noDictionaryColMapping = new boolean[dimensions.size()];
+    int i = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        i++;
+        continue;
+      }
+      noDictionaryColMapping[i++] = true;
+      noDictionaryCount++;
+    }
+    dimensionColumnCount = dimensions.size();
+    SortParameters parameters = createSortParameters();
+    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    // TODO: currently only on-heap merge is supported, but unsafe merge could be
+    // supported as well by using UnsafeSortDataRows.
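+    // SortDataRows buffers the incoming rows, sorts them in memory and spills
+    // sorted batches to sort temp files; the intermediate file merger combines
+    // those files as they accumulate, so the final merge has fewer streams to read.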
+    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
+    try {
+      this.sortDataRows.initialize();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Error initializing sort data rows object during compaction: " + e.getMessage());
+    }
+  }
+
+  /**
+   * This method will create the sort parameters VO object
+   *
+   * @return
+   */
+  private SortParameters createSortParameters() {
+    return SortParameters
+        .createSortParameters(carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount,
+            segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
+            carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(),
+            noDictionaryColMapping);
+  }
+
+  /**
+   * create an instance of the final thread merger which will perform a merge sort on all
+   * the sort temp files
+   */
+  private void initializeFinalThreadMergerForMergeSort() {
+    String sortTempFileLocation = tempStoreLocation + CarbonCommonConstants.FILE_SEPARATOR
+        + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
+    finalMerger =
+        new SingleThreadFinalSortFilesMerger(sortTempFileLocation, tableName, dimensionColumnCount,
+            segmentProperties.getComplexDimensions().size(), measureCount, noDictionaryCount,
+            aggType, noDictionaryColMapping);
+  }
+
+  /**
+   * initialise the carbon data writer instance
+   */
+  private void initDataHandler() throws Exception {
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
+        .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
+            tempStoreLocation);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
+        carbonFactDataHandlerModel);
+    dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
+        CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+    try {
+      dataHandler.initialise();
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e);
+      throw new Exception("Problem initialising data handler during compaction: " + e.getMessage());
+    }
+  }
+
+  /**
+   * initialise the temporary store location
+   */
+  private void initTempStoreLocation() {
+    tempStoreLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName,
+            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, false);
+  }
+
+  /**
+   * initialise the aggregation type of each measure for its storage format
+   */
+  private void initAggType() {
+    aggType = CarbonDataProcessorUtil.initAggType(carbonTable, tableName, measureCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
new file mode 100644
index 0000000..6b9c80a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+/**
+ * This enum is used to define the types of compaction.
+ * Minor and Major are the regular compactions; IUD_UPDDEL_DELTA and
+ * IUD_DELETE_DELTA are the compactions performed on the delta files
+ * produced by UPDATE/DELETE (IUD) operations; NONE means no compaction.
+ */
+public enum CompactionType {
+  MINOR_COMPACTION,
+  MAJOR_COMPACTION,
+  IUD_UPDDEL_DELTA_COMPACTION,
+  IUD_DELETE_DELTA_COMPACTION,
+  NONE
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
new file mode 100644
index 0000000..ecf4408
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Block to Node mapping
+ */
+public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
+
+  private final Distributable block;
+  private final String node;
+
+  public NodeBlockRelation(Distributable block, String node) {
+    this.block = block;
+    this.node = node;
+  }
+
+  public Distributable getBlock() {
+    return block;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  @Override public int compareTo(NodeBlockRelation obj) {
+    return this.getNode().compareTo(obj.getNode());
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof NodeBlockRelation)) {
+      return false;
+    }
+    NodeBlockRelation o = (NodeBlockRelation) obj;
+    return node.equals(o.node);
+  }
+
+  @Override public int hashCode() {
+    return node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
new file mode 100644
index 0000000..ec2ddaf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
+
+  private final List<Distributable> blocks;
+  private final String node;
+
+  public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
+    this.node = node;
+    this.blocks = blocks;
+  }
+
+  public List<Distributable> getBlocks() {
+    return blocks;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  @Override public int compareTo(NodeMultiBlockRelation obj) {
+    return this.blocks.size() - obj.getBlocks().size();
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof NodeMultiBlockRelation)) {
+      return false;
+    }
+    NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
+    return blocks.equals(o.blocks) && node.equals(o.node);
+  }
+
+  @Override public int hashCode() {
+    return blocks.hashCode() + node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
new file mode 100644
index 0000000..b7aa32c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.File;
+import java.util.AbstractQueue;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * This is the Merger class responsible for the merging of the segments.
+ */
+public class RowResultMergerProcessor extends AbstractResultProcessor {
+
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segprop;
+  /**
+   * record holder heap
+   */
+  private AbstractQueue<RawResultIterator> recordHolderHeap;
+
+  private TupleConversionAdapter tupleConvertor;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName());
+
+  public RowResultMergerProcessor(String databaseName,
+      String tableName, SegmentProperties segProp, String tempStoreLocation,
+      CarbonLoadModel loadModel, CompactionType compactionType) {
+    this.segprop = segProp;
+    if (!new File(tempStoreLocation).mkdirs()) {
+      LOGGER.error("Error creating temp store location: " + tempStoreLocation);
+    }
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
+        .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
+            tempStoreLocation);
+    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
+        carbonFactDataHandlerModel);
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+    tupleConvertor = new TupleConversionAdapter(segProp);
+  }
+
+  private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList) {
+    // build a min-heap over the raw result iterators.
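+    // the iterator whose current row has the smallest mdkey (as defined by
+    // CarbonMdkeyComparator) stays on top, which lets execute() perform a
+    // k-way merge across the already-sorted segment iterators.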
+    recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
+        new RowResultMergerProcessor.CarbonMdkeyComparator());
+  }
+
+  /**
+   * Merge function: performs a k-way merge over the pre-sorted iterators and
+   * writes the merged rows through the data handler
+   */
+  public boolean execute(List<RawResultIterator> resultIteratorList) {
+    initRecordHolderHeap(resultIteratorList);
+    boolean mergeStatus = false;
+    int index = 0;
+    boolean isDataPresent = false;
+    try {
+
+      // add all the iterators to the queue
+      for (RawResultIterator leafTupleIterator : resultIteratorList) {
+        this.recordHolderHeap.add(leafTupleIterator);
+        index++;
+      }
+      RawResultIterator iterator = null;
+      while (index > 1) {
+        // poll the iterator with the smallest current record
+        iterator = this.recordHolderHeap.poll();
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          index--;
+          continue;
+        }
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
+        }
+        // get the mdkey and write the row
+        addRow(convertedRow);
+        // if the iterator has no more records, decrement the count of live
+        // iterators instead of re-adding it to the heap
+        if (!iterator.hasNext()) {
+          index--;
+          continue;
+        }
+        // add the iterator back to the heap
+        this.recordHolderHeap.add(iterator);
+      }
+      // drain the last remaining iterator, if any
+      iterator = this.recordHolderHeap.poll();
+      while (null != iterator) {
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          break;
+        }
+        // do it only once
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
+        }
+        addRow(convertedRow);
+        // stop once the iterator contains no more records
+        if (!iterator.hasNext()) {
+          break;
+        }
+      }
+      if (isDataPresent) {
+        this.dataHandler.finish();
+      }
+      mergeStatus = true;
+    } catch (Exception e) {
+      LOGGER.error(e, "Exception in compaction merger: " + e.getMessage());
+      mergeStatus = false;
+    } finally {
+      try {
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
+        mergeStatus = false;
+      }
+    }
+
+    return mergeStatus;
+  }
+
+  /**
+   * Below method will be used to add a sorted row to the store
+   *
+   * @throws SliceMergerException
+   */
+  private void addRow(Object[] carbonTuple) throws SliceMergerException {
+    Object[] rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
+    try {
+      this.dataHandler.addDataToStore(rowInWritableFormat);
+    } catch (CarbonDataWriterException e) {
+      throw new SliceMergerException("Problem in merging the slice", e);
+    }
+  }
+
+  /**
+   * Comparator class for comparing two raw row results.
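+   * Rows are ordered by comparing their dimension values in column order:
+   * dictionary columns are compared byte-wise on their fixed-length slices of
+   * the MDKey, and no dictionary columns are compared on their separate byte
+   * arrays.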
+   */
+  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+
+    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
+      Object[] row1 = null;
+      Object[] row2 = null;
+      try {
+        row1 = o1.fetchConverted();
+        row2 = o2.fetchConverted();
+      } catch (KeyGenException e) {
+        LOGGER.error(e.getMessage());
+      }
+      if (null == row1 || null == row2) {
+        return 0;
+      }
+      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
+      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
+      int compareResult = 0;
+      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+      int dictionaryKeyOffset = 0;
+      byte[] dimCols1 = key1.getDictionaryKey();
+      byte[] dimCols2 = key2.getDictionaryKey();
+      int noDicIndex = 0;
+      for (int eachColumnValueSize : columnValueSizes) {
+        if (eachColumnValueSize > 0) {
+          // case of dictionary columns: compare the fixed-length slice of the MDKey
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
+                  dictionaryKeyOffset, eachColumnValueSize);
+          dictionaryKeyOffset += eachColumnValueSize;
+        } else {
+          // case of no dictionary columns: compare the variable-length byte arrays
+          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
+          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
+          compareResult =
+              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
+          noDicIndex++;
+        }
+        if (0 != compareResult) {
+          return compareResult;
+        }
+      }
+      return 0;
+    }
+  }
+
+}
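
[Editor's note] For readers unfamiliar with the pattern, the priority-queue driven k-way merge that execute() implements above can be sketched in isolation. This is a minimal, self-contained illustration, not CarbonData API: plain iterators over sorted integer lists stand in for RawResultIterator, and an integer comparison stands in for CarbonMdkeyComparator.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

  // minimal peeking wrapper, playing the role RawResultIterator plays above
  static class PeekingIterator {
    private final Iterator<Integer> it;
    private Integer head;

    PeekingIterator(Iterator<Integer> it) {
      this.it = it;
      this.head = it.next(); // caller guarantees a non-empty source
    }

    int peek() {
      return head;
    }

    boolean hasNext() {
      return head != null;
    }

    Integer next() {
      Integer current = head;
      head = it.hasNext() ? it.next() : null;
      return current;
    }
  }

  public static List<Integer> merge(List<List<Integer>> sortedLists) {
    // the heap keeps the iterator with the smallest current element on top,
    // just as recordHolderHeap does with CarbonMdkeyComparator
    PriorityQueue<PeekingIterator> heap =
        new PriorityQueue<>(Comparator.comparingInt(PeekingIterator::peek));
    for (List<Integer> list : sortedLists) {
      if (!list.isEmpty()) {
        heap.add(new PeekingIterator(list.iterator()));
      }
    }
    List<Integer> result = new ArrayList<>();
    while (!heap.isEmpty()) {
      PeekingIterator top = heap.poll(); // iterator with the smallest current element
      result.add(top.next());
      if (top.hasNext()) {
        heap.add(top); // re-insert so it is re-ordered on its new head element
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> merged = merge(Arrays.asList(
        Arrays.asList(1, 4, 7), Arrays.asList(2, 5), Arrays.asList(3, 6, 8)));
    System.out.println(merged); // prints [1, 2, 3, 4, 5, 6, 7, 8]
  }
}

Because each input stream is already sorted, each output row costs O(log k) heap work for k streams, which is why both RowResultMergerProcessor and the final sort-file merge in CompactionResultSortProcessor can combine many segments without re-sorting all rows.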
