This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b41b971  [CARBONDATA-3916] Support array complex type with SI
b41b971 is described below

commit b41b97155cc3c53466f420480ee5cb4ada9bff31
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu May 21 15:06:52 2020 +0530

    [CARBONDATA-3916] Support array complex type with SI
    
    Why is this PR needed?
    Currently, SI supports only dimension columns as index columns. This PR 
will support creating index with array data type.
    
    What changes were proposed in this PR?
    Support SI with array type
    Store ARRAY type as STRING type internally, by storing data as flattened.
    Currently, only one level of complex type is supported. Nested types will 
be supported in future PR
    Support query on complex type, by rewriting the filter expression
    
    This closes #3778
---
 .../carbondata/core/datastore/DataRefNode.java     |   3 +
 .../blockletindex/BlockletDataRefNode.java         |   5 +-
 .../core/scan/complextypes/ArrayQueryType.java     |  24 ++-
 .../core/scan/complextypes/MapQueryType.java       |   7 +-
 .../core/scan/complextypes/PrimitiveQueryType.java |  17 +-
 .../core/scan/complextypes/StructQueryType.java    |  15 ++
 .../scan/executor/impl/AbstractQueryExecutor.java  |   6 +-
 .../core/scan/filter/GenericQueryType.java         |   4 +
 .../result/iterator/DetailQueryResultIterator.java |  11 +
 .../apache/carbondata/core/util/DataTypeUtil.java  |  13 +-
 docs/index/secondary-index-guide.md                |   8 +-
 .../TestSIWithComplexArrayType.scala               | 228 +++++++++++++++++++++
 .../query/SecondaryIndexQueryResultProcessor.java  | 123 +++++++++--
 .../secondaryindex/command/SICreationCommand.scala |  51 ++++-
 .../optimizer/CarbonSecondaryIndexOptimizer.scala  |  18 +-
 .../secondaryindex/util/SecondaryIndexUtil.scala   |   2 +-
 .../spark/src/test/resources/secindex/array.csv    |   4 +
 17 files changed, 498 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
index c38e56a..97efc94 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DataRefNode.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.datastore;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.util.BitSetGroup;
@@ -127,4 +128,6 @@ public interface DataRefNode {
    * @return min max flag for each column
    */
   boolean[] minMaxFlagArray();
+
+  TableBlockInfo getTableBlockInfo();
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
index 53e01ff..b942c50 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataRefNode.java
@@ -218,7 +218,8 @@ public class BlockletDataRefNode implements DataRefNode {
     return blockInfos.size();
   }
 
-  public List<TableBlockInfo> getBlockInfos() {
-    return blockInfos;
+  @Override
+  public TableBlockInfo getTableBlockInfo() {
+    return blockInfos.get(index);
   }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 8a41384..0e1bd7f 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -97,21 +97,39 @@ public class ArrayQueryType extends ComplexQueryType 
implements GenericQueryType
 
   @Override
   public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
-    Object[] data = fillData(dataBuffer);
+    Object[] data = fillData(dataBuffer, false);
     if (data == null) {
       return null;
     }
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
   }
 
-  protected Object[] fillData(ByteBuffer dataBuffer) {
+  @Override
+  public Object[] getObjectArrayDataBasedOnDataType(ByteBuffer dataBuffer) {
+    return fillData(dataBuffer, true);
+  }
+
+  @Override
+  public Object getObjectDataBasedOnDataType(ByteBuffer dataBuffer) {
+    Object[] data = fillData(dataBuffer, true);
+    if (data == null) {
+      return null;
+    }
+    return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
+  }
+
+  protected Object[] fillData(ByteBuffer dataBuffer, boolean getBytesData) {
     int dataLength = dataBuffer.getInt();
     if (dataLength == -1) {
       return null;
     }
     Object[] data = new Object[dataLength];
     for (int i = 0; i < dataLength; i++) {
-      data[i] = children.getDataBasedOnDataType(dataBuffer);
+      if (getBytesData) {
+        data[i] = children.getObjectDataBasedOnDataType(dataBuffer);
+      } else {
+        data[i] = children.getDataBasedOnDataType(dataBuffer);
+      }
     }
     return data;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/MapQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/MapQueryType.java
index 212b102..4d80723 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/MapQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/MapQueryType.java
@@ -41,7 +41,7 @@ public class MapQueryType extends ArrayQueryType {
    */
   @Override
   public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
-    Object[] data = fillData(dataBuffer);
+    Object[] data = fillData(dataBuffer, false);
     if (data == null) {
       return null;
     }
@@ -55,4 +55,9 @@ public class MapQueryType extends ArrayQueryType {
     return 
DataTypeUtil.getDataTypeConverter().wrapWithArrayBasedMapData(keyArray, 
valueArray);
   }
 
+  @Override
+  public Object[] getObjectArrayDataBasedOnDataType(ByteBuffer dataBuffer) {
+    return fillData(dataBuffer, true);
+  }
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 5657418..2328c75 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -112,6 +112,16 @@ public class PrimitiveQueryType extends ComplexQueryType 
implements GenericQuery
   }
 
   @Override
+  public Object[] getObjectArrayDataBasedOnDataType(ByteBuffer dataBuffer) {
+    return new Object[0];
+  }
+
+  @Override
+  public Object getObjectDataBasedOnDataType(ByteBuffer dataBuffer) {
+    return getDataObject(dataBuffer, -1, true);
+  }
+
+  @Override
   public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension 
parent,
       CarbonDimension child) {
     Object actualData;
@@ -134,6 +144,10 @@ public class PrimitiveQueryType extends ComplexQueryType 
implements GenericQuery
   }
 
   private Object getDataObject(ByteBuffer dataBuffer, int size) {
+    return getDataObject(dataBuffer, size, false);
+  }
+
+  private Object getDataObject(ByteBuffer dataBuffer, int size, boolean 
getBytesData) {
     Object actualData;
     if (isDirectDictionary) {
       // Direct Dictionary Column, only for DATE type
@@ -160,7 +174,8 @@ public class PrimitiveQueryType extends ComplexQueryType 
implements GenericQuery
               ByteUtil.toXorInt(value, 0, 
CarbonCommonConstants.INT_SIZE_IN_BYTE));
         }
       } else {
-        actualData = 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType);
+        actualData = DataTypeUtil
+            .getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType, 
true, getBytesData);
       }
     }
     return actualData;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 5f0b8ce..8ee0053 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -124,6 +124,21 @@ public class StructQueryType extends ComplexQueryType 
