Repository: incubator-carbondata Updated Branches: refs/heads/master 7ea31a6ae -> 5b978f5b7
[CARBONDATA-288] In hdfs bad record logger is failing in writing the bad records Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/809a4d00 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/809a4d00 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/809a4d00 Branch: refs/heads/master Commit: 809a4d00976bad33002a4f4e32cac082d2e08c4f Parents: 7ea31a6 Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Sun Oct 9 02:24:57 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Fri Oct 14 22:07:51 2016 +0800 ---------------------------------------------------------------------- .../hadoop/test/util/StoreCreator.java | 10 +++-- .../carbondata/spark/load/CarbonLoadModel.java | 22 +++++----- .../carbondata/spark/load/CarbonLoaderUtil.java | 2 +- .../execution/command/carbonTableSchema.scala | 12 ++++-- .../processing/api/dataloader/SchemaInfo.java | 18 ++++---- .../processing/constants/LoggerAction.java | 19 +++++++++ .../constants/TableOptionConstant.java | 43 ++++++++++++++++++++ .../graphgenerator/GraphGenerator.java | 2 +- .../csvbased/BadRecordslogger.java | 40 ++++++++++-------- .../csvbased/CarbonCSVBasedSeqGenStep.java | 16 +++++--- 10 files changed, 132 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index 5661888..7c9e170 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -62,6 +62,7 @@ import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.lcm.fileoperations.FileWriteOperation; import org.apache.carbondata.processing.api.dataloader.DataLoadModel; import org.apache.carbondata.processing.api.dataloader.SchemaInfo; +import org.apache.carbondata.processing.constants.TableOptionConstant; import org.apache.carbondata.processing.csvload.DataGraphExecuter; import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus; import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus; @@ -358,9 +359,12 @@ public class StoreCreator { schmaModel.setCommentCharacter("#"); info.setDatabaseName(databaseName); info.setTableName(tableName); - info.setSerializationNullFormat("serialization_null_format" + "," + "\\N"); - info.setBadRecordsLoggerEnable("bad_records_logger_enable"+","+"false"); - info.setBadRecordsLoggerEnable("bad_records_action"+","+"force"); + info.setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N"); + info.setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + "false"); + info.setBadRecordsLoggerAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "force"); generateGraph(schmaModel, info, loadModel.getTableName(), "0", loadModel.getSchema(), null, loadModel.getLoadMetadataDetails()); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java index 3fd481b..106ad71 100644 --- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java @@ -117,9 +117,9 @@ public class CarbonLoadModel implements Serializable { private String badRecordsLoggerEnable; /** - * defines the option to specify the bad record log redirect to raw csv + * defines the option to specify the bad record logger action */ - private String badRecordsLoggerRedirect; + private String badRecordsAction; /** * Max number of columns that needs to be parsed by univocity parser @@ -348,7 +348,7 @@ public class CarbonLoadModel implements Serializable { copy.segmentId = segmentId; copy.serializationNullFormat = serializationNullFormat; copy.badRecordsLoggerEnable = badRecordsLoggerEnable; - copy.badRecordsLoggerRedirect =badRecordsLoggerRedirect; + copy.badRecordsAction = badRecordsAction; copy.escapeChar = escapeChar; copy.quoteChar = quoteChar; copy.commentChar = commentChar; @@ -391,7 +391,7 @@ public class CarbonLoadModel implements Serializable { copyObj.segmentId = segmentId; copyObj.serializationNullFormat = serializationNullFormat; copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable; - copyObj.badRecordsLoggerRedirect =badRecordsLoggerRedirect; + copyObj.badRecordsAction = badRecordsAction; copyObj.escapeChar = escapeChar; copyObj.quoteChar = quoteChar; copyObj.commentChar = commentChar; @@ -612,19 +612,19 @@ public class CarbonLoadModel implements Serializable { } /** - * returns option to specify the bad record log redirect to raw csv + * returns option to specify the bad record logger action * @return */ - public String getBadRecordsLoggerRedirect() { - return badRecordsLoggerRedirect; + public String getBadRecordsAction() { + return badRecordsAction; } /** - * set option to specify the bad record log redirect to raw csv - * @param badRecordsLoggerRedirect + * set option to specify the bad record logger action + * @param badRecordsAction */ - public 
void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) { - this.badRecordsLoggerRedirect = badRecordsLoggerRedirect; + public void setBadRecordsAction(String badRecordsAction) { + this.badRecordsAction = badRecordsAction; } public String getRddIteratorKey() { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 18c9538..8bc5fdc 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -198,7 +198,7 @@ public final class CarbonLoaderUtil { info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2()); info.setSerializationNullFormat(loadModel.getSerializationNullFormat()); info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable()); - info.setBadRecordsLoggerRedirect(loadModel.getBadRecordsLoggerRedirect()); + info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction()); generateGraph(schmaModel, info, loadModel, outPutLoc); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 6dc6668..7b6a213 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -54,6 +54,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.integration.spark.merger.CompactionType import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.lcm.status.SegmentStatusManager +import org.apache.carbondata.processing.constants.TableOptionConstant import org.apache.carbondata.processing.etl.DataLoadingException import org.apache.carbondata.spark.CarbonSparkFactory import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -1135,12 +1136,15 @@ case class LoadTable( carbonLoadModel.setEscapeChar(escapeChar) carbonLoadModel.setQuoteChar(quoteChar) carbonLoadModel.setCommentChar(commentchar) - carbonLoadModel.setSerializationNullFormat("serialization_null_format" + "," + - serializationNullFormat) carbonLoadModel - .setBadRecordsLoggerEnable("bad_records_logger_enable" + "," + badRecordsLoggerEnable) + .setSerializationNullFormat( + TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + serializationNullFormat) carbonLoadModel - .setBadRecordsLoggerRedirect("bad_records_action" + "," + badRecordsLoggerRedirect) + .setBadRecordsLoggerEnable( + TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable) + carbonLoadModel + .setBadRecordsAction( + TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect) if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) || complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) || http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java 
b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java index 56de5ca..a00e913 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java +++ b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java @@ -69,9 +69,9 @@ public class SchemaInfo { */ private String badRecordsLoggerEnable; /** - * defines the option to specify whether to redirect the bad record logger to raw csv or not + * defines the option that specifies the bad record logger action */ - private String badRecordsLoggerRedirect; + private String badRecordsLoggerAction; public String getComplexDelimiterLevel1() { @@ -215,18 +215,18 @@ public class SchemaInfo { } /** - * returns the option to set to redirect the badrecord logger to raw csv + * returns the option that specifies the bad record logger action * @return */ - public String getBadRecordsLoggerRedirect() { - return badRecordsLoggerRedirect; + public String getBadRecordsLoggerAction() { + return badRecordsLoggerAction; } /** - * set the option to set to redirect the badrecord logger to raw csv - * @param badRecordsLoggerRedirect + * sets the option that specifies the bad record logger action + * @param badRecordsLoggerAction */ - public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) { - this.badRecordsLoggerRedirect = badRecordsLoggerRedirect; + public void setBadRecordsLoggerAction(String badRecordsLoggerAction) { + this.badRecordsLoggerAction = badRecordsLoggerAction; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java index bef65a9..622be42 100644 ---
a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java +++ b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.carbondata.processing.constants; /** http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java new file mode 100644 index 0000000..eef69f5 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.processing.constants; + +/** + * enum holds the value related to the ddl option + */ +public enum TableOptionConstant { + SERIALIZATION_NULL_FORMAT("serialization_null_format"), + BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"), + BAD_RECORDS_ACTION("bad_records_action"); + + private String name; + + /** + * constructor to initialize the enum value + * @param name + */ + TableOptionConstant(String name) { + this.name = name; + } + + public String getName() { + return name; + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java index 1612ca1..1b5b68f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java +++ b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java @@ -921,7 +921,7 @@ public class GraphGenerator { TableOptionWrapper tableOptionWrapper = TableOptionWrapper.getTableOptionWrapperInstance(); 
tableOptionWrapper.setTableOption(schemaInfo.getSerializationNullFormat()); tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerEnable()); - tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerRedirect()); + tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerAction()); return tableOptionWrapper; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java index c373d62..ba33212 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java @@ -32,7 +32,6 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType; import org.apache.carbondata.core.util.CarbonUtil; @@ -69,9 +68,13 @@ public class BadRecordslogger { private BufferedWriter bufferedCSVWriter; private DataOutputStream outCSVStream; /** - * + * bad record log file path + */ + private String logFilePath; + /** + * csv file path */ - private CarbonFile logFile; + private String csvFilePath; /** * task key which is DatabaseName/TableName/tablename @@ -145,14 +148,11 @@ public class BadRecordslogger 
{ * */ private synchronized void writeBadRecordsToFile(StringBuilder logStrings) { - - if (null == logFile) { - String filePath = + if (null == logFilePath) { + logFilePath = this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION + CarbonCommonConstants.FILE_INPROGRESS_STATUS; - logFile = FileFactory.getCarbonFile(filePath, FileFactory.getFileType(filePath)); } - try { if (null == bufferedWriter) { FileType fileType = FileFactory.getFileType(storePath); @@ -161,13 +161,13 @@ public class BadRecordslogger { FileFactory.mkdirs(this.storePath, fileType); // create the files - FileFactory.createNewFile(logFile.getPath(), fileType); + FileFactory.createNewFile(logFilePath, fileType); } - outStream = FileFactory.getDataOutputStream(logFile.getPath(), fileType); + outStream = FileFactory.getDataOutputStream(logFilePath, fileType); bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream, - Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); } bufferedWriter.write(logStrings.toString()); @@ -185,12 +185,16 @@ public class BadRecordslogger { } /** + * method will write the row having bad record in the csv file. 
* + * @param logStrings */ private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings) { - String filePath = - this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION - + CarbonCommonConstants.FILE_INPROGRESS_STATUS; + if (null == csvFilePath) { + csvFilePath = + this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION + + CarbonCommonConstants.FILE_INPROGRESS_STATUS; + } try { if (null == bufferedCSVWriter) { FileType fileType = FileFactory.getFileType(storePath); @@ -199,10 +203,10 @@ public class BadRecordslogger { FileFactory.mkdirs(this.storePath, fileType); // create the files - FileFactory.createNewFile(filePath, fileType); + FileFactory.createNewFile(csvFilePath, fileType); } - outCSVStream = FileFactory.getDataOutputStream(filePath, fileType); + outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType); bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); @@ -211,9 +215,9 @@ public class BadRecordslogger { bufferedCSVWriter.write(logStrings.toString()); bufferedCSVWriter.newLine(); } catch (FileNotFoundException e) { - LOGGER.error("Bad Log Files not found"); + LOGGER.error("Bad record csv Files not found"); } catch (IOException e) { - LOGGER.error("Error While writing bad log File"); + LOGGER.error("Error While writing bad record csv File"); } finally { badRecordEntry.put(taskKey, "Partially"); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java 
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java index 8959179..94b2df8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java +++ b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java @@ -75,6 +75,10 @@ import org.apache.carbondata.processing.schema.metadata.HierarchiesInfo; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.processing.util.RemoveDictionaryUtil; +import static org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_ACTION; +import static org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE; +import static org.apache.carbondata.processing.constants.TableOptionConstant.SERIALIZATION_NULL_FORMAT; + import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.ValueMeta; @@ -439,12 +443,12 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep { data.getSurrogateKeyGen() .setDimensionOrdinalToDimensionMapping(populateNameToCarbonDimensionMap()); } - serializationNullFormat = meta.getTableOptionWrapper().get("serialization_null_format"); - badRecordsLoggerEnable = - Boolean.parseBoolean(meta.getTableOptionWrapper().get("bad_records_logger_enable")); - badRecordConvertNullDisable = true; + serializationNullFormat = + meta.getTableOptionWrapper().get(SERIALIZATION_NULL_FORMAT.getName()); + badRecordsLoggerEnable = Boolean + .parseBoolean(meta.getTableOptionWrapper().get(BAD_RECORDS_LOGGER_ENABLE.getName())); String bad_records_action = - meta.getTableOptionWrapper().get("bad_records_action"); + meta.getTableOptionWrapper().get(BAD_RECORDS_ACTION.getName()); if(null != bad_records_action) { LoggerAction loggerAction = null; try { @@ -458,9 +462,11 @@ public class 
CarbonCSVBasedSeqGenStep extends BaseStep { break; case REDIRECT: badRecordsLogRedirect = true; + badRecordConvertNullDisable = true; break; case IGNORE: badRecordsLogRedirect = false; + badRecordConvertNullDisable = true; break; } }