Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 7ea31a6ae -> 5b978f5b7


[CARBONDATA-288] In hdfs bad record logger is failing in writing the bad records


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/809a4d00
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/809a4d00
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/809a4d00

Branch: refs/heads/master
Commit: 809a4d00976bad33002a4f4e32cac082d2e08c4f
Parents: 7ea31a6
Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com>
Authored: Sun Oct 9 02:24:57 2016 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Oct 14 22:07:51 2016 +0800

----------------------------------------------------------------------
 .../hadoop/test/util/StoreCreator.java          | 10 +++--
 .../carbondata/spark/load/CarbonLoadModel.java  | 22 +++++-----
 .../carbondata/spark/load/CarbonLoaderUtil.java |  2 +-
 .../execution/command/carbonTableSchema.scala   | 12 ++++--
 .../processing/api/dataloader/SchemaInfo.java   | 18 ++++----
 .../processing/constants/LoggerAction.java      | 19 +++++++++
 .../constants/TableOptionConstant.java          | 43 ++++++++++++++++++++
 .../graphgenerator/GraphGenerator.java          |  2 +-
 .../csvbased/BadRecordslogger.java              | 40 ++++++++++--------
 .../csvbased/CarbonCSVBasedSeqGenStep.java      | 16 +++++---
 10 files changed, 132 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 5661888..7c9e170 100644
--- 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -62,6 +62,7 @@ import 
org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
 import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
 import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
+import org.apache.carbondata.processing.constants.TableOptionConstant;
 import org.apache.carbondata.processing.csvload.DataGraphExecuter;
 import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
 import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