implements GenericQueryTyp
   }
 
   @Override
+  public Object[] getObjectArrayDataBasedOnDataType(ByteBuffer dataBuffer) {
+    int childLength = dataBuffer.getShort();
+    Object[] fields = new Object[childLength];
+    for (int i = 0; i < childLength; i++) {
+      fields[i] =  children.get(i).getObjectDataBasedOnDataType(dataBuffer);
+    }
+    return fields;
+  }
+
+  @Override
+  public Object getObjectDataBasedOnDataType(ByteBuffer dataBuffer) {
+    return getDataBasedOnDataType(dataBuffer);
+  }
+
+  @Override
   public Object getDataBasedOnColumn(ByteBuffer dataBuffer, CarbonDimension 
parent,
       CarbonDimension child) {
     int childLength;
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 549f85c..978e8e4 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -396,9 +396,9 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
               queryModel,
               abstractIndex,
               dataRefNode.numberOfNodes(),
-              dataRefNode.getBlockInfos().get(0).getFilePath(),
-              dataRefNode.getBlockInfos().get(0).getDeletedDeltaFilePath(),
-              dataRefNode.getBlockInfos().get(0).getSegment());
+              dataRefNode.getTableBlockInfo().getFilePath(),
+              dataRefNode.getTableBlockInfo().getDeletedDeltaFilePath(),
+              dataRefNode.getTableBlockInfo().getSegment());
       if (null == dimensionReusableDataBuffers || null == 
measureReusableDataBuffers) {
         dimensionReusableDataBuffers = 
blockExecutionInfoForBlock.getDimensionReusableDataBuffer();
         measureReusableDataBuffers = 
blockExecutionInfoForBlock.getMeasureReusableDataBuffer();
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index c5d9739..8dd33eb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -54,4 +54,8 @@ public interface GenericQueryType {
   Object getDataBasedOnColumnList(Map<CarbonDimension, ByteBuffer> childBuffer,
       CarbonDimension presentColumn);
 
+  Object[] getObjectArrayDataBasedOnDataType(ByteBuffer dataBuffer);
+
+  Object getObjectDataBasedOnDataType(ByteBuffer dataBuffer);
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
index ad530d3..34ec7b9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/result/iterator/DetailQueryResultIterator.java
@@ -33,9 +33,16 @@ public class DetailQueryResultIterator extends 
AbstractDetailQueryResultIterator
 
   private final Object lock = new Object();
 
+  private final BlockExecutionInfo blockExecutionInfo = new 
BlockExecutionInfo();
+
   public DetailQueryResultIterator(List<BlockExecutionInfo> infos, QueryModel 
queryModel,
       ExecutorService execService) {
     super(infos, queryModel, execService);
+    if (infos.size() > 0) {
+      blockExecutionInfo
+          
.setComplexColumnParentBlockIndexes(infos.get(0).getComplexColumnParentBlockIndexes());
+      
blockExecutionInfo.setComplexDimensionInfoMap(infos.get(0).getComplexDimensionInfoMap());
+    }
   }
 
   @Override
@@ -53,4 +60,8 @@ public class DetailQueryResultIterator extends 
AbstractDetailQueryResultIterator
     }
     return rowBatch;
   }
