This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b121ac6 [CARBONDATA-3980] Load fails with aborted exception when Bad
records action is unspecified
b121ac6 is described below
commit b121ac6c18d03d30a22974bad7504870dd80b41a
Author: ShreelekhyaG <[email protected]>
AuthorDate: Thu Sep 10 16:30:05 2020 +0530
[CARBONDATA-3980] Load fails with aborted exception when Bad records action
is unspecified
Why is this PR needed?
Load fails with aborted exception when Bad records action is unspecified.
When the partition column is loaded with a bad record value, load fails
with a 'Job aborted' message in the cluster. However, in the complete stack trace we can
see the actual error message. (Like, 'Data load failed due to bad record: The
value with column name projectjoindate and column data type TIMESTAMP is not a
valid TIMESTAMP type')
What changes were proposed in this PR?
Fix bad record error message for the partition column. Added the error
message to the operationContext map and, if it is not null, throwing an exception with the
errorMessage from CarbonLoadDataCommand.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #3919
---
.../command/management/CarbonLoadDataCommand.scala | 7 ++++++-
.../command/management/CommonLoadUtils.scala | 1 +
.../StandardPartitionBadRecordLoggerTest.scala | 19 +++++++++++++++++++
3 files changed, 26 insertions(+), 1 deletion(-)
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b17969b..d5c3c84 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -191,7 +191,12 @@ case class CarbonLoadDataCommand(databaseNameOp:
Option[String],
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
- throw ex
+ val errorMessage = operationContext.getProperty("Error message")
+ if (errorMessage != null) {
+ throw new RuntimeException(errorMessage.toString, ex.getCause)
+ } else {
+ throw ex
+ }
}
Seq.empty
}
diff --git
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f574e12..5c46127 100644
---
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1064,6 +1064,7 @@ object CommonLoadUtils {
if (loadParams.updateModel.isDefined) {
CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get,
executorMessage)
}
+ loadParams.operationContext.setProperty("Error message", errorMessage)
LOGGER.info(errorMessage)
LOGGER.error(ex)
throw ex
diff --git
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index c19c51e..488291d 100644
---
a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++
b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -219,6 +219,25 @@ class StandardPartitionBadRecordLoggerTest extends
QueryTest with BeforeAndAfter
}
}
+ test("test load with partition column having bad record value") {
+ sql("drop table if exists dataloadOptionTests")
+ sql("CREATE TABLE dataloadOptionTests (empno int, empname String,
designation String, " +
+ "workgroupcategory int, workgroupcategoryname String, deptno int,
projectjoindate " +
+ "Timestamp, projectenddate Date,attendance int,utilization int,salary
int) PARTITIONED BY " +
+ "(deptname String,doj Timestamp,projectcode int) STORED AS carbondata ")
+ val csvFilePath = s"$resourcesPath/data.csv"
+ val ex = intercept[RuntimeException] {
+ sql("LOAD DATA local inpath '" + csvFilePath +
+ "' INTO TABLE dataloadOptionTests OPTIONS
('bad_records_action'='FAIL', 'DELIMITER'= '," +
+ "', 'QUOTECHAR'= '\"',
'dateformat'='DD-MM-YYYY','timestampformat'='DD-MM-YYYY')")
+ }
+ assert(ex.getMessage.contains(
+ "DataLoad failure: Data load failed due to bad record: The value with
column name " +
+ "projectjoindate and column data type TIMESTAMP is not a valid TIMESTAMP
type.Please " +
+ "enable bad record logger to know the detail reason."))
+ sql("drop table dataloadOptionTests")
+ }
+
def drop(): Unit = {
sql("drop table IF EXISTS sales")
sql("drop table IF EXISTS serializable_values")