This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 706e8d3 [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller. 706e8d3 is described below commit 706e8d34c40da97e0d123f58eac3f6da3953f4d0 Author: dhatchayani <dhatcha.offic...@gmail.com> AuthorDate: Tue May 28 19:29:46 2019 +0530 [CARBONDATA-3393] Merge Index Job Failure should not trigger the merge index job again. Exception should be propagated to the caller. Problem: If the merge index job fails, the same job is triggered again. Solution: Merge index job exception has to be propagated to the caller. It should not trigger the same job again. Changes: (1) A merge index job failure is always LOGGED and, by default, propagated to the caller as an exception; setting carbon.merge.index.failure.throw.exception to false makes the failure log-only. (2) Implement a new method to write the SegmentFile based on the current load timestamp. This helps in case of merge index failures and when writing the merge index for an old store. This closes #3226 --- .../core/constants/CarbonCommonConstants.java | 12 +++++++ .../carbondata/core/metadata/SegmentFileStore.java | 21 +++++++++++ .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 41 +++++++++++++++------- 3 files changed, 62 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index aa9dd05..311019c 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -346,6 +346,18 @@ public final class CarbonCommonConstants { public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true"; /** + * It is the user defined property to specify whether to throw exception or not in case + * if the MERGE INDEX JOB is failed.
Default value - TRUE + * TRUE - throws exception and fails the corresponding LOAD job + * FALSE - Logs the exception and continue with the LOAD + */ + @CarbonProperty + public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION = + "carbon.merge.index.failure.throw.exception"; + + public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT = "true"; + + /** * property to be used for specifying the max byte limit for string/varchar data type till * where storing min/max in data file will be considered */ diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 69e5dc3..cbf58c7 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -139,12 +139,32 @@ public class SegmentFileStore { */ public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID) throws IOException { + return writeSegmentFile(carbonTable, segmentId, UUID, null); + } + + /** + * Write segment file to the metadata folder of the table selecting only the current load files + * + * @param carbonTable + * @param segmentId + * @param UUID + * @param currentLoadTimeStamp + * @return + * @throws IOException + */ + public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID, + final String currentLoadTimeStamp) throws IOException { String tablePath = carbonTable.getTablePath(); boolean supportFlatFolder = carbonTable.isSupportFlatFolder(); String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId); CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath); CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { + if (null != currentLoadTimeStamp) { + return 
file.getName().contains(currentLoadTimeStamp) && ( + file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() + .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)); + } return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName() .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)); } @@ -185,6 +205,7 @@ public class SegmentFileStore { return null; } + /** * Move the loaded data from source folder to destination folder. */ diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala index c101d02..bb930b4 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala @@ -20,6 +20,7 @@ package org.apache.spark.rdd import org.apache.spark.{Partition, TaskContext} import org.apache.spark.sql.SparkSession +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties @@ -38,6 +39,8 @@ case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String) object CarbonMergeFilesRDD { + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + /** * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment * @@ -70,9 +73,8 @@ object CarbonMergeFilesRDD { readFileFooterFromCarbonDataFile).collect() } else { try { - if (CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, - CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) { + if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, + CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)) { new 
CarbonMergeFilesRDD( sparkSession, carbonTable, @@ -82,19 +84,34 @@ object CarbonMergeFilesRDD { readFileFooterFromCarbonDataFile).collect() } } catch { - case _: Exception => - if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) { - new CarbonMergeFilesRDD( - sparkSession, - carbonTable, - segmentIds, - segmentFileNameToSegmentIdMap, - carbonTable.isHivePartitionTable, - readFileFooterFromCarbonDataFile).collect() + case ex: Exception => + val message = "Merge Index files request is failed " + + s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage + LOGGER.error(message) + if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION, + CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)) { + throw new RuntimeException(message, ex) } } } } + + /** + * Check whether the Merge Index Property is set by the user. + * If not set, take the default value of the property. + * + * @return + */ + def isPropertySet(property: String, defaultValue: String): Boolean = { + var mergeIndex: Boolean = false + try { + mergeIndex = CarbonProperties.getInstance().getProperty(property, defaultValue).toBoolean + } catch { + case _: Exception => + mergeIndex = defaultValue.toBoolean + } + mergeIndex + } } /**