[CARBONDATA-1249] Wrong order of columns in redirected csv for bad records
Problem:
Wrong order of columns in redirected csv for bad records
The RowParser rearranges the raw csv data based on the inputMapping &
outputMapping.
So the converter step does not have the actual raw csv record to log or redirect
the bad record details.
Steps to reproduce:
Create employee(Name string, age int, project string) stored by 'carbondata'
LOAD DATA LOCAL INPATH '' INTO table employee
options('BAD_RECORDS_ACTION'='REDIRECT')
Data:
Name,age,Project
Sam,27,Carbon
Ruhi,23x,Hadoop
The second record is a bad record, so it will be written to the csv file at the
bad record location.
Expected:
Ruhi,23x,Hadoop
This closes #1116
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e40b34b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e40b34b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e40b34b0
Branch: refs/heads/branch-1.3
Commit: e40b34b08210981cbe3bffe73cacff2577fbd8cb
Parents: 1f54c47
Author: mohammadshahidkhan <[email protected]>
Authored: Wed Jun 28 19:22:45 2017 +0530
Committer: ravipesala <[email protected]>
Committed: Thu Jan 4 20:45:40 2018 +0530
----------------------------------------------------------------------
.../core/datastore/row/CarbonRow.java | 24 +++++--
.../badrecordloger/BadRecordLoggerTest.scala | 68 +++++++++++++++++++-
.../TestDataLoadWithColumnsMoreThanSchema.scala | 4 ++
.../load/DataLoadProcessorStepOnSpark.scala | 14 +++-
.../spark/sql/test/TestQueryExecutor.scala | 8 ++-
.../converter/impl/RowConverterImpl.java | 6 +-
.../loading/model/CarbonLoadModel.java | 1 +
.../loading/steps/InputProcessorStepImpl.java | 26 ++++++--
.../util/CarbonDataProcessorUtil.java | 28 ++++++++
9 files changed, 159 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git
a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
index d981fa4..8702421 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -27,12 +27,24 @@ public class CarbonRow implements Serializable {
private Object[] data;
+ private Object[] rawData;
+
public short bucketNumber;
public CarbonRow(Object[] data) {
this.data = data;
}
+ /**
+ *
+ * @param data contains column values for only schema columns
+ * @param rawData contains complete row of the rawData
+ */
+ public CarbonRow(Object[] data, Object[] rawData) {
+ this.data = data;
+ this.rawData = rawData;
+ }
+
public Object[] getData() {
return data;
}
@@ -61,14 +73,14 @@ public class CarbonRow implements Serializable {
data[ordinal] = value;
}
- public CarbonRow getCopy() {
- Object[] copy = new Object[data.length];
- System.arraycopy(data, 0, copy, 0, copy.length);
- return new CarbonRow(copy);
- }
-
@Override public String toString() {
return Arrays.toString(data);
}
+ public Object[] getRawData() {
+ return rawData;
+ }
+ public void setRawData(Object[] rawData) {
+ this.rawData = rawData;
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 3c7f5e2..463ddbf 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -17,15 +17,19 @@
package org.apache.carbondata.spark.testsuite.badrecordloger
-import java.io.File
+import java.io.{File, FileFilter}
+import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
/**
* Test Class for detailed query on timestamp datatypes
*
@@ -46,6 +50,7 @@ class BadRecordLoggerTest extends QueryTest with
BeforeAndAfterAll {
sql("drop table IF EXISTS empty_timestamp")
sql("drop table IF EXISTS empty_timestamp_false")
sql("drop table IF EXISTS dataloadOptionTests")
+ sql("drop table IF EXISTS sales_test")
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country
String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED
BY 'carbondata'""")
@@ -247,8 +252,69 @@ class BadRecordLoggerTest extends QueryTest with
BeforeAndAfterAll {
}
}
+ test("validate redirected data") {
+ cleanBadRecordPath("default", "sales_test")
+ val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales_test(ID BigInt, date long, country
int,
+ actual_price Double, Quantity String, sold_price Decimal(19,2))
STORED BY 'carbondata'""")
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ try {
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_test
OPTIONS" +
+
"('bad_records_logger_enable'='false','bad_records_action'='redirect',
'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"')");
+ } catch {
+ case e: Exception => {
+ assert(true)
+ }
+ }
+ val redirectCsvPath = getRedirectCsvPath("default", "sales_test", "0", "0")
+ assert(checkRedirectedCsvContentAvailableInSource(csvFilePath,
redirectCsvPath))
+ }
+
+ def getRedirectCsvPath(dbName: String, tableName: String, segment: String,
task: String) = {
+ var badRecordLocation = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+ badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName +
"/" + segment + "/" +
+ task
+ val listFiles = new File(badRecordLocation).listFiles(new FileFilter {
+ override def accept(pathname: File): Boolean = {
+ pathname.getPath.endsWith(".csv")
+ }
+ })
+ listFiles(0)
+ }
+
+ /**
+ *
+ * @param csvFilePath
+ * @param redirectCsvPath
+ */
+ def checkRedirectedCsvContentAvailableInSource(csvFilePath: String,
+ redirectCsvPath: File): Boolean = {
+ val origFileLineList = FileUtils.readLines(new File(csvFilePath))
+ val redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
+ val iterator = redirectedFileLineList.iterator()
+ while (iterator.hasNext) {
+ if (!origFileLineList.contains(iterator.next())) {
+ return false;
+ }
+ }
+ return true
+ }
+
+ def cleanBadRecordPath(dbName: String, tableName: String) = {
+ var badRecordLocation = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
+ badRecordLocation = badRecordLocation + "/" + dbName + "/" + tableName
+
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(badRecordLocation))
+ }
+
override def afterAll {
sql("drop table sales")
+ sql("drop table sales_test")
sql("drop table serializable_values")
sql("drop table serializable_values_false")
sql("drop table insufficientColumn")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
----------------------------------------------------------------------
diff --git
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
index c09d285..1e34ec8 100644
---
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
+++
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
@@ -30,6 +30,10 @@ class TestDataLoadWithColumnsMoreThanSchema extends
QueryTest with BeforeAndAfte
override def beforeAll {
sql("DROP TABLE IF EXISTS char_test")
sql("DROP TABLE IF EXISTS hive_char_test")
+ sql("DROP TABLE IF EXISTS max_columns_value_test")
+ sql("DROP TABLE IF EXISTS boundary_max_columns_test")
+ sql("DROP TABLE IF EXISTS valid_max_columns_test")
+ sql("DROP TABLE IF EXISTS max_columns_test")
sql("DROP TABLE IF EXISTS smart_500_DE")
sql("CREATE TABLE char_test (imei string,age int,task bigint,num
double,level decimal(10,3),productdate timestamp,mark int,name string)STORED BY
'org.apache.carbondata.format'")
sql("CREATE TABLE hive_char_test (imei string,age int,task bigint,num
double,level decimal(10,3),productdate timestamp,mark int,name string)row
format delimited fields terminated by ','")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 154d3ed..2c74657 100644
---
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -39,7 +39,7 @@ import
org.apache.carbondata.processing.loading.sort.SortStepRowUtil
import
org.apache.carbondata.processing.loading.steps.{DataConverterProcessorStepImpl,
DataWriterProcessorStepImpl}
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler,
CarbonFactHandlerFactory}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil,
CarbonLoaderUtil}
import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
import org.apache.carbondata.spark.util.Util
@@ -71,7 +71,7 @@ object DataLoadProcessorStepOnSpark {
val model: CarbonLoadModel =
modelBroadcast.value.getCopyWithTaskNo(index.toString)
val conf = DataLoadProcessBuilder.createConfiguration(model)
val rowParser = new RowParserImpl(conf.getDataFields, conf)
-
+ val isRawDataRequired = CarbonDataProcessorUtil.isRawDataRequired(conf)
TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable)
=>
wrapException(e, model)
}
@@ -79,8 +79,16 @@ object DataLoadProcessorStepOnSpark {
new Iterator[CarbonRow] {
override def hasNext: Boolean = rows.hasNext
+
+
override def next(): CarbonRow = {
- val row = new CarbonRow(rowParser.parseRow(rows.next()))
+ var row : CarbonRow = null
+ if(isRawDataRequired) {
+ val rawRow = rows.next()
+ row = new CarbonRow(rowParser.parseRow(rawRow), rawRow)
+ } else {
+ row = new CarbonRow(rowParser.parseRow(rows.next()))
+ }
rowCounter.add(1)
row
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index 9e30b02..78214ae 100644
---
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -62,7 +62,13 @@ object TestQueryExecutor {
property
}
}
-
+ val badStorePath = s"$integrationPath/spark-common-test/target/badrecord";
+ try {
+ FileFactory.mkdirs(badStorePath, FileFactory.getFileType(badStorePath))
+ } catch {
+ case e : Exception =>
+ throw e;
+ }
val hdfsUrl = {
val property = System.getProperty("hdfs.url")
if (property == null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 7fc8ed3..c5313cb 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -156,14 +156,12 @@ public class RowConverterImpl implements RowConverter {
@Override
public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
- //TODO: only copy if it is bad record
- CarbonRow copy = row.getCopy();
logHolder.setLogged(false);
logHolder.clear();
for (int i = 0; i < fieldConverters.length; i++) {
fieldConverters[i].convert(row, logHolder);
if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
- badRecordLogger.addBadRecordsToBuilder(copy.getData(),
logHolder.getReason());
+ badRecordLogger.addBadRecordsToBuilder(row.getRawData(),
logHolder.getReason());
if (badRecordLogger.isDataLoadFail()) {
String error = "Data load failed due to bad record: " +
logHolder.getReason();
if (!badRecordLogger.isBadRecordLoggerEnable()) {
@@ -178,6 +176,8 @@ public class RowConverterImpl implements RowConverter {
}
}
}
+ // rawData will not be required after this so reset the entry to null.
+ row.setRawData(null);
return row;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 8a295d9..d41455f 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -453,6 +453,7 @@ public class CarbonLoadModel implements Serializable {
copy.sortScope = sortScope;
copy.batchSortSizeInMb = batchSortSizeInMb;
copy.isAggLoadRequest = isAggLoadRequest;
+ copy.badRecordsLocation = badRecordsLocation;
return copy;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
index ae7ece1..4078a13 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.processing.loading.DataField;
import org.apache.carbondata.processing.loading.parser.RowParser;
import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
/**
* It reads data from record reader and sends data to next step.
@@ -51,7 +52,7 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
* executor service to execute the query
*/
public ExecutorService executorService;
-
+ boolean isRawDataRequired = false;
public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
CarbonIterator<Object[]>[] inputIterators) {
super(configuration, null);
@@ -68,6 +69,8 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
executorService = Executors.newCachedThreadPool(new CarbonThreadFactory(
"InputProcessorPool:" +
configuration.getTableIdentifier().getCarbonTableIdentifier()
.getTableName()));
+ // if logger is enabled then raw data will be required.
+ this.isRawDataRequired =
CarbonDataProcessorUtil.isRawDataRequired(configuration);
}
@Override public Iterator<CarbonRowBatch>[] execute() {
@@ -77,7 +80,7 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
for (int i = 0; i < outIterators.length; i++) {
outIterators[i] =
new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
- configuration.isPreFetch(), executorService, rowCounter);
+ configuration.isPreFetch(), executorService, rowCounter,
isRawDataRequired);
}
return outIterators;
}
@@ -150,9 +153,11 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
private AtomicLong rowCounter;
+ private boolean isRawDataRequired = false;
+
public InputProcessorIterator(List<CarbonIterator<Object[]>>
inputIterators,
RowParser rowParser, int batchSize, boolean preFetch, ExecutorService
executorService,
- AtomicLong rowCounter) {
+ AtomicLong rowCounter, boolean isRawDataRequired) {
this.inputIterators = inputIterators;
this.batchSize = batchSize;
this.rowParser = rowParser;
@@ -164,6 +169,7 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
this.preFetch = preFetch;
this.nextBatch = false;
this.firstTime = true;
+ this.isRawDataRequired = isRawDataRequired;
}
@Override
@@ -235,9 +241,17 @@ public class InputProcessorStepImpl extends
AbstractDataLoadProcessorStep {
// Create batch and fill it.
CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
int count = 0;
- while (internalHasNext() && count < batchSize) {
- carbonRowBatch.addRow(new
CarbonRow(rowParser.parseRow(currentIterator.next())));
- count++;
+ if (isRawDataRequired) {
+ while (internalHasNext() && count < batchSize) {
+ Object[] rawRow = currentIterator.next();
+ carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(rawRow),
rawRow));
+ count++;
+ }
+ } else {
+ while (internalHasNext() && count < batchSize) {
+ carbonRowBatch.addRow(new
CarbonRow(rowParser.parseRow(currentIterator.next())));
+ count++;
+ }
}
rowCounter.getAndAdd(carbonRowBatch.getSize());
return carbonRowBatch;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e40b34b0/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index a18147a..beb1ad1 100644
---
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.carbondata.common.constants.LoggerAction;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -56,6 +57,7 @@ import
org.apache.carbondata.processing.datatypes.PrimitiveDataType;
import org.apache.carbondata.processing.datatypes.StructDataType;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.DataField;
+import
org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
@@ -619,4 +621,30 @@ public final class CarbonDataProcessorUtil {
}
return errorMessage;
}
+ /**
+ * The method returns true if either logger is enabled or action is redirect
+ * @param configuration
+ * @return
+ */
+ public static boolean isRawDataRequired(CarbonDataLoadConfiguration
configuration) {
+ boolean isRawDataRequired = Boolean.parseBoolean(
+
configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+ .toString());
+ // if logger is disabled then check if action is redirect then raw data
will be required.
+ if (!isRawDataRequired) {
+ Object bad_records_action =
+
configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+ if (null != bad_records_action) {
+ LoggerAction loggerAction = null;
+ try {
+ loggerAction =
LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+ } catch (IllegalArgumentException e) {
+ loggerAction = LoggerAction.FORCE;
+ }
+ isRawDataRequired = loggerAction == LoggerAction.REDIRECT;
+ }
+ }
+ return isRawDataRequired;
+ }
+
}
\ No newline at end of file