+
+  public BlockExecutionInfo getBlockExecutionInfo() {
+    return blockExecutionInfo;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java 
b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index fca2639..19139e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -568,6 +568,12 @@ public final class DataTypeUtil {
         && !DataTypes.isDecimal(dataType);
   }
 
+  public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] 
dataInBytes,
+      DataType actualDataType, boolean isTimeStampConversion) {
+    return getDataBasedOnDataTypeForNoDictionaryColumn(dataInBytes, 
actualDataType,
+        isTimeStampConversion, false);
+  }
+
   /**
    * Wrapper for actual getDataBasedOnDataTypeForNoDictionaryColumn.
    *
@@ -577,7 +583,7 @@ public final class DataTypeUtil {
    */
   public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] 
dataInBytes,
       DataType actualDataType) {
-    return getDataBasedOnDataTypeForNoDictionaryColumn(dataInBytes, 
actualDataType, true);
+    return getDataBasedOnDataTypeForNoDictionaryColumn(dataInBytes, 
actualDataType, true, false);
   }
 
   /**
@@ -590,7 +596,7 @@ public final class DataTypeUtil {
    * @return actual data after conversion
    */
   public static Object getDataBasedOnDataTypeForNoDictionaryColumn(byte[] 
dataInBytes,
-      DataType actualDataType, boolean isTimeStampConversion) {
+      DataType actualDataType, boolean isTimeStampConversion, boolean 
getBytesData) {
     if (null == dataInBytes || Arrays
         .equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, dataInBytes)) {
       return null;
@@ -647,6 +653,9 @@ public final class DataTypeUtil {
         }
         return dataInBytes;
       } else {
+        if (getBytesData) {
+          return dataInBytes;
+        }
         // Default action for String/Varchar
         return getDataTypeConverter().convertFromByteToUTF8String(dataInBytes);
       }
