This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 22342f8  [CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure
22342f8 is described below

commit 22342f847d7db515e5f8c17525522085f49bd2a5
Author: Mahesh Raju Somalaraju <[email protected]>
AuthorDate: Thu Sep 16 22:41:54 2021 +0530

    [CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure
    
    Why is this PR needed?
    Alter add complex columns with global sort compaction is failing due to two issues:
    
    ArrayIndexOutOfBounds exception: the global sort compaction flow currently creates the
    default complex delimiter list with a size of 3, but the map type needs one extra
    complex delimiter to handle its key-value pairs.
    Bad record handling: when complex columns are added after data has already been
    inserted, those columns contain null values for the previously loaded segments. During
    compaction these null values are treated as bad records and the compaction fails.
    
    What changes were proposed in this PR?
    In the global sort compaction flow, create the default complex delimiter list with a
    size of 4, as is already done in the load flow.
    Skip bad record handling in the compaction case. There is no need to check for bad
    records during compaction because they were already validated while loading; compaction
    only re-inserts data from previously loaded segments.
    
    This closes #4218
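    
    A minimal SQL sequence illustrating the fixed scenario, adapted from the new test in
    TestAlterTableAddColumns.scala (table and column names here are illustrative only):
    
        CREATE TABLE alter_global1(intField INT) STORED AS carbondata
          TBLPROPERTIES('sort_columns'='intField', 'sort_scope'='global_sort');
        INSERT INTO alter_global1 VALUES(1);
        -- add a complex column after data has already been loaded;
        -- earlier segments now hold null for the new column
        ALTER TABLE alter_global1 ADD COLUMNS(str1 array<int>);
        INSERT INTO alter_global1 VALUES(2, array(1));
        -- global sort compaction previously failed here with the issues described above
        ALTER TABLE alter_global1 COMPACT 'minor';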
---
 .../hadoop/api/CarbonTableOutputFormat.java        |  9 ++-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 12 +++-
 .../spark/load/DataLoadProcessorStepOnSpark.scala  |  5 +-
 .../spark/rdd/CarbonTableCompactor.scala           |  3 +-
 .../alterTable/TestAlterTableAddColumns.scala      | 80 ++++++++++++++++++++++
 .../processing/loading/BadRecordsLogger.java       |  9 ++-
 .../loading/BadRecordsLoggerProvider.java          | 12 +++-
 .../loading/converter/impl/RowConverterImpl.java   | 14 ++--
 8 files changed, 128 insertions(+), 16 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index ed447a5..12f68d8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -365,11 +365,16 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     if (null == complexDelim) {
      complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + ","
           + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + ","
-          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value();
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value() + ","
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value();
     }
     String[] split = complexDelim.split(",");
     model.setComplexDelimiter(split[0]);
-    if (split.length > 2) {
+    if (split.length > 3) {
+      model.setComplexDelimiter(split[1]);
+      model.setComplexDelimiter(split[2]);
+      model.setComplexDelimiter(split[3]);
+    } else if (split.length > 2) {
       model.setComplexDelimiter(split[1]);
       model.setComplexDelimiter(split[2]);
     } else if (split.length > 1) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 83d1890..fe1de5c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -73,7 +73,8 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration,
-      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]],
+      isCompactionFlow: Boolean = false)
   : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
@@ -121,8 +122,13 @@ object DataLoadProcessBuilderOnSpark {
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
-      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
-        convertStepRowCounter)
+      DataLoadProcessorStepOnSpark.convertFunc(rows,
+        index,
+        modelBroadcast,
+        partialSuccessAccum,
+        convertStepRowCounter,
+        false,
+        isCompactionFlow)
     }.filter(_ != null) // Filter the bad record
 
     // 3. Sort
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 1694579..8f59200 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -221,10 +221,11 @@ object DataLoadProcessorStepOnSpark {
       modelBroadcast: Broadcast[CarbonLoadModel],
       partialSuccessAccum: LongAccumulator,
       rowCounter: LongAccumulator,
-      keepActualData: Boolean = false): Iterator[CarbonRow] = {
+      keepActualData: Boolean = false,
+      isCompactionFlow: Boolean = false): Iterator[CarbonRow] = {
    val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
-    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf, isCompactionFlow)
     if (keepActualData) {
       conf.getDataFields.foreach(_.setUseActualData(keepActualData))
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index bf3eed3..d39143a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -507,7 +507,8 @@ class CarbonTableCompactor(
         Option(dataFrame),
         outputModel,
         SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
-        segmentMetaDataAccumulator)
+        segmentMetaDataAccumulator,
+        isCompactionFlow = true)
         .map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index bd4a112..c74dd13 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -594,4 +594,84 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
   }
+
+  test("test the complex columns with global sort compaction") {
+    sql("DROP TABLE IF EXISTS alter_global1")
+    sql("CREATE TABLE alter_global1(intField INT) STORED AS carbondata " +
+        "TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
+    sql("insert into alter_global1 values(1)")
+    sql("insert into alter_global1 values(2)")
+    sql("insert into alter_global1 values(3)")
+    sql( "ALTER TABLE alter_global1 ADD COLUMNS(str1 array<int>)")
+    sql("insert into alter_global1 values(4, array(1))")
+    sql("insert into alter_global1 values(5, null)")
+    sql( "ALTER TABLE alter_global1 ADD COLUMNS(str2 array<string>)")
+    sql("insert into alter_global1 values(6, array(1), array('', 'hi'))")
+    sql("insert into alter_global1 values(7, array(1), array('bye', 'hi'))")
+    sql("ALTER TABLE alter_global1 ADD COLUMNS(str3 array<date>, str4 
struct<s1:timestamp>)")
+    sql(
+      "insert into alter_global1 values(8, array(1), array('bye', 'hi'), 
array('2017-02-01'," +
+      "'2018-09-11'),named_struct('s1', '2017-02-01 00:01:00'))")
+    val expected = Seq(Row(1, null, null, null, null),
+      Row(2, null, null, null, null),
+      Row(3, null, null, null, null),
+      Row(4, make(Array(1)), null, null, null),
+      Row(5, null, null, null, null),
+      Row(6, make(Array(1)), make(Array("", "hi")), null, null),
+      Row(7, make(Array(1)), make(Array("bye", "hi")), null, null),
+      Row(8, make(Array(1)), make(Array("bye", "hi")),
+        make(Array(Date.valueOf("2017-02-01"), Date.valueOf("2018-09-11"))),
+        Row(Timestamp.valueOf("2017-02-01 00:01:00"))))
+    checkAnswer(sql("select * from alter_global1"), expected)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global1")
+    assert(addedColumns.size == 4)
+    sql("alter table alter_global1 compact 'minor'")
+    checkAnswer(sql("select * from alter_global1"), expected)
+    sql("DROP TABLE IF EXISTS alter_global1")
+  }
+
+  test("test the multi-level complex columns with global sort compaction") {
+    sql("DROP TABLE IF EXISTS alter_global2")
+    sql("CREATE TABLE alter_global2(intField INT) STORED AS carbondata " +
+        "TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
+    sql("insert into alter_global2 values(1)")
+    // multi-level nested array
+    sql(
+      "ALTER TABLE alter_global2 ADD COLUMNS(arr1 array<array<int>>, arr2 
array<struct<a1:string," +
+      "map1:Map<string, string>>>) ")
+    sql(
+      "insert into alter_global2 values(1, array(array(1,2)), 
array(named_struct('a1','st'," +
+      "'map1', map('a','b'))))")
+    // multi-level nested struct
+    sql("ALTER TABLE alter_global2 ADD COLUMNS(struct1 struct<s1:string, arr: 
array<int>>," +
+        " struct2 struct<num:double,contact:map<string,array<int>>>) ")
+    sql("insert into alter_global2 values(1, " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', 
map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), 
named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))))")
+    // multi-level nested map
+    sql(
+      "ALTER TABLE alter_global2 ADD COLUMNS(map1 map<string,array<string>>, 
map2 map<string," +
+      "struct<d:int, s:struct<im:string>>>)")
+    sql("insert into alter_global2 values(1,  " +
+    "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " 
+
+    "named_struct('s1','hi','arr',array(1,2)), 
named_struct('num',2.3,'contact',map('ph'," +
+    "array(1,2))),map('a',array('hi')), 
map('a',named_struct('d',23,'s',named_struct('im'," +
+    "'sh'))))")
+    val expected = Seq(Row(1, null, null, null, null, null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        null, null, null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))), null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))),
+        Map("a" -> make(Array("hi"))), Map("a" -> Row(23, Row("sh"))))
+    )
+    checkAnswer(sql("select * from alter_global2"), expected)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global2")
+    assert(addedColumns.size == 6)
+    sql("alter table alter_global2 compact 'minor'")
+    checkAnswer(sql("select * from alter_global2"), expected)
+    sql("DROP TABLE IF EXISTS alter_global2")
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
index 6a939bf..1e54e19 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -88,11 +88,13 @@ public class BadRecordsLogger {
 
   private boolean isDataLoadFail;
 
+  private boolean isCompactionFlow;
+
   // private final Object syncObject =new Object();
 
   public BadRecordsLogger(String key, String fileName, String storePath,
       boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
-      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
+      boolean badRecordConvertNullDisable, boolean isDataLoadFail, boolean isCompactionFlow) {
     // Initially no bad rec
     taskKey = key;
     this.fileName = fileName;
@@ -101,6 +103,11 @@ public class BadRecordsLogger {
     this.badRecordLoggerEnable = badRecordLoggerEnable;
     this.badRecordConvertNullDisable = badRecordConvertNullDisable;
     this.isDataLoadFail = isDataLoadFail;
+    this.isCompactionFlow = isCompactionFlow;
+  }
+
+  public boolean isCompFlow() {
+    return isCompactionFlow;
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
index 25ae1c1..dbc4ff9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
@@ -33,6 +33,16 @@ public class BadRecordsLoggerProvider {
    * @return
    */
  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
+    return createBadRecordLogger(configuration, false);
+  }
+
+  /**
+   * method returns the BadRecordsLogger instance
+   * @param configuration
+   * @return
+   */
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration,
+      Boolean isCompactionFlow) {
     boolean badRecordsLogRedirect = false;
     boolean badRecordConvertNullDisable = false;
     boolean isDataLoadFail = false;
@@ -72,7 +82,7 @@ public class BadRecordsLoggerProvider {
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(),
         getBadLogStoreLocation(configuration), badRecordsLogRedirect,
-        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail, isCompactionFlow);
   }
 
  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 28bfa91..b5dcb9e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -134,13 +134,15 @@ public class RowConverterImpl implements RowConverter {
        if (reason.equalsIgnoreCase(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)) {
          reason = String.format(reason, this.fields[i].getColumn().getColName());
         }
-        badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
-        if (badRecordLogger.isDataLoadFail()) {
-          String error = "Data load failed due to bad record: " + reason;
-          if (!badRecordLogger.isBadRecordLoggerEnable()) {
-            error += "Please enable bad record logger to know the detail 
reason.";
+        if (!badRecordLogger.isCompFlow()) {
+          badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
+          if (badRecordLogger.isDataLoadFail()) {
+            String error = "Data load failed due to bad record: " + reason;
+            if (!badRecordLogger.isBadRecordLoggerEnable()) {
+              error += "Please enable bad record logger to know the detail 
reason.";
+            }
+            throw new BadRecordFoundException(error);
           }
-          throw new BadRecordFoundException(error);
         }
         logHolder.clear();
         logHolder.setLogged(true);
