This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b9f00b  [CARBONDATA-3994] Skip Order by for map task if it is sort 
column and use limit pushdown for array_contains filter
8b9f00b is described below

commit 8b9f00b72b4374fca80a5a0983b2707b58e29c75
Author: ajantha-bhat <[email protected]>
AuthorDate: Wed Sep 16 16:12:20 2020 +0530

    [CARBONDATA-3994] Skip Order by for map task if it is sort column and use 
limit pushdown for array_contains filter
    
    Why is this PR needed?
    These changes are proposed to improve query performance in specific 
scenarios.
    
    What changes were proposed in this PR?
    When the order by column is a sort column, the output of every map task is 
already sorted, so there is no need to sort the data again.
    Hence the ordering at the map task is skipped by changing the plan node 
from TakeOrderedAndProject --> CarbonTakeOrderedAndProjectExec.
    Also, in this scenario the limit is collected at the map task, and 
Array_contains() uses this limit value during row scan filtering to stop the 
scan once the limit value is reached.
    Also added a carbon property to control this:
    carbon.mapOrderPushDown.<db_name>_<table_name>.column
    
    Note: later this can be extended so that other filters also use the limit 
value.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3932
    
    Co-authored-by: li36909 <[email protected]>
---
 .../core/constants/CarbonCommonConstants.java      |   8 ++
 .../apache/carbondata/core/index/IndexFilter.java  |  15 +++
 .../carbondata/core/scan/filter/FilterUtil.java    |   6 +-
 .../executer/RowLevelFilterExecutorImpl.java       |  16 ++-
 ...velRangeGreaterThanEqualFilterExecutorImpl.java |   2 +-
 ...RowLevelRangeGreaterThanFilterExecutorImpl.java |   2 +-
 ...wLevelRangeLessThanEqualFilterExecutorImpl.java |   2 +-
 .../RowLevelRangeLessThanFilterExecutorImpl.java   |   2 +-
 .../resolver/RowLevelFilterResolverImpl.java       |  16 ++-
 .../apache/carbondata/core/util/SessionParams.java |   2 +
 docs/configuration-parameters.md                   |   4 +-
 .../filter/executor/PolygonFilterExecutorImpl.java |   2 +-
 .../spark/sql/CarbonDatasourceHadoopRelation.scala |  10 +-
 .../CarbonTakeOrderedAndProjectExec.scala          | 125 +++++++++++++++++++++
 .../strategy/CarbonLateDecodeStrategy.scala        |  83 ++++++++++++++
 .../complexType/TestArrayContainsPushDown.scala    |  24 ++++
 16 files changed, 304 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index d9ed63a..15139ec 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2575,4 +2575,12 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_REORDER_FILTER_DEFAULT = "true";
 
