akashrn5 commented on a change in pull request #3865:
URL: https://github.com/apache/carbondata/pull/3865#discussion_r469029220



##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) 
STORED AS carbondata")
     sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 
int) STORED AS carbondata")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
-    intercept[Exception] {
-      sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
-    }
+    checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3)))
+    // String whilch length greater than 32000 will be considered as bad 
record and will be inserted as null in table

Review comment:
       ```suggestion
       // String which length greater than 32000 will be considered as bad 
record and will be inserted as null in table
   ```

##########
File path: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -2456,4 +2456,10 @@ private CarbonCommonConstants() {
    * property which defines the insert stage flow
    */
   public static final String IS_INSERT_STAGE = "is_insert_stage";
+
+  public static final String STRING_LENGTH_EXCEEDED_MESSAGE =
+      "Record %s of column %s exceeded " + MAX_CHARS_PER_COLUMN_DEFAULT +
+          " bytes. Please consider long string data type.";

Review comment:
       ```suggestion
             " characters. Please consider long string data type.";
   ```

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -20,17 +20,16 @@ package 
org.apache.carbondata.integration.spark.testsuite.dataload
 import java.math.BigDecimal
 
 import scala.collection.mutable.ArrayBuffer
-

Review comment:
       Remove this change if it is not required.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -20,17 +20,16 @@ package 
org.apache.carbondata.integration.spark.testsuite.dataload
 import java.math.BigDecimal
 
 import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterEach
-

Review comment:
       same as above

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
##########
@@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream 
dataOutputStream,
     }
   }
 
+  private byte[] getNullForBytes(byte[] value) {

Review comment:
       I think this is not required; you can reuse `updateNullValue` instead. Please check.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
##########
@@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream 
dataOutputStream,
     }
   }
 
+  private byte[] getNullForBytes(byte[] value) {
+    String badRecordAction = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
+    if 
(badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION))
 {
+      if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
+    }
+    return value;
+  }
+
   private void checkAndWriteByteArray(Object input, DataOutputStream 
dataOutputStream,
       BadRecordLogHolder logHolder, Boolean isWithoutConverter, String 
parsedValue, byte[] value)
       throws IOException {
     if (isWithoutConverter) {
       if (this.carbonDimension.getDataType() == DataTypes.STRING && input 
instanceof String
           && ((String)input).length() > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-        throw new CarbonDataLoadingException("Dataload failed, String size 
cannot exceed "
-            + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        
logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,
+            input.toString(), this.carbonDimension.getColName()));
+        value = getNullForBytes(value);
       }
       updateValueToByteStream(dataOutputStream, value);
     } else {
       if (this.carbonDimension.getDataType() == DataTypes.STRING
           && value.length > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-        throw new CarbonDataLoadingException("Dataload failed, String size 
cannot exceed "
-            + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        
logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,

Review comment:
       I think that here, after setting the bad record reason, you should still write the null 
value to the stream.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) 
STORED AS carbondata")
     sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 
int) STORED AS carbondata")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
-    intercept[Exception] {
-      sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
-    }
+    checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3)))
+    // String whilch length greater than 32000 will be considered as bad 
record and will be inserted as null in table
+    sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(3)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3)))
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata_dup 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    // Update strings of length greater than 32000 will invalidate the whole 
row.
+    sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select 
concat(load32000chardata.dim2,'aaaa') " +
+      "from load32000chardata where load32000chardata.mes1=3) where 
load32000chardata_dup.mes1=3").show()
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3), Row("32000", null, 3)))
+
+    val longChar: String = RandomStringUtils.randomAlphabetic(33000)
+    // BAD_RECORD_ACTION = "REDIRECT"
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, 
"REDIRECT");
+    sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)")
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3), Row("32000", null, 3)))
+
+    // BAD_RECORD_ACTION = "IGNORE"
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "IGNORE");
+    sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)")
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3), Row("32000", null, 3)))
+
+    // BAD_RECORD_ACTION = "FAIL"
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL");
     intercept[Exception] {
-      sql("update load32000chardata_dup 
set(load32000chardata_dup.dim2)=(select concat(load32000chardata.dim2,'aaaa') 
from load32000chardata)").show()
+      sql(s"insert into load32000chardata_dup values('32000', '$longChar', 3)")
     }
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3), Row("32000", null, 3)))
+    CarbonProperties.getInstance()

Review comment:
       You should not change the property in the middle of the test; it is better to set it in 
beforeAll and reset it in afterAll. Please follow this.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
##########
@@ -330,21 +331,35 @@ public void writeByteArray(Object input, DataOutputStream 
dataOutputStream,
     }
   }
 