diff --git a/docs/index/secondary-index-guide.md 
b/docs/index/secondary-index-guide.md
index 2eaaed9..503230c 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -1,3 +1,4 @@
+
 <!--
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
@@ -23,6 +24,7 @@
 * [Querying Data](#querying-data)
 * [Compaction](#compacting-SI-table)
 * [DDLs on Secondary Index](#DDLs-on-Secondary-Index)
+* [Complex DataType support on SI](#Complex-DataType-support-on-SI)
 
 ## Quick example
 
@@ -209,4 +211,8 @@ Reindex on Database level
   ```
   REINDEX DATABASE db_name [WHERE SEGMENT.ID IN (1,2,5)]
   ```
-Note: This command is not supported with other concurrent operations.
\ No newline at end of file
+Note: This command is not supported with other concurrent operations.
+
+## Complex DataType support on SI
+Currently, only complex Array types are supported for creating secondary 
indexes. Nested Array
+support and other complex types support will be supported in the future.
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
new file mode 100644
index 0000000..859979c
--- /dev/null
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithComplexArrayType.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+import 
org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
+
+class TestSIWithComplexArrayType extends QueryTest with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    sql("drop table if exists complextable")
+  }
+
+  override def afterEach(): Unit = {
+    sql("drop index if exists index_1 on complextable")
+    sql("drop table if exists complextable")
+  }
+
+  test("test array<string> on secondary index") {
+    sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
+    sql("insert into complextable select 1,array('china', 'us'), 'b'")
+    sql("insert into complextable select 2,array('pak'), 'v'")
+    sql("insert into complextable select 3,array('china'), 'f'")
+    sql("insert into complextable select 4,array('india'),'g'")
+    val result1 = sql(" select * from complextable where 
array_contains(country,'china')")
+    val result2 = sql(" select * from complextable where country[0]='china'")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country) as 'carbondata'")
+    val df1 = sql(" select * from complextable where 
array_contains(country,'china')")
+    val df2 = sql(" select * from complextable where country[0]='china'")
+    if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+   val doNotHitSIDf = sql(" select * from complextable where 
array_contains(country,'china') and array_contains(country,'us')")
+    if (isFilterPushedDownToSI(doNotHitSIDf.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+    checkAnswer(result2, df2)
+  }
+
+  test("test array<string> and string as index columns on secondary index") {
+    sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
+    sql("insert into complextable select 1, array('china', 'us'), 'b'")
+    sql("insert into complextable select 2, array('pak'), 'v'")
+    sql("insert into complextable select 3, array('china'), 'f'")
+    sql("insert into complextable select 4, array('india'),'g'")
+    val result =  sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country, name) as 
'carbondata'")
+    val df =  sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test load data with array<string> on secondary index") {
+    sql("create table complextable (id int, name string, country 
array<string>) stored as carbondata")
+    sql(
+      s"load data inpath '$resourcesPath/secindex/array.csv' into table 
complextable options('delimiter'=','," +
+      
"'quotechar'='\"','fileheader'='id,name,country','complex_delimiter_level_1'='$')")
+    val result =  sql(" select * from complextable where 
array_contains(country,'china')")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country) as 'carbondata'")
+    val df =  sql(" select * from complextable where 
array_contains(country,'china')")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test si creation with struct and map type") {
+    sql("create table complextable (country struct<b:string>, name string, id 
Map<string, string>, arr1 array<string>, arr2 array<string>) stored as 
carbondata")
+    intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(country) as 
'carbondata'")
+    }
+    intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(id) as 'carbondata'")
+    }
+    intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(arr1, arr2) as 
'carbondata'")
+    }
+  }
+
+  test("test si creation with array") {
+    sql("create table complextable (id int, name string, country 
array<array<string>>, add array<int>) stored as carbondata")
+    sql("drop index if exists index_1 on complextable")
+    intercept[RuntimeException] {
+      sql("create index index_1 on table complextable(country) as 
'carbondata'")
+    }.getMessage.contains("SI creation with nested array complex type is not 
supported yet")
+  }
+
+  test("test complex with null and empty data") {
+    sql("create table complextable (id string, country array<string>, name 
string) stored as carbondata")
+    sql("insert into complextable select 'a', array(), ''")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country) as 'carbondata'")
+    checkAnswer(sql("select count(*) from index_1"), Seq(Row(1)) )
+    sql("insert into complextable select 'a', array(null), 'b'")
+    checkAnswer(sql("select count(*) from index_1"), Seq(Row(2)) )
+  }
+
+  test("test array<date> on secondary index") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (name string, time date, projectdate 
array<date>) stored as carbondata")
+    sql("drop index if exists index_1 on complextable")
+    sql("insert into complextable select 'b', 
'2017-02-01',array('2017-02-01','2018-02-01')")
+    val result = sql(" select * from complextable where 
array_contains(projectdate,cast('2017-02-01' as date))")
+    sql("create index index_1 on table complextable(projectdate) as 
'carbondata'")
+    checkAnswer(sql("select count(*) from index_1"), Seq(Row(2)))
+    val df =  sql(" select * from complextable where 
array_contains(projectdate,cast('2017-02-01' as date))")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test array<timestamp> on secondary index") {
+    sql("drop table if exists complextable")
+    sql("create table complextable (name string, time date, projectdate 
array<timestamp>) stored as carbondata")
+    sql("drop index if exists index_1 on complextable")
+    sql("insert into complextable select 'b', '2017-02-01',array('2017-02-01 
00:01:00','2018-02-01 02:00:00')")
+    val result = sql(" select * from complextable where 
array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    sql("create index index_1 on table complextable(projectdate) as 
'carbondata'")
+    checkAnswer(sql("select count(*) from index_1"), Seq(Row(2)))
+    val df =  sql(" select * from complextable where 
array_contains(projectdate,cast('2017-02-01 00:01:00' as timestamp))")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test array<varchar> and varchar as index columns on secondary index") {
+    sql("create table complextable (id string, country array<varchar(10)>, 
name string) stored as carbondata")
+    sql("insert into complextable select 1, array('china', 'us'), 'b'")
+    sql("insert into complextable select 2, array('pak'), 'v'")
+    sql("insert into complextable select 3, array('china'), 'f'")
+    sql("insert into complextable select 4, array('india'),'g'")
+    val result =  sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country, name) as 
'carbondata'")
+    val df =  sql(" select * from complextable where 
array_contains(country,'china') and name='f'")
+    if (!isFilterPushedDownToSI(df.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result, df)
+  }
+
+  test("test multiple SI with array and primitive type") {
+    sql("create table complextable (id string, country array<varchar(10)>, 
name string, addr string) stored as carbondata")
+    sql("insert into complextable select 1, array('china', 'us'), 'b', 'b1'")
+    sql("insert into complextable select 2, array('pak', 'india'), 'v', 'v'")
+    val result1 = sql("select * from complextable where addr='v' and 
array_contains(country,'pak')")
+    val result2 =  sql("select * from complextable where 
array_contains(country,'pak') and addr='v'")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country, name) as 
'carbondata'")
+    sql("drop index if exists index_2 on complextable")
+    sql("create index index_2 on table complextable(addr) as 'carbondata'")
+    val df1 =  sql("select * from complextable where addr='v' and 
array_contains(country,'pak')")
+    val df2 =  sql("select * from complextable where 
array_contains(country,'pak') and addr='v'")
+    if (!isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+    if (!isFilterPushedDownToSI(df2.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result2, df2)
+  }
+
+  test("test SI complex with multiple array contains") {
+    sql("create table complextable (id string, country array<varchar(10)>, 
name string, addr string) stored as carbondata")
+    sql("insert into complextable select 1, array('china', 'us'), 'b', 'b1'")
+    sql("insert into complextable select 2, array('pak', 'india'), 'v', 'v'")
+    val result1 = sql("select * from complextable where 
array_contains(country,'india') and array_contains(country,'pak')")
+    sql("drop index if exists index_1 on complextable")
+    sql("create index index_1 on table complextable(country, name) as 
'carbondata'")
+    val df1 =  sql("select * from complextable where 
array_contains(country,'india') and array_contains(country,'pak')")
+    if (isFilterPushedDownToSI(df1.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    checkAnswer(result1, df1)
+  }
+
+}
diff --git 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index b200655..1138ff7 100644
--- 
a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ 
b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.secondaryindex.query;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -26,13 +29,17 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import 
org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import 
org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.result.RowBatch;
+import 
org.apache.carbondata.core.scan.result.iterator.DetailQueryResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -222,10 +229,18 @@ public class SecondaryIndexQueryResultProcessor {
   private void processResult(List<CarbonIterator<RowBatch>> 
detailQueryResultIteratorList)
       throws SecondaryIndexException {
     for (CarbonIterator<RowBatch> detailQueryIterator : 
detailQueryResultIteratorList) {
+      DetailQueryResultIterator queryIterator = (DetailQueryResultIterator) 
detailQueryIterator;
+      BlockExecutionInfo blockExecutionInfo = 
queryIterator.getBlockExecutionInfo();
+      // get complex dimension info map from block execution info
+      Map<Integer, GenericQueryType> complexDimensionInfoMap =
+          blockExecutionInfo.getComplexDimensionInfoMap();
+      int[] complexColumnParentBlockIndexes =
+          blockExecutionInfo.getComplexColumnParentBlockIndexes();
       while (detailQueryIterator.hasNext()) {
         RowBatch batchResult = detailQueryIterator.next();
         while (batchResult.hasNext()) {
-          addRowForSorting(prepareRowObjectForSorting(batchResult.next()));
+          addRowForSorting(prepareRowObjectForSorting(batchResult.next(), 
complexDimensionInfoMap,
+              complexColumnParentBlockIndexes));
           isRecordFound = true;
         }
       }
@@ -243,47 +258,123 @@ public class SecondaryIndexQueryResultProcessor {
   /**
    * This method will prepare the data from raw object that will take part in 
sorting
    */
-  private Object[] prepareRowObjectForSorting(Object[] row) {
+  private Object[] prepareRowObjectForSorting(Object[] row,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, int[] 
complexColumnParentBlockIndexes)
+      throws SecondaryIndexException {
     ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
-    // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
+    byte[] implicitColumnByteArray = wrapper.getImplicitColumnByteArray();
 
     List<CarbonDimension> dimensions = segmentProperties.getDimensions();
     Object[] preparedRow = new Object[dimensions.size() + measureCount];
+    Map<Integer, Object[]> complexDataMap = new HashMap<>();
 
     int noDictionaryIndex = 0;
     int dictionaryIndex = 0;
+    int complexIndex = 0;
     int i = 0;
     // loop excluding last dimension as last one is implicit column.
     for (; i < dimensions.size() - 1; i++) {
       CarbonDimension dims = dimensions.get(i);
-      if (dims.hasEncoding(Encoding.DICTIONARY)) {
+      boolean isComplexColumn = false;
+      // As complex column of MainTable is stored as its primitive type in SI,
+      // we need to check if dimension is complex dimension or not based on 
dimension
+      // name. Check if name exists in complexDimensionInfoMap of main table 
result
+      if (!complexDimensionInfoMap.isEmpty() && 
complexColumnParentBlockIndexes.length > 0) {
+        for (GenericQueryType queryType : complexDimensionInfoMap.values()) {
+          if (queryType.getName().equalsIgnoreCase(dims.getColName())) {
+            isComplexColumn = true;
+            break;
+          }
+        }
+      }
+      // fill all the no dictionary and dictionary data to the prepared row 
first, fill the complex
+      // flatten data to prepared row at last
+      if (dims.hasEncoding(Encoding.DICTIONARY) && !isComplexColumn) {
         // dictionary
         preparedRow[i] = wrapper.getDictionaryKeyByIndex(dictionaryIndex++);
       } else {
-        // no dictionary dims
-        byte[] noDictionaryKeyByIndex = 
wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
-        // no dictionary primitive columns are expected to be in original data 
while loading,
-        // so convert it to original data
-        if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
-          Object dataFromBytes = 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
-              noDictionaryKeyByIndex, dims.getDataType());
-          if (null != dataFromBytes && dims.getDataType() == 
DataTypes.TIMESTAMP) {
-            dataFromBytes = (long) dataFromBytes / 1000L;
+        if (isComplexColumn) {
+          // get the flattened data of complex column
+          byte[] complexKeyByIndex = 
wrapper.getComplexKeyByIndex(complexIndex);
+          ByteBuffer byteArrayInput = ByteBuffer.wrap(complexKeyByIndex);
+          GenericQueryType genericQueryType =
+              
complexDimensionInfoMap.get(complexColumnParentBlockIndexes[complexIndex++]);
+          int complexDataLength = byteArrayInput.getShort(2);
+          // In case, if array is empty
+          if (complexDataLength == 0) {
+            complexDataLength = complexDataLength + 1;
+          }
+          // get flattened array data
+          Object[] complexFlattenedData = new Object[complexDataLength];
+          Object[] data = 
genericQueryType.getObjectArrayDataBasedOnDataType(byteArrayInput);
+          for (int index = 0; index < complexDataLength; index++) {
+            complexFlattenedData[index] =
+                getData(data, index, dims.getColumnSchema().getDataType());
           }
-          preparedRow[i] = dataFromBytes;
+          // store the dimesnion column index and the complex column flattened 
data to a map
+          complexDataMap.put(i, complexFlattenedData);
         } else {
-          preparedRow[i] = noDictionaryKeyByIndex;
+          // no dictionary dims
+          byte[] noDictionaryKeyByIndex = 
wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
+          // no dictionary primitive columns are expected to be in original 
data while loading,
+          // so convert it to original data
+          if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
+            Object dataFromBytes = DataTypeUtil
+                
.getDataBasedOnDataTypeForNoDictionaryColumn(noDictionaryKeyByIndex,
+                    dims.getDataType());
+            if (null != dataFromBytes && dims.getDataType() == 
DataTypes.TIMESTAMP) {
+              dataFromBytes = (long) dataFromBytes / 1000L;
+            }
+            preparedRow[i] = dataFromBytes;
+          } else {
+            preparedRow[i] = noDictionaryKeyByIndex;
+          }
         }
       }
     }
 
     // at last add implicit column position reference(PID)
+    preparedRow[i] = implicitColumnByteArray;
 
-    preparedRow[i] = wrapper.getImplicitColumnByteArray();
+    // In case of complex array type, get the flattened data based on 
dimension index and add
+    // it to the prepared row one by one and add for sorting.
+    // TODO Handle for nested array and other complex types
+    if (!complexDataMap.isEmpty()) {
+      Object[] firstRow = preparedRow;
+      for (Map.Entry<Integer, Object[]> dataEntry : complexDataMap.entrySet()) 
{
+        Object[] complexArrayData = dataEntry.getValue();
+        preparedRow[dataEntry.getKey()] = complexArrayData[0];
+        firstRow = preparedRow.clone();
+        if (complexArrayData.length != 1) {
+          for (int index = 1; index < complexArrayData.length; index++) {
+            preparedRow[dataEntry.getKey()] = complexArrayData[index];
+            addRowForSorting(preparedRow.clone());
+          }
+        }
+      }
+      preparedRow = firstRow;
+    }
     return preparedRow;
   }
 
   /**
+   * This method will return complex array primitive data
+   */
+  private Object getData(Object[] data, int index, DataType dataType) {
+    if (data.length == 0) {
+      return new byte[0];
+    } else if (data[0] == null) {
+      return CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
+    }
+    if (dataType == DataTypes.TIMESTAMP && null != data[index]) {
+      return (long) data[index] / 1000L;
+    } else if (dataType == DataTypes.DATE) {
+      return (int) data[index] + DateDirectDictionaryGenerator.cutOffDate;
+    }
+    return data[index];
+  }
+
+  /**
    * This method will read sort temp files, perform merge sort and add it to 
store for data loading
    */
   private void readAndLoadDataFromSortTempFiles() throws 
SecondaryIndexException {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index 3fac274..4492cd0 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -200,7 +200,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         .map(x => if (!x.isComplex) {
           x.getColName
         })
-      val dimNames = dims.map(x => if (!x.isComplex) {
+      val dimNames = dims.map(x => if (DataTypes.isArrayType(x.getDataType) || 
!x.isComplex) {
         x.getColName.toLowerCase()
       })
       val isMeasureColPresent = indexModel.columnNames.find(x => 
msrs.contains(x))
@@ -246,7 +246,8 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
 
       var isColsIndexedAsPerTable = true
       for (i <- indexModel.columnNames.indices) {
-        if (!dims(i).getColName.equalsIgnoreCase(indexModel.columnNames(i))) {
+        val mainTableDims = dims.sortBy(_.getOrdinal)
+        if 
(!mainTableDims(i).getColName.equalsIgnoreCase(indexModel.columnNames(i))) {
           isColsIndexedAsPerTable = false
         }
       }
@@ -443,10 +444,32 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       databaseName: String, tableName: String, indexTableName: String,
       absoluteTableIdentifier: AbsoluteTableIdentifier): TableInfo = {
     var schemaOrdinal = -1
-    var allColumns = indexModel.columnNames.map { indexCol =>
-      val colSchema = carbonTable.getDimensionByName(indexCol).getColumnSchema
+    var allColumns = List[ColumnSchema]()
+    var complexColumnExists = false
+    indexModel.columnNames.foreach { indexCol =>
+      val dimension = carbonTable.getDimensionByName(indexCol)
       schemaOrdinal += 1
-      cloneColumnSchema(colSchema, schemaOrdinal)
+      if (dimension.isComplex) {
+        if (complexColumnExists) {
+          throw new ErrorMessage(
+            "SI creation with more than one complex type is not supported yet")
+        }
+        if (dimension.getNumberOfChild > 0) {
+          val complexChildDims = dimension.getListOfChildDimensions.asScala
+          if (complexChildDims.exists(col => 
DataTypes.isArrayType(col.getDataType))) {
+            throw new ErrorMessage(
+              "SI creation with nested array complex type is not supported 
yet")
+          }
+        }
+        allColumns = allColumns :+ cloneColumnSchema(
+          dimension.getColumnSchema,
+          schemaOrdinal,
+          
dimension.getListOfChildDimensions.get(0).getColumnSchema.getDataType)
+        complexColumnExists = true
+      } else {
+        val colSchema = dimension.getColumnSchema
+        allColumns = allColumns :+ cloneColumnSchema(colSchema, schemaOrdinal)
+      }
     }
     // Setting TRUE on all sort columns
     allColumns.foreach(f => f.setSortColumn(true))
@@ -619,12 +642,24 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     columnSchema
   }
 
-  def cloneColumnSchema(parentColumnSchema: ColumnSchema, schemaOrdinal: Int): 
ColumnSchema = {
+  def cloneColumnSchema(parentColumnSchema: ColumnSchema,
+      schemaOrdinal: Int,
+      dataType: DataType = null): ColumnSchema = {
     val columnSchema = new ColumnSchema()
-    columnSchema.setDataType(parentColumnSchema.getDataType)
+    val encodingList = parentColumnSchema.getEncodingList
+    // if data type is arrayType, then store the column as its CHILD data type 
in SI
+    if (DataTypes.isArrayType(parentColumnSchema.getDataType)) {
+      columnSchema.setDataType(dataType)
+      if (dataType == DataTypes.DATE) {
+        encodingList.add(Encoding.DIRECT_DICTIONARY)
+        encodingList.add(Encoding.DICTIONARY)
+      }
+    } else {
+      columnSchema.setDataType(parentColumnSchema.getDataType)
+    }
     columnSchema.setColumnName(parentColumnSchema.getColumnName)
     columnSchema.setColumnProperties(parentColumnSchema.getColumnProperties)
-    columnSchema.setEncodingList(parentColumnSchema.getEncodingList)
+    columnSchema.setEncodingList(encodingList)
     columnSchema.setColumnUniqueId(parentColumnSchema.getColumnUniqueId)
     columnSchema.setColumnReferenceId(parentColumnSchema.getColumnReferenceId)
     columnSchema.setDimensionColumn(parentColumnSchema.isDimensionColumn)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
index b4c9d67..f39f18f 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/optimizer/CarbonSecondaryIndexOptimizer.scala
@@ -263,14 +263,24 @@ class CarbonSecondaryIndexOptimizer(sparkSession: 
SparkSession) {
       case SIUnaryFilterPushDownOperation(tableName, filterCondition) =>
         val attributeMap = indexTableAttributeMap.get(tableName).get
         var filterAttributes = indexJoinedFilterAttributes
-        val indexTableFilter = filterCondition transformDown {
+        val newFilterCondition = filterCondition transform {
+          case ArrayContains(left, right) =>
+            EqualTo(left, right)
+        }
+        val indexTableFilter = newFilterCondition transformDown {
+          case array: GetArrayItem =>
+            val attr = array.child.asInstanceOf[AttributeReference]
+            val attrNew = attributeMap.get(attr.name.toLowerCase()).get
+            filterAttributes += attr.name.toLowerCase
+            attrNew
           case attr: AttributeReference =>
             val attrNew = attributeMap.get(attr.name.toLowerCase()).get
             filterAttributes += attr.name.toLowerCase
             attrNew
         }
-        val positionReference =
-          
Seq(attributeMap(CarbonCommonConstants.POSITION_REFERENCE.toLowerCase()))
+        val positionRef = 
attributeMap(CarbonCommonConstants.POSITION_REFERENCE.toLowerCase())
+        var positionReference = Seq(positionRef)
+
         // Add Filter on logicalRelation
         var planTransform: LogicalPlan = Filter(indexTableFilter,
           indexTableToLogicalRelationMapping(tableName))
@@ -548,6 +558,8 @@ class CarbonSecondaryIndexOptimizer(sparkSession: 
SparkSession) {
           case _ =>
             (filterTree, condition, None)
         }
+      case And(ArrayContains(_, _), ArrayContains(_, _)) =>
+        (filterTree, condition, None)
       case and@And(left, right) =>
         val (newSIFilterTreeLeft, newLeft, tableNameLeft) =
           createIndexTableFilterCondition(
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 2c3260a..adba486 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -491,7 +491,7 @@ object SecondaryIndexUtil {
         for (i <- 0 until factTableDimensions.size) {
           val dim = factTableDimensions.get(i)
           if (dim.getColumnId == indexTableDimension.getColumnId) {
-            if (isDictColsAlone && dim.hasEncoding(Encoding.DICTIONARY)) {
+            if (isDictColsAlone && 
indexTableDimension.hasEncoding(Encoding.DICTIONARY)) {
               dims.add(i)
             } else if (!isDictColsAlone) {
               dims.add(i)
diff --git a/integration/spark/src/test/resources/secindex/array.csv 
b/integration/spark/src/test/resources/secindex/array.csv
new file mode 100644
index 0000000..7fbc89a
--- /dev/null
+++ b/integration/spark/src/test/resources/secindex/array.csv
@@ -0,0 +1,4 @@
+1,'abc',china$india$us
+2,'xyz',sri$can
+3,'mno',rus$china
+4,'lok',hk$bang
\ No newline at end of file

Reply via email to