@@ -358,9 +359,12 @@ public class StoreCreator {
     schmaModel.setCommentCharacter("#");
     info.setDatabaseName(databaseName);
     info.setTableName(tableName);
-    info.setSerializationNullFormat("serialization_null_format" + "," + "\\N");
-    info.setBadRecordsLoggerEnable("bad_records_logger_enable"+","+"false");
-    info.setBadRecordsLoggerEnable("bad_records_action"+","+"force");
+    info.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," + "\\N");
+    info.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + 
"false");
+    info.setBadRecordsLoggerAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + "force");
 
     generateGraph(schmaModel, info, loadModel.getTableName(), "0", 
loadModel.getSchema(), null,
         loadModel.getLoadMetadataDetails());

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
index 3fd481b..106ad71 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
@@ -117,9 +117,9 @@ public class CarbonLoadModel implements Serializable {
   private String badRecordsLoggerEnable;
 
   /**
-   * defines the option to specify the bad record log redirect to raw csv
+   * defines the option to specify the bad record logger action
    */
-  private String badRecordsLoggerRedirect;
+  private String badRecordsAction;
 
   /**
    * Max number of columns that needs to be parsed by univocity parser
@@ -348,7 +348,7 @@ public class CarbonLoadModel implements Serializable {
     copy.segmentId = segmentId;
     copy.serializationNullFormat = serializationNullFormat;
     copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsLoggerRedirect =badRecordsLoggerRedirect;
+    copy.badRecordsAction = badRecordsAction;
     copy.escapeChar = escapeChar;
     copy.quoteChar = quoteChar;
     copy.commentChar = commentChar;
@@ -391,7 +391,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.segmentId = segmentId;
     copyObj.serializationNullFormat = serializationNullFormat;
     copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copyObj.badRecordsLoggerRedirect =badRecordsLoggerRedirect;
+    copyObj.badRecordsAction = badRecordsAction;
     copyObj.escapeChar = escapeChar;
     copyObj.quoteChar = quoteChar;
     copyObj.commentChar = commentChar;
@@ -612,19 +612,19 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   *  returns option to specify the bad record log redirect to raw csv
+   *  returns option to specify the bad record logger action
    * @return
    */
-  public String getBadRecordsLoggerRedirect() {
-    return badRecordsLoggerRedirect;
+  public String getBadRecordsAction() {
+    return badRecordsAction;
   }
 
   /**
-   * set option to specify the bad record log redirect to raw csv
-   * @param badRecordsLoggerRedirect
+   * set option to specify the bad record logger action
+   * @param badRecordsAction
    */
-  public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) {
-    this.badRecordsLoggerRedirect = badRecordsLoggerRedirect;
+  public void setBadRecordsAction(String badRecordsAction) {
+    this.badRecordsAction = badRecordsAction;
   }
 
   public String getRddIteratorKey() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 18c9538..8bc5fdc 100644
--- 
a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ 
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -198,7 +198,7 @@ public final class CarbonLoaderUtil {
     info.setComplexDelimiterLevel2(loadModel.getComplexDelimiterLevel2());
     info.setSerializationNullFormat(loadModel.getSerializationNullFormat());
     info.setBadRecordsLoggerEnable(loadModel.getBadRecordsLoggerEnable());
-    info.setBadRecordsLoggerRedirect(loadModel.getBadRecordsLoggerRedirect());
+    info.setBadRecordsLoggerAction(loadModel.getBadRecordsAction());
 
     generateGraph(schmaModel, info, loadModel, outPutLoc);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6dc6668..7b6a213 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, 
CarbonUtil}
 import org.apache.carbondata.integration.spark.merger.CompactionType
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
+import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -1135,12 +1136,15 @@ case class LoadTable(
       carbonLoadModel.setEscapeChar(escapeChar)
       carbonLoadModel.setQuoteChar(quoteChar)
       carbonLoadModel.setCommentChar(commentchar)
-      carbonLoadModel.setSerializationNullFormat("serialization_null_format" + 
"," +
-        serializationNullFormat)
       carbonLoadModel
-        .setBadRecordsLoggerEnable("bad_records_logger_enable" + "," + 
badRecordsLoggerEnable)
+        .setSerializationNullFormat(
+          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," + 
serializationNullFormat)
       carbonLoadModel
-        .setBadRecordsLoggerRedirect("bad_records_action" + "," + 
badRecordsLoggerRedirect)
+        .setBadRecordsLoggerEnable(
+          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + 
badRecordsLoggerEnable)
+      carbonLoadModel
+        .setBadRecordsAction(
+          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + 
badRecordsLoggerRedirect)
 
       if (delimiter.equalsIgnoreCase(complex_delimiter_level_1) ||
           
complex_delimiter_level_1.equalsIgnoreCase(complex_delimiter_level_2) ||

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
 
b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
index 56de5ca..a00e913 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
@@ -69,9 +69,9 @@ public class SchemaInfo {
    */
   private String badRecordsLoggerEnable;
   /**
-   * defines the option to specify whether to redirect the bad record logger 
to raw csv or not
+   * defines the option to specify whether to bad record logger action
    */
-  private String badRecordsLoggerRedirect;
+  private String badRecordsLoggerAction;
 
 
   public String getComplexDelimiterLevel1() {
@@ -215,18 +215,18 @@ public class SchemaInfo {
   }
 
   /**
-   * returns the option to set to redirect the badrecord logger to raw csv
+   * returns the option to set bad record logger action
    * @return
    */
-  public String getBadRecordsLoggerRedirect() {
-    return badRecordsLoggerRedirect;
+  public String getBadRecordsLoggerAction() {
+    return badRecordsLoggerAction;
   }
 
   /**
-   * set the option to set to redirect the badrecord logger to raw csv
-   * @param badRecordsLoggerRedirect
+   * set the option to set set bad record logger action
+   * @param badRecordsLoggerAction
    */
-  public void setBadRecordsLoggerRedirect(String badRecordsLoggerRedirect) {
-    this.badRecordsLoggerRedirect = badRecordsLoggerRedirect;
+  public void setBadRecordsLoggerAction(String badRecordsLoggerAction) {
+    this.badRecordsLoggerAction = badRecordsLoggerAction;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
 
b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
index bef65a9..622be42 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.carbondata.processing.constants;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
 
b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
new file mode 100644
index 0000000..eef69f5
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.constants;
+
+/**
+ * enum holds the value related to the ddl option
+ */
+public enum TableOptionConstant {
+  SERIALIZATION_NULL_FORMAT("serialization_null_format"),
+  BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"),
+  BAD_RECORDS_ACTION("bad_records_action");
+
+  private String name;
+
+  /**
+   * constructor to initialize the enum value
+   * @param name
+   */
+  TableOptionConstant(String name) {
+    this.name = name;
+  }
+
+  public String getName() {
+    return name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
 
b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
index 1612ca1..1b5b68f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -921,7 +921,7 @@ public class GraphGenerator {
     TableOptionWrapper tableOptionWrapper = 
TableOptionWrapper.getTableOptionWrapperInstance();
     tableOptionWrapper.setTableOption(schemaInfo.getSerializationNullFormat());
     tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerEnable());
-    
tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerRedirect());
+    tableOptionWrapper.setTableOption(schemaInfo.getBadRecordsLoggerAction());
     return tableOptionWrapper;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
index c373d62..ba33212 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -69,9 +68,13 @@ public class BadRecordslogger {
   private BufferedWriter bufferedCSVWriter;
   private DataOutputStream outCSVStream;
   /**
-   *
+   * bad record log file path
+   */
+  private String logFilePath;
+  /**
+   * csv file path
    */
-  private CarbonFile logFile;
+  private String csvFilePath;
 
   /**
    * task key which is DatabaseName/TableName/tablename
@@ -145,14 +148,11 @@ public class BadRecordslogger {
    *
    */
   private synchronized void writeBadRecordsToFile(StringBuilder logStrings) {
-
-    if (null == logFile) {
-      String filePath =
+    if (null == logFilePath) {
+      logFilePath =
           this.storePath + File.separator + this.fileName + 
CarbonCommonConstants.LOG_FILE_EXTENSION
               + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
-      logFile = FileFactory.getCarbonFile(filePath, 
FileFactory.getFileType(filePath));
     }
-
     try {
       if (null == bufferedWriter) {
         FileType fileType = FileFactory.getFileType(storePath);
@@ -161,13 +161,13 @@ public class BadRecordslogger {
           FileFactory.mkdirs(this.storePath, fileType);
 
           // create the files
-          FileFactory.createNewFile(logFile.getPath(), fileType);
+          FileFactory.createNewFile(logFilePath, fileType);
         }
 
-        outStream = FileFactory.getDataOutputStream(logFile.getPath(), 
fileType);
+        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
 
         bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
-                Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
 
       }
       bufferedWriter.write(logStrings.toString());
@@ -185,12 +185,16 @@ public class BadRecordslogger {
   }
 
   /**
+   * method will write the row having bad record in the csv file.
    *
+   * @param logStrings
    */
   private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings) 
{
-    String filePath =
-        this.storePath + File.separator + this.fileName + 
CarbonCommonConstants.CSV_FILE_EXTENSION
-            + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    if (null == csvFilePath) {
+      csvFilePath =
+          this.storePath + File.separator + this.fileName + 
CarbonCommonConstants.CSV_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
     try {
       if (null == bufferedCSVWriter) {
         FileType fileType = FileFactory.getFileType(storePath);
@@ -199,10 +203,10 @@ public class BadRecordslogger {
           FileFactory.mkdirs(this.storePath, fileType);
 
           // create the files
-          FileFactory.createNewFile(filePath, fileType);
+          FileFactory.createNewFile(csvFilePath, fileType);
         }
 
-        outCSVStream = FileFactory.getDataOutputStream(filePath, fileType);
+        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
 
         bufferedCSVWriter = new BufferedWriter(new 
OutputStreamWriter(outCSVStream,
             Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
@@ -211,9 +215,9 @@ public class BadRecordslogger {
       bufferedCSVWriter.write(logStrings.toString());
       bufferedCSVWriter.newLine();
     } catch (FileNotFoundException e) {
-      LOGGER.error("Bad Log Files not found");
+      LOGGER.error("Bad record csv Files not found");
     } catch (IOException e) {
-      LOGGER.error("Error While writing bad log File");
+      LOGGER.error("Error While writing bad record csv File");
     }
     finally {
       badRecordEntry.put(taskKey, "Partially");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/809a4d00/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
 
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index 8959179..94b2df8 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -75,6 +75,10 @@ import 
org.apache.carbondata.processing.schema.metadata.HierarchiesInfo;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 
+import static 
org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_ACTION;
+import static 
org.apache.carbondata.processing.constants.TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE;
+import static 
org.apache.carbondata.processing.constants.TableOptionConstant.SERIALIZATION_NULL_FORMAT;
+
 import org.pentaho.di.core.exception.KettleException;
 import org.pentaho.di.core.row.RowMetaInterface;
 import org.pentaho.di.core.row.ValueMeta;
@@ -439,12 +443,12 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
           data.getSurrogateKeyGen()
               
.setDimensionOrdinalToDimensionMapping(populateNameToCarbonDimensionMap());
         }
-        serializationNullFormat = 
meta.getTableOptionWrapper().get("serialization_null_format");
-        badRecordsLoggerEnable =
-            
Boolean.parseBoolean(meta.getTableOptionWrapper().get("bad_records_logger_enable"));
-        badRecordConvertNullDisable = true;
+        serializationNullFormat =
+            
meta.getTableOptionWrapper().get(SERIALIZATION_NULL_FORMAT.getName());
+        badRecordsLoggerEnable = Boolean
+            
.parseBoolean(meta.getTableOptionWrapper().get(BAD_RECORDS_LOGGER_ENABLE.getName()));
         String bad_records_action =
-            meta.getTableOptionWrapper().get("bad_records_action");
+            meta.getTableOptionWrapper().get(BAD_RECORDS_ACTION.getName());
         if(null != bad_records_action) {
           LoggerAction loggerAction = null;
           try {
@@ -458,9 +462,11 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
               break;
             case REDIRECT:
               badRecordsLogRedirect = true;
+              badRecordConvertNullDisable = true;
               break;
             case IGNORE:
               badRecordsLogRedirect = false;
+              badRecordConvertNullDisable = true;
               break;
           }
         }

Reply via email to