[CARBONDATA-1218] In case of data-load failure, the task entry is not removed from the BadRecordsLogger.badRecordEntry map holding the task status
Problem: For the GLOBAL_SORT scope option, in case of data-load failure the task entry is not removed from the BadRecordsLogger.badRecordEntry map holding the task status. Because of this, the next load fails even though the data being loaded has no bad records. Solution: The map entry must be removed after load completion, whether the load succeeds or fails. Refactored the bad record logger. This closes #1082 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/837fdd2c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/837fdd2c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/837fdd2c Branch: refs/heads/branch-1.3 Commit: 837fdd2cb91780c9193efb5b37cd107c9fa36591 Parents: bcf3ca3 Author: mohammadshahidkhan <[email protected]> Authored: Fri Jun 23 12:26:25 2017 +0530 Committer: manishgupta88 <[email protected]> Committed: Fri Jan 5 10:47:37 2018 +0530 ---------------------------------------------------------------------- .../streaming/CarbonStreamRecordWriter.java | 4 +- .../badrecordloger/BadRecordLoggerTest.scala | 3 +- .../load/DataLoadProcessorStepOnSpark.scala | 30 +++-- .../spark/load/GlobalSortHelper.scala | 14 ++- .../processing/loading/BadRecordsLogger.java | 3 + .../loading/BadRecordsLoggerProvider.java | 96 +++++++++++++++ .../processing/loading/DataLoadExecutor.java | 17 +-- .../steps/DataConverterProcessorStepImpl.java | 98 +-------------- ...ConverterProcessorWithBucketingStepImpl.java | 79 +----------- .../processing/util/CarbonBadRecordUtil.java | 122 +++++++++++++++++++ .../util/CarbonDataProcessorUtil.java | 57 --------- 11 files changed, 267 insertions(+), 256 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- 
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java index bad2f44..7d862d4 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java @@ -41,6 +41,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.format.FileHeader; import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; import org.apache.carbondata.processing.loading.DataLoadProcessBuilder; @@ -49,7 +50,6 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.loading.parser.RowParser; import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl; -import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -150,7 +150,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { // initialize parser and converter rowParser = new RowParserImpl(dataFields, configuration); - badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(configuration); + badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); converter = new RowConverterImpl(configuration.getDataFields(), configuration, badRecordLogger); 
configuration.setCardinalityFinder(converter); converter.initialize(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala index 463ddbf..797a972 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.test.util.QueryTest @@ -324,6 +324,7 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll { sql("drop table empty_timestamp") sql("drop table empty_timestamp_false") sql("drop table dataloadOptionTests") + sql("drop table IF EXISTS loadIssue") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 2c74657..21de003 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -30,16 +30,16 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException import org.apache.carbondata.core.datastore.row.CarbonRow import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, TableProcessingOperations} +import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations} import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl import org.apache.carbondata.processing.loading.sort.SortStepRowUtil -import org.apache.carbondata.processing.loading.steps.{DataConverterProcessorStepImpl, DataWriterProcessorStepImpl} +import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl import org.apache.carbondata.processing.sort.sortdata.SortParameters import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory} -import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} +import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil} import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow} import org.apache.carbondata.spark.util.Util @@ -103,18 +103,20 @@ object 
DataLoadProcessorStepOnSpark { rowCounter: Accumulator[Int]): Iterator[CarbonRow] = { val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString) val conf = DataLoadProcessBuilder.createConfiguration(model) - val badRecordLogger = DataConverterProcessorStepImpl.createBadRecordLogger(conf) + val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf) val rowConverter = new RowConverterImpl(conf.getDataFields, conf, badRecordLogger) rowConverter.initialize() TaskContext.get().addTaskCompletionListener { context => - DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter) - GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum) + val hasBadRecord: Boolean = CarbonBadRecordUtil.hasBadRecord(model) + close(conf, badRecordLogger, rowConverter) + GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord) } TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) => - DataConverterProcessorStepImpl.close(badRecordLogger, conf, rowConverter) - GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum) + val hasBadRecord : Boolean = CarbonBadRecordUtil.hasBadRecord(model) + close(conf, badRecordLogger, rowConverter) + GlobalSortHelper.badRecordsLogger(model, partialSuccessAccum, hasBadRecord) wrapException(e, model) } @@ -130,6 +132,18 @@ object DataLoadProcessorStepOnSpark { } } + def close(conf: CarbonDataLoadConfiguration, + badRecordLogger: BadRecordsLogger, + rowConverter: RowConverterImpl): Unit = { + if (badRecordLogger != null) { + badRecordLogger.closeStreams() + CarbonBadRecordUtil.renameBadRecord(conf) + } + if (rowConverter != null) { + rowConverter.finish() + } + } + def convertTo3Parts( rows: Iterator[CarbonRow], index: Int, http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala index a42680e..4e3fc88 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/GlobalSortHelper.scala @@ -27,10 +27,16 @@ import org.apache.carbondata.processing.loading.BadRecordsLogger object GlobalSortHelper { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def badRecordsLogger(loadModel: CarbonLoadModel, badRecordsAccum: Accumulator[Int]): Unit = { - val key = new CarbonTableIdentifier(loadModel.getDatabaseName, loadModel.getTableName, null) - .getBadRecordLoggerKey - if (null != BadRecordsLogger.hasBadRecord(key)) { + /** + * Marks the load as partially successful in badRecordsAccum when bad records were found. + * @param loadModel Carbon load model instance + * @param badRecordsAccum Accumulator that maintains the load state: 0 means success, non-zero means + * partially successful + * @param hasBadRecord <code>true</code> if bad records were found during the load, <code>false</code> otherwise. 
+ */ + def badRecordsLogger(loadModel: CarbonLoadModel, + badRecordsAccum: Accumulator[Int], hasBadRecord: Boolean): Unit = { + if (hasBadRecord) { LOGGER.error("Data Load is partially success for table " + loadModel.getTableName) badRecordsAccum.add(1) } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java index bc0ce3a..d668329 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java @@ -271,6 +271,9 @@ public class BadRecordsLogger { * closeStreams void */ public synchronized void closeStreams() { + // Remove the taskKey entry while closing the streams. + // This ensures the task status is cleaned up even in case of a failure. + removeBadRecordKey(taskKey); CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java new file mode 100644 index 0000000..614a959 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.loading; + +import java.io.File; + +import org.apache.carbondata.common.constants.LoggerAction; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; + +/** + * Factory for BadRecordsLogger instances used during data load. + */ +public class BadRecordsLoggerProvider { + /** + * Creates a BadRecordsLogger from the bad-record options of the given load configuration. + * @param configuration data load configuration (bad-record options, table, segment and task) + * @return a new BadRecordsLogger instance + */ + public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) { + boolean badRecordsLogRedirect = false; + boolean badRecordConvertNullDisable = false; + boolean isDataLoadFail = false; + boolean badRecordsLoggerEnable = Boolean.parseBoolean( + configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE) + .toString()); + Object bad_records_action = + configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION) + .toString(); + if (null != bad_records_action) { + LoggerAction loggerAction = null; + try { 
loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase()); + } catch (IllegalArgumentException e) { + loggerAction = LoggerAction.FORCE; + } + switch (loggerAction) { + case FORCE: + badRecordConvertNullDisable = false; + break; + case REDIRECT: + badRecordsLogRedirect = true; + badRecordConvertNullDisable = true; + break; + case IGNORE: + badRecordsLogRedirect = false; + badRecordConvertNullDisable = true; + break; + case FAIL: + isDataLoadFail = true; + break; + } + } + CarbonTableIdentifier identifier = + configuration.getTableIdentifier().getCarbonTableIdentifier(); + return new BadRecordsLogger(identifier.getBadRecordLoggerKey(), + identifier.getTableName() + '_' + System.currentTimeMillis(), + getBadLogStoreLocation(configuration, + identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier + .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration + .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()), + badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail); + } + + public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration, + String storeLocation) { + String badLogStoreLocation = (String) configuration + .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); + if (null == badLogStoreLocation) { + badLogStoreLocation = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + } + badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; + + return badLogStoreLocation; + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java index 10b19b7..fc5c41f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java @@ -25,6 +25,7 @@ import org.apache.carbondata.processing.loading.exception.BadRecordFoundExceptio import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.loading.exception.NoRetryException; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonBadRecordUtil; /** * It executes the data load. @@ -49,8 +50,7 @@ public class DataLoadExecutor { // 2. execute the step loadProcessorStep.execute(); // check and remove any bad record key from bad record entry logger static map - if (badRecordFound( - loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())) { + if (CarbonBadRecordUtil.hasBadRecord(loadModel)) { LOGGER.error("Data Load is partially success for table " + loadModel.getTableName()); } else { LOGGER.info("Data loading is successful for table " + loadModel.getTableName()); @@ -65,9 +65,6 @@ public class DataLoadExecutor { LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName()); throw new CarbonDataLoadingException( "Data Loading failed for table " + loadModel.getTableName(), e); - } finally { - removeBadRecordKey( - loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier()); } } @@ -87,16 +84,6 @@ public class DataLoadExecutor { } /** - * This method will remove the bad record key from bad record logger - * - * @param carbonTableIdentifier - */ - private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) { - String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey(); - BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey); 
- } - - /** * Method to clean all the resource */ public void close() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java index a0592f6..90a340d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java @@ -17,28 +17,22 @@ package org.apache.carbondata.processing.loading.steps; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.constants.LoggerAction; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; import org.apache.carbondata.processing.loading.converter.RowConverter; import 
org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.carbondata.processing.util.CarbonBadRecordUtil; /** * Replace row data fields with dictionary values if column is configured dictionary encoded. @@ -64,7 +58,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte super.initialize(); child.initialize(); converters = new ArrayList<>(); - badRecordLogger = createBadRecordLogger(configuration); + badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); RowConverter converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger); configuration.setCardinalityFinder(converter); @@ -121,70 +115,12 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte throw new UnsupportedOperationException(); } - public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) { - boolean badRecordsLogRedirect = false; - boolean badRecordConvertNullDisable = false; - boolean isDataLoadFail = false; - boolean badRecordsLoggerEnable = Boolean.parseBoolean( - configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE) - .toString()); - Object bad_records_action = - configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION) - .toString(); - if (null != bad_records_action) { - LoggerAction loggerAction = null; - try { - loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase()); - } catch (IllegalArgumentException e) { - loggerAction = LoggerAction.FORCE; - } - switch (loggerAction) { - case FORCE: - badRecordConvertNullDisable = false; - break; - case REDIRECT: - badRecordsLogRedirect = true; - badRecordConvertNullDisable = true; - break; - case IGNORE: - badRecordsLogRedirect = false; - 
badRecordConvertNullDisable = true; - break; - case FAIL: - isDataLoadFail = true; - break; - } - } - CarbonTableIdentifier identifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); - return new BadRecordsLogger(identifier.getBadRecordLoggerKey(), - identifier.getTableName() + '_' + System.currentTimeMillis(), - getBadLogStoreLocation(configuration, - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration - .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()), - badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail); - } - - public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration, - String storeLocation) { - String badLogStoreLocation = (String) configuration - .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); - if (null == badLogStoreLocation) { - badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); - } - badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; - - return badLogStoreLocation; - } - @Override public void close() { if (!closed) { if (null != badRecordLogger) { badRecordLogger.closeStreams(); - renameBadRecord(badRecordLogger, configuration); + CarbonBadRecordUtil.renameBadRecord(configuration); } super.close(); if (converters != null) { @@ -197,32 +133,6 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte } } - public static void close(BadRecordsLogger badRecordLogger, CarbonDataLoadConfiguration - configuration, RowConverter converter) { - if (badRecordLogger != null) { - badRecordLogger.closeStreams(); - renameBadRecord(badRecordLogger, configuration); - } - if (converter != null) { - converter.finish(); - } - } - - private static void renameBadRecord(BadRecordsLogger badRecordLogger, - 
CarbonDataLoadConfiguration configuration) { - // rename operation should be performed only in case either bad reccords loggers is enabled - // or bad records redirect is enabled - if (badRecordLogger.isBadRecordLoggerEnable() || badRecordLogger.isBadRecordsLogRedirect()) { - // rename the bad record in progress to normal - CarbonTableIdentifier identifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); - CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration, - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId() - + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()); - } - } - @Override protected String getStepName() { return "Data Converter"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java index 82112b7..a1181c9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -17,32 +17,26 @@ package org.apache.carbondata.processing.loading.steps; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.carbondata.common.CarbonIterator; -import org.apache.carbondata.common.constants.LoggerAction; -import 
org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.BadRecordsLogger; +import org.apache.carbondata.processing.loading.BadRecordsLoggerProvider; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; -import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants; import org.apache.carbondata.processing.loading.converter.RowConverter; import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl; import org.apache.carbondata.processing.loading.partition.Partitioner; import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl; import org.apache.carbondata.processing.loading.row.CarbonRowBatch; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; +import org.apache.carbondata.processing.util.CarbonBadRecordUtil; /** * Replace row data fields with dictionary values if column is configured dictionary encoded. 
@@ -71,7 +65,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa super.initialize(); child.initialize(); converters = new ArrayList<>(); - badRecordLogger = createBadRecordLogger(); + badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(configuration); RowConverter converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger); configuration.setCardinalityFinder(converter); @@ -146,69 +140,13 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa throw new UnsupportedOperationException(); } - private BadRecordsLogger createBadRecordLogger() { - boolean badRecordsLogRedirect = false; - boolean badRecordConvertNullDisable = false; - boolean isDataLoadFail = false; - boolean badRecordsLoggerEnable = Boolean.parseBoolean( - configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE) - .toString()); - Object bad_records_action = - configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION) - .toString(); - if (null != bad_records_action) { - LoggerAction loggerAction = null; - try { - loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase()); - } catch (IllegalArgumentException e) { - loggerAction = LoggerAction.FORCE; - } - switch (loggerAction) { - case FORCE: - badRecordConvertNullDisable = false; - break; - case REDIRECT: - badRecordsLogRedirect = true; - badRecordConvertNullDisable = true; - break; - case IGNORE: - badRecordsLogRedirect = false; - badRecordConvertNullDisable = true; - break; - case FAIL: - isDataLoadFail = true; - break; - } - } - CarbonTableIdentifier identifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); - return new BadRecordsLogger(identifier.getBadRecordLoggerKey(), - identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation( - identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier - 
.getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId() - + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()), - badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail); - } - - private String getBadLogStoreLocation(String storeLocation) { - String badLogStoreLocation = (String) configuration - .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); - if (null == badLogStoreLocation) { - badLogStoreLocation = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); - } - badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; - - return badLogStoreLocation; - } - @Override public void close() { if (!closed) { super.close(); if (null != badRecordLogger) { badRecordLogger.closeStreams(); - renameBadRecord(configuration); + CarbonBadRecordUtil.renameBadRecord(configuration); } if (converters != null) { for (RowConverter converter : converters) { @@ -217,15 +155,6 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa } } } - private static void renameBadRecord(CarbonDataLoadConfiguration configuration) { - // rename the bad record in progress to normal - CarbonTableIdentifier identifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); - CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration, - identifier.getDatabaseName() + File.separator + identifier.getTableName() - + File.separator + configuration.getSegmentId() + File.separator + configuration - .getTaskNo()); - } @Override protected String getStepName() { return "Data Converter with Bucketing"; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java new file mode 100644 index 0000000..26a6f77 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.carbondata.processing.util;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+/**
+ * Common helper methods for bad record handling during data load.
+ */
+public class CarbonBadRecordUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonBadRecordUtil.class.getName());
+
+  /**
+   * Renames this task's in-progress bad record files to their final names.
+   *
+   * @param configuration load configuration supplying database, table, segment id and task no
+   */
+  public static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // task folder is <database>/<table>/<segment>/<task> under the bad record base location
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
+            + configuration.getSegmentId() + File.separator + configuration.getTaskNo());
+  }
+
+  /** Renames files whose name contains the in-progress marker, dropping their last extension.
+   * @param configuration load configuration; its bad record path option overrides the default
+   * @param storeLocation relative folder appended to the bad record base location
+   */
+  private static void renameBadRecordsFromInProgressToNormal(
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
+    // resolve the base location: load option first, then the CARBON_BADRECORDS_LOC property
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    FileFactory.FileType fileType = FileFactory.getFileType(badLogStoreLocation);
+    try {
+      if (!FileFactory.isFileExist(badLogStoreLocation, fileType)) {
+        return;
+      }
+    } catch (IOException e1) {
+      LOGGER.info("bad record folder does not exist");
+    }
+    CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType);
+
+    CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile pathname) {
+        if (pathname.getName().indexOf(CarbonCommonConstants.FILE_INPROGRESS_STATUS) > -1) {
+          return true;
+        }
+        return false;
+      }
+    });
+
+    String badRecordsInProgressFileName = null;
+    String changedFileName = null;
+    for (CarbonFile badFiles : listFiles) {
+      badRecordsInProgressFileName = badFiles.getName();
+
+      changedFileName = badLogStoreLocation + File.separator + badRecordsInProgressFileName
+          .substring(0, badRecordsInProgressFileName.lastIndexOf('.'));
+
+      badFiles.renameTo(changedFileName);
+
+      if (badFiles.exists()) {
+        if (!badFiles.delete()) {
+          LOGGER.error("Unable to delete File : " + badFiles.getName());
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns {@code true} if {@code BadRecordsLogger.hasBadRecord} yields a non-null entry for
+   * this table's bad record logger key, else {@code false}. NOTE(review): the original doc
+   * says that call also removes the entry - confirm in BadRecordsLogger.
+   * @param loadModel load model whose table identifier supplies the bad record logger key
+   * @return whether a bad record entry was found for the table
+   */
+  public static boolean hasBadRecord(CarbonLoadModel loadModel) {
+    String key = loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier()
+        .getBadRecordLoggerKey();
+    return (null != BadRecordsLogger.hasBadRecord(key));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/837fdd2c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index beb1ad1..2a4cc00 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -32,12 +32,7 @@ import org.apache.carbondata.common.constants.LoggerAction; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.constants.CarbonLoadOptionConstants; import org.apache.carbondata.core.datastore.ColumnType; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.impl.FileFactory.FileType; import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -96,58 +91,6 @@ public final class CarbonDataProcessorUtil { } /** - * @param configuration - * @param storeLocation - */ - public static void renameBadRecordsFromInProgressToNormal( - CarbonDataLoadConfiguration configuration, String storeLocation) { - // get the base store location - String badLogStoreLocation = (String) configuration - .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH); - if (null == badLogStoreLocation) { - badLogStoreLocation = - 
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); - } - badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; - - FileType fileType = FileFactory.getFileType(badLogStoreLocation); - try { - if (!FileFactory.isFileExist(badLogStoreLocation, fileType)) { - return; - } - } catch (IOException e1) { - LOGGER.info("bad record folder does not exist"); - } - CarbonFile carbonFile = FileFactory.getCarbonFile(badLogStoreLocation, fileType); - - CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile pathname) { - if (pathname.getName().indexOf(CarbonCommonConstants.FILE_INPROGRESS_STATUS) > -1) { - return true; - } - return false; - } - }); - - String badRecordsInProgressFileName = null; - String changedFileName = null; - for (CarbonFile badFiles : listFiles) { - badRecordsInProgressFileName = badFiles.getName(); - - changedFileName = badLogStoreLocation + File.separator + badRecordsInProgressFileName - .substring(0, badRecordsInProgressFileName.lastIndexOf('.')); - - badFiles.renameTo(changedFileName); - - if (badFiles.exists()) { - if (!badFiles.delete()) { - LOGGER.error("Unable to delete File : " + badFiles.getName()); - } - } - } - } - - /** * This method will be used to delete sort temp location is it is exites */ public static void deleteSortLocationIfExists(String[] locations) {