+  private byte[] getNullForBytes(byte[] value) {
+    String badRecordAction = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
+    if 
(badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION))
 {
+      if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+        return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+      } else {
+        return CarbonCommonConstants.EMPTY_BYTE_ARRAY;
+      }
+    }
+    return value;
+  }
+
   private void checkAndWriteByteArray(Object input, DataOutputStream 
dataOutputStream,
       BadRecordLogHolder logHolder, Boolean isWithoutConverter, String 
parsedValue, byte[] value)
       throws IOException {
     if (isWithoutConverter) {
       if (this.carbonDimension.getDataType() == DataTypes.STRING && input 
instanceof String
           && ((String)input).length() > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-        throw new CarbonDataLoadingException("Dataload failed, String size 
cannot exceed "
-            + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        
logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,
+            input.toString(), this.carbonDimension.getColName()));
+        value = getNullForBytes(value);

Review comment:
       same as above

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) 
STORED AS carbondata")
     sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 
int) STORED AS carbondata")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
-    intercept[Exception] {
-      sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()

Review comment:
       Here I cannot see any bad record action being set, so by default it should be FAIL. 
Why is it FORCE? Please check, and unset it if some previous test case failed to do so.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
##########
@@ -154,13 +153,44 @@ class TestLoadDataGeneral extends QueryTest with 
BeforeAndAfterEach {
     sql("CREATE TABLE load32000chardata(dim1 String, dim2 String, mes1 int) 
STORED AS carbondata")
     sql("CREATE TABLE load32000chardata_dup(dim1 String, dim2 String, mes1 
int) STORED AS carbondata")
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
-    intercept[Exception] {
-      sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
-    }
+    checkAnswer(sql("select count(*) from load32000chardata"), Seq(Row(3)))
+    // String whilch length greater than 32000 will be considered as bad 
record and will be inserted as null in table
+    sql("insert into load32000chardata_dup select 
dim1,concat(load32000chardata.dim2,'aaaa'),mes1 from load32000chardata").show()
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(3)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3)))
     sql(s"LOAD DATA LOCAL INPATH '$testdata' into table load32000chardata_dup 
OPTIONS('FILEHEADER'='dim1,dim2,mes1')")
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    // Update strings of length greater than 32000 will invalidate the whole 
row.
+    sql("update load32000chardata_dup set(load32000chardata_dup.dim2)=(select 
concat(load32000chardata.dim2,'aaaa') " +
+      "from load32000chardata where load32000chardata.mes1=3) where 
load32000chardata_dup.mes1=3").show()
+    checkAnswer(sql("select count(*) from load32000chardata_dup"), Seq(Row(6)))
+    checkAnswer(sql("select * from load32000chardata_dup where mes1=3"), 
Seq(Row("32000", null, 3), Row("32000", null, 3)))
+
+    val longChar: String = RandomStringUtils.randomAlphabetic(33000)
+    // BAD_RECORD_ACTION = "REDIRECT"
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, 
"REDIRECT");

Review comment:
       For REDIRECT, please also verify the redirected record's value in the assertion.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
##########
@@ -301,8 +301,9 @@ public void writeByteArray(Object input, DataOutputStream 
dataOutputStream,
           }
           if (this.carbonDimension.getDataType() == DataTypes.STRING
               && value.length > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size 
cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " 
bytes");
+            
logHolder.setReason(String.format(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE,

Review comment:
       I think you can reuse the `updateNullValue` method here.

##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
##########
@@ -194,11 +194,10 @@ class VarcharDataTypesBasicTestCase extends QueryTest 
with BeforeAndAfterEach wi
     // query should pass
     checkAnswer(sql("select * from testlongstring"),
       Seq(Row(1, "ab", "cool"), Row(1, "ab1", longChar), Row(1, "abc", 
longChar)))
-    // insert long string should fail as unset is done

Review comment:
       This change is not required; please find which test sets the bad record action to FORCE 
and forgets to unset it.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
##########
@@ -82,21 +83,25 @@ public Object convert(Object value, BadRecordLogHolder 
logHolder)
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, 
dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && parsedValue.length > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException(String.format(
-                "Dataload failed, String size cannot exceed %d bytes,"
-                    + " please consider long string data type",
-                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
+            
logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE);
+            String badRecordAction = CarbonProperties.getInstance()

Review comment:
       Here there is no need to check the bad records action; just return the `null` value — 
the action itself should be handled in `RowConverterImpl`.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
##########
@@ -82,21 +83,25 @@ public Object convert(Object value, BadRecordLogHolder 
logHolder)
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, 
dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && parsedValue.length > 
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException(String.format(
-                "Dataload failed, String size cannot exceed %d bytes,"
-                    + " please consider long string data type",
-                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
+            
logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE);
+            String badRecordAction = CarbonProperties.getInstance()
+                .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION);
+            if 
(badRecordAction.equalsIgnoreCase(CarbonCommonConstants.FORCE_BAD_RECORD_ACTION))
 {
+              parsedValue = getNullValue();
+            }
           }
           return parsedValue;
         } else {
           Object parsedValue = DataTypeUtil
               .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, 
dateFormat);
           if (dataType == DataTypes.STRING && parsedValue.toString().length()
               > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException(String.format(
-                "Dataload failed, String size cannot exceed %d bytes,"
-                    + " please consider long string data type",
-                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
+            
logHolder.setReason(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE);
+            if (CarbonProperties.getInstance()
+                .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION)

Review comment:
       Same as above; also check whether this is handled for all the bad record actions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to