+  /**
+   * If order by column is in sort column,
+   * specify that sort column here to avoid ordering at map task.
+   * Also the limit value can be used for row scanning to scan the data only 
till the limit.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_MAP_ORDER_PUSHDOWN = 
"carbon.mapOrderPushDown";
+
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java 
b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
index cbb41c1..c3fd782 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexFilter.java
@@ -38,6 +38,7 @@ import 
org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptimizer;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import 
org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
@@ -62,6 +63,9 @@ public class IndexFilter implements Serializable {
 
   private SegmentProperties properties;
 
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown 
is enabled
+  private int limit = -1;
+
   public IndexFilter(CarbonTable table, Expression expression) {
     this(table, expression, false);
   }
@@ -91,6 +95,14 @@ public class IndexFilter implements Serializable {
     resolve(false);
   }
 
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
   Expression getNewCopyOfExpression() {
     if (expression != null) {
       try {
@@ -189,6 +201,9 @@ public class IndexFilter implements Serializable {
   public FilterResolverIntf getResolver() {
     if (resolver == null) {
       resolver = resolveFilter();
+      if (resolver instanceof RowLevelFilterResolverImpl) {
+        ((RowLevelFilterResolverImpl)resolver).setLimit(limit);
+      }
     }
     return resolver;
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index af8866b..9dc6b5b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -203,7 +203,8 @@ public final class FilterUtil {
                   .getMsrColEvalutorInfoList(),
               ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getFilterExpresion(),
               ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getTableIdentifier(),
-              segmentProperties, complexDimensionInfoMap);
+              segmentProperties, complexDimensionInfoMap,
+              ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getLimit());
 
       }
     }
@@ -212,7 +213,8 @@ public final class FilterUtil {
         ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getMsrColEvalutorInfoList(),
         ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getFilterExpresion(),
         ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getTableIdentifier(),
-        segmentProperties, complexDimensionInfoMap);
+        segmentProperties, complexDimensionInfoMap,
+        ((RowLevelFilterResolverImpl) 
filterExpressionResolverTree).getLimit());
 
   }
 
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
index de31fdd..7ba181a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecutorImpl.java
@@ -113,10 +113,15 @@ public class RowLevelFilterExecutorImpl implements 
FilterExecutor {
    */
   private DirectDictionaryGenerator dateDictionaryGenerator;
 
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown 
is enabled
+  // TODO: right now this is used only for Array_contains() filter,
+  // later we can make use of it for all row level filters.
+  private int limit;
+
   public RowLevelFilterExecutorImpl(List<DimColumnResolvedFilterInfo> 
dimColEvaluatorInfoList,
       List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression 
exp,
       AbsoluteTableIdentifier tableIdentifier, SegmentProperties 
segmentProperties,
-      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+      Map<Integer, GenericQueryType> complexDimensionInfoMap, int limit) {
     this.segmentProperties = segmentProperties;
     if (null == dimColEvaluatorInfoList) {
       this.dimColEvaluatorInfoList = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -147,6 +152,7 @@ public class RowLevelFilterExecutorImpl implements 
FilterExecutor {
     this.complexDimensionInfoMap = complexDimensionInfoMap;
     this.dateDictionaryGenerator =
         
DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(DataTypes.DATE);
+    this.limit = limit;
     initDimensionChunkIndexes();
     initMeasureChunkIndexes();
   }
@@ -266,8 +272,12 @@ public class RowLevelFilterExecutorImpl implements 
FilterExecutor {
           literalExpDataType);
       ArrayQueryType complexType =
           (ArrayQueryType) complexDimensionInfoMap.get(dimensionChunkIndex[0]);
+      int totalCount = 0;
       // check all the pages
       for (int i = 0; i < pageNumbers; i++) {
+        if (limit != -1 && totalCount >= limit) {
+          break;
+        }
         BitSet set = new BitSet(numberOfRows[i]);
         int[][] numberOfChild = complexType
             
.getNumberOfChild(rawBlockletColumnChunks.getDimensionRawColumnChunks(), null,
@@ -277,12 +287,16 @@ public class RowLevelFilterExecutorImpl implements 
FilterExecutor {
                 null, i);
         // check every row
         for (int index = 0; index < numberOfRows[i]; index++) {
+          if (limit != -1 && totalCount >= limit) {
+            break;
+          }
           int dataOffset = numberOfChild[index][1];
           // loop the children
           for (int j = 0; j < numberOfChild[index][0]; j++) {
             byte[] obj = page.getChunkData(dataOffset++);
             if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(obj, 
filterValueInBytes) == 0) {
               set.set(index);
+              totalCount++;
               break;
             }
           }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
index 2326334..3857a2b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanEqualFilterExecutorImpl.java
@@ -62,7 +62,7 @@ public class RowLevelRangeGreaterThanEqualFilterExecutorImpl 
extends RowLevelFil
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvalutorInfoList.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
index 0849e5e..50ce0ca 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGreaterThanFilterExecutorImpl.java
@@ -62,7 +62,7 @@ public class RowLevelRangeGreaterThanFilterExecutorImpl 
extends RowLevelFilterEx
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvoluatorInfoList, exp, 
tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!this.msrColEvalutorInfoList.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
index b22a91d..f4800be 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecutorImpl.java
@@ -61,7 +61,7 @@ public class RowLevelRangeLessThanEqualFilterExecutorImpl 
extends RowLevelFilter
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvalutorInfoList.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
index e2cddca..125e3a9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFilterExecutorImpl.java
@@ -61,7 +61,7 @@ public class RowLevelRangeLessThanFilterExecutorImpl extends 
RowLevelFilterExecu
       AbsoluteTableIdentifier tableIdentifier, byte[][] filterRangeValues,
       Object[] msrFilterRangeValues, SegmentProperties segmentProperties) {
     super(dimColEvaluatorInfoList, msrColEvaluatorInfoList, exp, 
tableIdentifier, segmentProperties,
-        null);
+        null, -1);
     this.filterRangeValues = filterRangeValues;
     this.msrFilterRangeValues = msrFilterRangeValues;
     if (!msrColEvaluatorInfoList.isEmpty()) {
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
index 3672f2d..8e22c9b 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/RowLevelFilterResolverImpl.java
@@ -36,17 +36,25 @@ public class RowLevelFilterResolverImpl extends 
ConditionalFilterResolverImpl {
   private List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList;
   private List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList;
   private AbsoluteTableIdentifier tableIdentifier;
+  // limit value used for row scanning, collected when carbon.mapOrderPushDown 
is enabled
+  private int limit = -1;
 
   public RowLevelFilterResolverImpl(Expression exp, boolean 
isExpressionResolve,
       boolean isIncludeFilter, AbsoluteTableIdentifier tableIdentifier) {
     super(exp, isExpressionResolve, isIncludeFilter, false);
-    dimColEvaluatorInfoList =
-        new 
ArrayList<DimColumnResolvedFilterInfo>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    msrColEvalutorInfoList = new ArrayList<MeasureColumnResolvedFilterInfo>(
-        CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    dimColEvaluatorInfoList = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    msrColEvalutorInfoList = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     this.tableIdentifier = tableIdentifier;
   }
 
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
   /**
    * Method which will resolve the filter expression by converting the filter 
member
    * to its assigned dictionary values.
diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java 
b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 0fd4e82..c611256 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -229,6 +229,8 @@ public class SessionParams implements Serializable, 
Cloneable {
           }
         } else if (key.equalsIgnoreCase(CARBON_REORDER_FILTER)) {
           isValid = true;
+        } else if (key.startsWith(CARBON_MAP_ORDER_PUSHDOWN)) {
+          isValid = true;
         } else {
           throw new InvalidConfigurationException(
               "The key " + key + " not supported for dynamic configuration.");
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 70fc3f3..f38983b 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -147,6 +147,7 @@ This section provides the details of all the configurations 
required for the Car
 | carbon.si.lookup.partialstring | true | When true, it includes starts with, 
ends with and contains. When false, it includes only starts with secondary 
indexes. |
 | carbon.max.pagination.lru.cache.size.in.mb | -1 | Maximum memory **(in MB)** 
upto which the SDK pagination reader can cache the blocklet rows. Suggest to 
configure as multiple of blocklet size. Default value of -1 means there is no 
memory limit for caching. Only integer values greater than 0 are accepted. |
 | carbon.partition.max.driver.lru.cache.size | -1 | Maximum memory **(in MB)** 
upto which driver can cache partition metadata. Beyond this, least recently 
used data will be removed from cache before loading new set of values.
+| carbon.mapOrderPushDown.<db_name>_<table_name>.column | empty | If order by 
column is in sort column, specify that sort column here to avoid ordering at 
map task. |
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
@@ -231,7 +232,8 @@ RESET
 | carbon.index.visible.<db_name>.<table_name>.<index_name> | To specify query 
on ***db_name.table_name*** to not use the index ***index_name***. |
 | carbon.load.indexes.parallel.<db_name>.<table_name> | To enable parallel 
index loading for a table. when db_name.table_name are not specified, i.e., 
when ***carbon.load.indexes.parallel.*** is set, it applies for all the tables 
of the session. |
 | carbon.enable.index.server                | To use index server for caching 
and pruning. This property can be used for a session or for a particular table 
with ***carbon.enable.index.server.<db_name>.<table_name>***. |
-| carbon.reorder.filter                     | This property can be used to 
enabled/disable filter reordering. Should be disabled only when the user has 
optimized the filter condition.
+| carbon.reorder.filter                     | This property can be used to 
enabled/disable filter reordering. Should be disabled only when the user has 
optimized the filter condition. | 
+| carbon.mapOrderPushDown.<db_name>_<table_name>.column | If order by column 
is in sort column, specify that sort column here to avoid ordering at map 
task. |
 **Examples:**
 
 * Add or Update:
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
index 230884e..c5a7317 100644
--- 
a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -41,7 +41,7 @@ public class PolygonFilterExecutorImpl extends 
RowLevelFilterExecutorImpl {
       AbsoluteTableIdentifier tableIdentifier, SegmentProperties 
segmentProperties,
       Map<Integer, GenericQueryType> complexDimensionInfoMap) {
     super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
-        complexDimensionInfoMap);
+        complexDimensionInfoMap, -1);
   }
 
   private int getNearestRangeIndex(List<Long[]> ranges, long searchForNumber) {
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b0b0742..10335d9 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,7 +46,8 @@ case class CarbonDatasourceHadoopRelation(
     sparkSession: SparkSession,
     paths: Array[String],
     parameters: Map[String, String],
-    tableSchema: Option[StructType])
+    tableSchema: Option[StructType],
+    limit: Int = -1)
   extends BaseRelation with InsertableRelation {
 
   val caseInsensitiveMap: Map[String, String] = parameters.map(f => 
(f._1.toLowerCase, f._2))
@@ -93,10 +94,15 @@ case class CarbonDatasourceHadoopRelation(
     }
 
     val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics
+    val filter = filterExpression.map(new IndexFilter(carbonTable, _, 
true)).orNull
+    if (filter != null && filters.length == 1) {
+      // push down the limit if only one filter
+      filter.setLimit(limit)
+    }
     new CarbonScanRDD(
       sparkSession,
       projection,
-      filterExpression.map(new IndexFilter(carbonTable, _, true)).orNull,
+      filter,
       identifier,
       carbonTable.getTableInfo.serialize(),
       carbonTable.getTableInfo,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala
new file mode 100644
index 0000000..5a5e93a
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/CarbonTakeOrderedAndProjectExec.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, 
SortOrder, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
SinglePartition}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.util.Utils
+
+// To skip the order at map task
+case class CarbonTakeOrderedAndProjectExec(
+    limit: Int,
+    sortOrder: Seq[SortOrder],
+    projectList: Seq[NamedExpression],
+    child: SparkPlan,
+    skipMapOrder: Boolean = false,
+    readFromHead: Boolean = true) extends UnaryExecNode {
+
+  private val serializer: Serializer = new 
UnsafeRowSerializer(child.output.size)
+
+  override def executeCollect(): Array[InternalRow] = {
+    val ordering = new LazilyGeneratedOrdering(sortOrder, child.output)
+    val rdd = child.execute().map(_.copy())
+    val data = takeOrdered(rdd, limit, skipMapOrder, readFromHead)(ordering)
+    if (projectList != child.output) {
+      val projection = UnsafeProjection.create(projectList, child.output)
+      data.map(r => projection(r).copy())
+    } else {
+      data
+    }
+  }
+
+  def takeOrdered(rdd: RDD[InternalRow],
+      num: Int,
+      skipMapOrder: Boolean = false,
+      readFromHead: Boolean = true)(implicit ord: Ordering[InternalRow]): 
Array[InternalRow] = {
+    if (!skipMapOrder) {
+      return rdd.takeOrdered(num)(ord)
+    }
+    // new ShuffledRowRDD by skipping the order at map task as column data is 
already sorted
+    if (num == 0) {
+      Array.empty
+    } else {
+      val mapRDDs = rdd.mapPartitions { items =>
+        if (readFromHead) {
+          items.slice(0, num)
+        } else {
+          items.drop(items.size - num)
+        }
+      }
+      if (mapRDDs.partitions.length == 0) {
+        Array.empty
+      } else {
+        mapRDDs.collect().sorted(ord)
+      }
+    }
+  }
+
+  override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  override def simpleString: String = {
+    val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
+
+    s"CarbonTakeOrderedAndProjectExec(limit=$limit, orderBy=$orderByString, " +
+    s"skipMapOrder=$skipMapOrder, readFromHead=$readFromHead, 
output=$outputString)"
+  }
+
+  override def output: Seq[Attribute] = {
+    projectList.map(_.toAttribute)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
+    val localTopK: RDD[InternalRow] = {
+      child.execute().map(_.copy()).mapPartitions { iter =>
+        if (skipMapOrder) {
+          // new ShuffledRowRDD by skipping the order at map task as column 
data is already sorted
+          if (readFromHead) {
+            iter.slice(0, limit)
+          } else {
+            iter.drop(iter.size - limit)
+          }
+        } else {
+          org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
+        }
+      }
+    }
+    // update with modified RDD (localTopK)
+    val shuffled = new ShuffledRowRDD(
+      ShuffleExchangeExec.prepareShuffleDependency(
+        localTopK, child.output, SinglePartition, serializer))
+    shuffled.mapPartitions { iter =>
+      val topK = 
org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), 
limit)(ord)
+      if (projectList != child.output) {
+        val projection = UnsafeProjection.create(projectList, child.output)
+        topK.map(r => projection(r))
+      } else {
+        topK
+      }
+    }
+  }
+
+}
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 415e19a..4b062e9 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.strategy
 
+import java.util.Locale
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -187,6 +189,8 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
           planLater(right),
           condition)
         condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) 
:: Nil
+      case ExtractTakeOrderedAndProjectExec(carbonTakeOrderedAndProjectExec) =>
+        carbonTakeOrderedAndProjectExec :: Nil
       case _ => Nil
     }
   }
@@ -982,6 +986,85 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
           null)(sparkSession)
     }
   }
+
+  object ExtractTakeOrderedAndProjectExec {
+
+    def unapply(plan: LogicalPlan): Option[CarbonTakeOrderedAndProjectExec] = {
+      val allRelations = plan.collect { case logicalRelation: LogicalRelation 
=> logicalRelation }
+      // push down order by limit to carbon map task,
+      // only when there are only one CarbonDatasourceHadoopRelation
+      if (allRelations.size != 1 ||
+          allRelations.exists(x => 
!x.relation.isInstanceOf[CarbonDatasourceHadoopRelation])) {
+        return None
+      }
+      //  check and Replace TakeOrderedAndProject (physical plan node for 
order by + limit)
+      //  with CarbonTakeOrderedAndProjectExec.
+      val relation = 
allRelations.head.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+      plan match {
+        case ReturnAnswer(rootPlan) => rootPlan match {
+          case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+            carbonTakeOrder(relation, limit,
+              order,
+              child.output,
+              planLater(pushLimit(limit, child)))
+          case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child))) =>
+            carbonTakeOrder(relation, limit, order, projectList, 
planLater(pushLimit(limit, child)))
+          case _ => None
+        }
+        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+          carbonTakeOrder(relation, limit, order, child.output, 
planLater(pushLimit(limit, child)))
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, 
true, child))) =>
+          carbonTakeOrder(relation, limit, order, projectList, 
planLater(pushLimit(limit, child)))
+        case _ => None
+      }
+    }
+
+    private def carbonTakeOrder(relation: CarbonDatasourceHadoopRelation,
+        limit: Int,
+        orders: Seq[SortOrder],
+        projectList: Seq[NamedExpression],
+        child: SparkPlan): Option[CarbonTakeOrderedAndProjectExec] = {
+      val latestOrder = orders.last
+      val fromHead: Boolean = latestOrder.direction match {
+        case Ascending => true
+        case Descending => false
+      }
+      val (columnName, canPushDown) = latestOrder.child match {
+        case attr: AttributeReference => (attr.name, true)
+        case Alias(AttributeReference(name, _, _, _), _) => (name, true)
+        case _ => (null, false)
+      }
+      val mapOrderPushDown = CarbonProperties.getInstance.getProperty(
+        CarbonCommonConstants.CARBON_MAP_ORDER_PUSHDOWN + "." +
+        s"${ relation.carbonTable.getTableUniqueName.toLowerCase(Locale.ROOT) 
}.column")
+      // when this property is enabled and order by column is in sort column,
+      // enable limit push down to map task, row scanner can use this limit.
+      val sortColumns = relation.carbonTable.getSortColumns
+      if (mapOrderPushDown != null && canPushDown && sortColumns.size() > 0
+          && sortColumns.get(0).equalsIgnoreCase(columnName)
+          && mapOrderPushDown.equalsIgnoreCase(columnName)) {
+        // Replace TakeOrderedAndProject (which comes after physical plan with 
limit and order by)
+        // with CarbonTakeOrderedAndProjectExec.
+        // which will skip the order at map task as column data is already 
sorted
+        Some(CarbonTakeOrderedAndProjectExec(limit, orders, projectList, 
child, skipMapOrder =
+          true, readFromHead = fromHead))
+      } else {
+        None
+      }
+    }
+
+    def pushLimit(limit: Int, plan: LogicalPlan): LogicalPlan = {
+      val newPlan = plan transform {
+        case lr: LogicalRelation =>
+          val newRelation = lr.copy(relation = lr.relation
+            .asInstanceOf[CarbonDatasourceHadoopRelation]
+            .copy(limit = limit))
+          newRelation
+        case other => other
+      }
+      newPlan
+    }
+  }
 }
 
 class CarbonPhysicalPlanException extends Exception
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
index 226e05c..8ff94da 100644
--- 
a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
+++ 
b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestArrayContainsPushDown.scala
@@ -66,6 +66,30 @@ class TestArrayContainsPushDown extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table complex1")
   }
 
+  test("test array contains push down using limit " +
+       "in row level scan when order by column is sort column") {
+    sql("drop table if exists complex1")
+
+    sql("create table complex1 (id int, arr array<String>) " +
+        "stored as carbondata TBLPROPERTIES ('SORT_COLUMNS'='id')")
+    sql("insert into complex1 select 1, array('as') union all " +
+        "select 2, array('sd','df','gh') union all " +
+        "select 3, array('rt','ew','rtyu','jk',null) union all " +
+        "select 4, array('ghsf','dbv','','ty') union all " +
+        "select 5, array('hjsd','fggb','nhj','sd','asd')")
+
+    // enable the property
+    CarbonProperties.getInstance()
+      .addProperty("carbon.mapOrderPushDown.default_complex1.column", "id")
+    // check for CarbonTakeOrderedAndProjectExec node in plan
+    checkExistence(sql(
+      " explain select * from complex1 where array_contains(arr,'sd') order by 
id limit 1"),
+      true,
+      "CarbonTakeOrderedAndProjectExec")
+    
CarbonProperties.getInstance().removeProperty("carbon.mapOrderPushDown.default_complex1.column")
+    sql("drop table complex1")
+  }
+
   test("test array contains pushdown for array of boolean") {
     sql("drop table if exists complex1")
     sql("create table complex1 (arr array<boolean>) stored as carbondata")

Reply via email to