This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2acee9ea9 [KYUUBI #3601] [SPARK] Support infer columns for rebalance and sort
2acee9ea9 is described below

commit 2acee9ea977f7b5ae3d270be93ec0b2537eac4dd
Author: ulysses-you <[email protected]>
AuthorDate: Mon Oct 17 18:13:50 2022 +0800

    [KYUUBI #3601] [SPARK] Support infer columns for rebalance and sort
    
    ### _Why are the changes needed?_
    
    Improve the rebalance before writing rule.
    
    The rebalance before writing rule adds a rebalance at the top of the query for data writing commands. However, the default partitioning of the rebalance is RoundRobinPartitioning, which breaks the original partitioning of the data. This may make the output data larger than before.
    
    This PR supports inferring the columns from join and aggregate for rebalance and sort to improve the compression ratio.
    
    Note that this improvement only works for static partition writing.
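    
    As a minimal sketch of how the feature might be exercised, assuming the Kyuubi Spark SQL extension is loaded and that tables `t`, `t1`, and `t2` already exist (the table names come from the scaladoc example in this commit and are illustrative only), the two configs introduced here could be set before a static partition insert:
    
    ```sql
    -- Opt in to inference; the default added by this commit is false.
    SET spark.sql.optimizer.inferRebalanceAndSortOrders.enabled=true;
    -- Optional: cap the number of inferred columns (the default is 3).
    SET spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns=3;
    
    -- Static partition write (p='a'), the only case the inference applies to.
    INSERT INTO TABLE t PARTITION(p='a')
    SELECT * FROM t1 JOIN t2 ON t1.c1 = t2.c1;
    ```
    
    With a static partition such as `p='a'`, the rule may use the join key as the rebalance and sort column. With a dynamic partition (`PARTITION(p)`), the diff shows that it falls back to rebalancing by the dynamic partition columns.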
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3601 from ulysses-you/smart-order.
    
    Closes #3601
    
    c190dc1a [ulysses-you] docs
    995969b5 [ulysses-you] view
    ea23c417 [ulysses-you] Support infer columns for rebalance and sort
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 docs/extensions/engines/spark/rules.md             |  36 +++----
 .../kyuubi/sql/InferRebalanceAndSortOrders.scala   | 110 +++++++++++++++++++++
 .../apache/kyuubi/sql/RebalanceBeforeWriting.scala |  22 ++++-
 .../spark/sql/RebalanceBeforeWritingSuite.scala    |  87 +++++++++++++++-
 .../org/apache/kyuubi/sql/KyuubiSQLConf.scala      |  16 +++
 5 files changed, 251 insertions(+), 20 deletions(-)

diff --git a/docs/extensions/engines/spark/rules.md 
b/docs/extensions/engines/spark/rules.md
index 08750c8c9..a3c2678cf 100644
--- a/docs/extensions/engines/spark/rules.md
+++ b/docs/extensions/engines/spark/rules.md
@@ -47,11 +47,11 @@ And don't worry, Kyuubi will support the new Apache Spark 
version in the future.
 
 ## Usage
 
-| Kyuubi Spark SQL extension | Supported Spark version(s) | Available since  | 
EOL              | Bundled in Binary release tarball | Maven profile
-| -------------------------- | -------------------------- | ---------------- | 
---------------- | --------------------------------- | -------------
-| kyuubi-extension-spark-3-1 | 3.1.x                      | 1.3.0-incubating | 
N/A              | 1.3.0-incubating                  | spark-3.1
-| kyuubi-extension-spark-3-2 | 3.2.x                      | 1.4.0-incubating | 
N/A              | 1.4.0-incubating                  | spark-3.2
-| kyuubi-extension-spark-3-3 | 3.3.x                      | 1.6.0-incubating | 
N/A              | 1.6.0-incubating                  | spark-3.3
+| Kyuubi Spark SQL extension | Supported Spark version(s) | Available since  | 
EOL | Bundled in Binary release tarball | Maven profile |
+|----------------------------|----------------------------|------------------|-----|-----------------------------------|---------------|
+| kyuubi-extension-spark-3-1 | 3.1.x                      | 1.3.0-incubating | 
N/A | 1.3.0-incubating                  | spark-3.1     |
+| kyuubi-extension-spark-3-2 | 3.2.x                      | 1.4.0-incubating | 
N/A | 1.4.0-incubating                  | spark-3.2     |
+| kyuubi-extension-spark-3-3 | 3.3.x                      | 1.6.0-incubating | 
N/A | 1.6.0-incubating                  | spark-3.3     |
 
 1. Check the matrix that if you are using the supported Spark version, and 
find the corresponding Kyuubi Spark SQL Extension jar
 2. Get the Kyuubi Spark SQL Extension jar
@@ -68,15 +68,17 @@ Now, you can enjoy the Kyuubi SQL Extension.
 
 Kyuubi provides some configs to make these feature easy to use.
 
-Name | Default Value | Description | Since
---- | --- | --- | ---
-spark.sql.optimizer.insertRepartitionBeforeWrite.enabled | true | Add 
repartition node at the top of query plan. An approach of merging small files. 
| 1.2.0
-spark.sql.optimizer.insertRepartitionNum | none | The partition number if 
`spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE 
is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is 
enabled, the default value is none that means depend on AQE. | 1.2.0
-spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100 | The 
partition number of each dynamic partition if 
`spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We will 
repartition by dynamic partition columns to reduce the small file but that can 
cause data skew. This config is to extend the partition of dynamic partition 
column to avoid skew but may generate some small files. | 1.2.0
-spark.sql.optimizer.forceShuffleBeforeJoin.enabled | false | Ensure shuffle 
node exists before shuffled join (shj and smj) to make AQE `OptimizeSkewedJoin` 
works (complex scenario join, multi table join). | 1.2.0
-spark.sql.optimizer.finalStageConfigIsolation.enabled | false | If true, the 
final stage support use different config with previous stage. The prefix of 
final stage config key should be `spark.sql.finalStage.`. For example, the raw 
spark config: `spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final 
stage config should be: 
`spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`. | 1.2.0
-spark.sql.analyzer.classification.enabled | false | When true, allows Kyuubi 
engine to judge this SQL's classification and set 
`spark.sql.analyzer.classification` back into sessionConf. Through this 
configuration item, Spark can optimizing configuration dynamic. | 1.4.0
-spark.sql.optimizer.insertZorderBeforeWriting.enabled | true | When true, we 
will follow target table properties to insert zorder or not. The key properties 
are: 1) `kyuubi.zorder.enabled`: if this property is true, we will insert 
zorder before writing data. 2) `kyuubi.zorder.cols`: string split by comma, we 
will zorder by these cols. | 1.4.0
-spark.sql.optimizer.zorderGlobalSort.enabled | true | When true, we do a 
global sort using zorder. Note that, it can cause data skew issue if the zorder 
columns have less cardinality. When false, we only do local sort using zorder. 
| 1.4.0
-spark.sql.watchdog.maxPartitions | none | Set the max partition number when 
spark scans a data source. Enable MaxPartitionStrategy by specifying this 
configuration. Add maxPartitions Strategy to avoid scan excessive partitions on 
partitioned table, it's optional that works with defined | 1.4.0
-spark.sql.optimizer.dropIgnoreNonExistent | false | When true, do not report 
an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a 
non-existent database/table/view/function/partition | 1.5.0
+| Name                                                        | Default Value 
| Description                                                                   
                                                                                
                                                                                
                                                                                
                       | Since |
+|-------------------------------------------------------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------|
+| spark.sql.optimizer.insertRepartitionBeforeWrite.enabled    | true          
| Add repartition node at the top of query plan. An approach of merging small 
files.                                                                          
                                                                                
                                                                                
                         | 1.2.0 |
+| spark.sql.optimizer.insertRepartitionNum                    | none          
| The partition number if 
`spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. If AQE 
is disabled, the default value is `spark.sql.shuffle.partitions`. If AQE is 
enabled, the default value is none that means depend on AQE.                    
                                                                                
   | 1.2.0 |
+| spark.sql.optimizer.dynamicPartitionInsertionRepartitionNum | 100           
| The partition number of each dynamic partition if 
`spark.sql.optimizer.insertRepartitionBeforeWrite.enabled` is enabled. We will 
repartition by dynamic partition columns to reduce the small file but that can 
cause data skew. This config is to extend the partition of dynamic partition 
column to avoid skew but may generate some small files. | 1.2.0 |
+| spark.sql.optimizer.forceShuffleBeforeJoin.enabled          | false         
| Ensure shuffle node exists before shuffled join (shj and smj) to make AQE 
`OptimizeSkewedJoin` works (complex scenario join, multi table join).           
                                                                                
                                                                                
                           | 1.2.0 |
+| spark.sql.optimizer.finalStageConfigIsolation.enabled       | false         
| If true, the final stage support use different config with previous stage. 
The prefix of final stage config key should be `spark.sql.finalStage.`. For 
example, the raw spark config: 
`spark.sql.adaptive.advisoryPartitionSizeInBytes`, then the final stage config 
should be: `spark.sql.finalStage.adaptive.advisoryPartitionSizeInBytes`.        
| 1.2.0 |
+| spark.sql.analyzer.classification.enabled                   | false         
| When true, allows Kyuubi engine to judge this SQL's classification and set 
`spark.sql.analyzer.classification` back into sessionConf. Through this 
configuration item, Spark can optimizing configuration dynamic.                 
                                                                                
                                  | 1.4.0 |
+| spark.sql.optimizer.insertZorderBeforeWriting.enabled       | true          
| When true, we will follow target table properties to insert zorder or not. 
The key properties are: 1) `kyuubi.zorder.enabled`: if this property is true, 
we will insert zorder before writing data. 2) `kyuubi.zorder.cols`: string 
split by comma, we will zorder by these cols.                                   
                                 | 1.4.0 |
+| spark.sql.optimizer.zorderGlobalSort.enabled                | true          
| When true, we do a global sort using zorder. Note that, it can cause data 
skew issue if the zorder columns have less cardinality. When false, we only do 
local sort using zorder.                                                        
                                                                                
                            | 1.4.0 |
+| spark.sql.watchdog.maxPartitions                            | none          
| Set the max partition number when spark scans a data source. Enable 
MaxPartitionStrategy by specifying this configuration. Add maxPartitions 
Strategy to avoid scan excessive partitions on partitioned table, it's optional 
that works with defined                                                         
                                        | 1.4.0 |
+| spark.sql.optimizer.dropIgnoreNonExistent                   | false         
| When true, do not report an error if DROP 
DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies a non-existent 
database/table/view/function/partition                                          
                                                                                
                                                                           | 
1.5.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrders.enabled     | false         
| When ture, infer columns for rebalance and sort orders from original query, 
e.g. the join keys from join. It can avoid compression ratio regression.        
                                                                                
                                                                                
                         | 1.7.0 |
+| spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns   | 3             
| The max columns of inferred columns.                                          
                                                                                
                                                                                
                                                                                
                       | 1.7.0 |
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
new file mode 100644
index 000000000..fcbf5c0a1
--- /dev/null
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/InferRebalanceAndSortOrders.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.sql
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeSet, Expression, NamedExpression, UnaryExpression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, 
LeftOuter, LeftSemi, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LogicalPlan, Project, Sort, SubqueryAlias, View}
+
+/**
+ * Infer the columns for Rebalance and Sort to improve the compression ratio.
+ *
+ * For example
+ * {{{
+ *   INSERT INTO TABLE t PARTITION(p='a')
+ *   SELECT * FROM t1 JOIN t2 on t1.c1 = t2.c1
+ * }}}
+ * the inferred columns are: t1.c1
+ */
+object InferRebalanceAndSortOrders {
+
+  type PartitioningAndOrdering = (Seq[Expression], Seq[Expression])
+
+  private def getAliasMap(named: Seq[NamedExpression]): Map[Expression, 
Attribute] = {
+    @tailrec
+    def throughUnary(e: Expression): Expression = e match {
+      case u: UnaryExpression if u.deterministic =>
+        throughUnary(u.child)
+      case _ => e
+    }
+
+    named.flatMap {
+      case a @ Alias(child, _) =>
+        Some((throughUnary(child).canonicalized, a.toAttribute))
+      case _ => None
+    }.toMap
+  }
+
+  def infer(plan: LogicalPlan): Option[PartitioningAndOrdering] = {
+    def candidateKeys(
+        input: LogicalPlan,
+        output: AttributeSet = AttributeSet.empty): 
Option[PartitioningAndOrdering] = {
+      input match {
+        case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, _, _, _, _, _) 
=>
+          joinType match {
+            case LeftSemi | LeftAnti | LeftOuter => Some((leftKeys, leftKeys))
+            case RightOuter => Some((rightKeys, rightKeys))
+            case Inner | FullOuter =>
+              if (output.isEmpty) {
+                Some((leftKeys ++ rightKeys, leftKeys ++ rightKeys))
+              } else {
+                assert(leftKeys.length == rightKeys.length)
+                val keys = leftKeys.zip(rightKeys).flatMap { case (left, 
right) =>
+                  if (left.references.subsetOf(output)) {
+                    Some(left)
+                  } else if (right.references.subsetOf(output)) {
+                    Some(right)
+                  } else {
+                    None
+                  }
+                }
+                Some((keys, keys))
+              }
+            case _ => None
+          }
+        case agg: Aggregate =>
+          val aliasMap = getAliasMap(agg.aggregateExpressions)
+          Some((
+            agg.groupingExpressions.map(p => 
aliasMap.getOrElse(p.canonicalized, p)),
+            agg.groupingExpressions.map(o => 
aliasMap.getOrElse(o.canonicalized, o))))
+        case s: Sort => Some((s.order.map(_.child), s.order.map(_.child)))
+        case p: Project =>
+          val aliasMap = getAliasMap(p.projectList)
+          candidateKeys(p.child, p.references).map { case (partitioning, 
ordering) =>
+            (
+              partitioning.map(p => aliasMap.getOrElse(p.canonicalized, p)),
+              ordering.map(o => aliasMap.getOrElse(o.canonicalized, o)))
+          }
+        case f: Filter => candidateKeys(f.child, output)
+        case s: SubqueryAlias => candidateKeys(s.child, output)
+        case v: View => candidateKeys(v.child, output)
+
+        case _ => None
+      }
+    }
+
+    candidateKeys(plan).map { case (partitioning, ordering) =>
+      (
+        partitioning.filter(_.references.subsetOf(plan.outputSet)),
+        ordering.filter(_.references.subsetOf(plan.outputSet)))
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 8f7f17c4a..3cbacdd2f 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -18,14 +18,32 @@
 package org.apache.kyuubi.sql
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical._
 
 trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
   override def buildRepartition(
       dynamicPartitionColumns: Seq[Attribute],
       query: LogicalPlan): LogicalPlan = {
-    RebalancePartitions(dynamicPartitionColumns, query)
+    if (!conf.getConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS) ||
+      dynamicPartitionColumns.nonEmpty) {
+      RebalancePartitions(dynamicPartitionColumns, query)
+    } else {
+      val maxColumns = 
conf.getConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS_MAX_COLUMNS)
+      val inferred = InferRebalanceAndSortOrders.infer(query)
+      if (inferred.isDefined) {
+        val (partitioning, ordering) = inferred.get
+        val rebalance = RebalancePartitions(partitioning.take(maxColumns), 
query)
+        if (ordering.nonEmpty) {
+          val sortOrders = ordering.take(maxColumns).map(o => SortOrder(o, 
Ascending))
+          Sort(sortOrders, false, rebalance)
+        } else {
+          rebalance
+        }
+      } else {
+        RebalancePartitions(dynamicPartitionColumns, query)
+      }
+    }
   }
 
   override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f1a27cdb8..859b6256c 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -18,13 +18,15 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.RebalancePartitions
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
RebalancePartitions, Sort}
+import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.hive.HiveUtils
 import 
org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
 
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
+
   test("check rebalance exists") {
     def check(df: DataFrame, expectedRebalanceNum: Int = 1): Unit = {
       assert(
@@ -176,4 +178,87 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
       }
     }
   }
+
+  test("Infer rebalance and sorder orders") {
+    def checkShuffleAndSort(dataWritingCommand: LogicalPlan, sSize: Int, 
rSize: Int): Unit = {
+      assert(dataWritingCommand.isInstanceOf[DataWritingCommand])
+      val plan = dataWritingCommand.asInstanceOf[DataWritingCommand].query
+      assert(plan.collect {
+        case s: Sort => s
+      }.size == sSize)
+      assert(plan.collect {
+        case r: RebalancePartitions if r.partitionExpressions.size == rSize => 
r
+      }.nonEmpty || rSize == 0)
+    }
+
+    withView("v") {
+      withTable("t", "input1", "input2") {
+        withSQLConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key -> 
"true") {
+          sql(s"CREATE TABLE t (c1 int, c2 long) USING PARQUET PARTITIONED BY 
(p string)")
+          sql(s"CREATE TABLE input1 USING PARQUET AS SELECT * FROM 
VALUES(1,2),(1,3)")
+          sql(s"CREATE TABLE input2 USING PARQUET AS SELECT * FROM 
VALUES(1,3),(1,3)")
+          sql(s"CREATE VIEW v as SELECT col1, count(*) as col2 FROM input1 
GROUP BY col1")
+
+          val df0 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p='a')
+               |SELECT /*+ broadcast(input2) */ input1.col1, input2.col1
+               |FROM input1
+               |JOIN input2
+               |ON input1.col1 = input2.col1
+               |""".stripMargin)
+          checkShuffleAndSort(df0.queryExecution.analyzed, 1, 1)
+
+          val df1 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p='a')
+               |SELECT /*+ broadcast(input2) */ input1.col1, input1.col2
+               |FROM input1
+               |LEFT JOIN input2
+               |ON input1.col1 = input2.col1 and input1.col2 = input2.col2
+               |""".stripMargin)
+          checkShuffleAndSort(df1.queryExecution.analyzed, 1, 2)
+
+          val df2 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p='a')
+               |SELECT col1 as c1, count(*) as c2
+               |FROM input1
+               |GROUP BY col1
+               |HAVING count(*) > 0
+               |""".stripMargin)
+          checkShuffleAndSort(df2.queryExecution.analyzed, 1, 1)
+
+          // dynamic partition
+          val df3 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p)
+               |SELECT /*+ broadcast(input2) */ input1.col1, input1.col2, 
input1.col2
+               |FROM input1
+               |JOIN input2
+               |ON input1.col1 = input2.col1
+               |""".stripMargin)
+          checkShuffleAndSort(df3.queryExecution.analyzed, 0, 1)
+
+          // non-deterministic
+          val df4 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p='a')
+               |SELECT col1 + rand(), count(*) as c2
+               |FROM input1
+               |GROUP BY col1
+               |""".stripMargin)
+          checkShuffleAndSort(df4.queryExecution.analyzed, 0, 0)
+
+          // view
+          val df5 = sql(
+            s"""
+               |INSERT INTO TABLE t PARTITION(p='a')
+               |SELECT * FROM v
+               |""".stripMargin)
+          checkShuffleAndSort(df5.queryExecution.analyzed, 1, 1)
+        }
+      }
+    }
+  }
 }
diff --git 
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
 
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
index d5ddb788c..f30ab2bc2 100644
--- 
a/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-common/src/main/scala/org/apache/kyuubi/sql/KyuubiSQLConf.scala
@@ -160,4 +160,20 @@ object KyuubiSQLConf {
       .version("1.5.0")
       .booleanConf
       .createWithDefault(false)
+
+  val INFER_REBALANCE_AND_SORT_ORDERS =
+    buildConf("spark.sql.optimizer.inferRebalanceAndSortOrders.enabled")
+      .doc("When ture, infer columns for rebalance and sort orders from 
original query, " +
+        "e.g. the join keys from join. It can avoid compression ratio 
regression.")
+      .version("1.7.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val INFER_REBALANCE_AND_SORT_ORDERS_MAX_COLUMNS =
+    buildConf("spark.sql.optimizer.inferRebalanceAndSortOrdersMaxColumns")
+      .doc("The max columns of inferred columns.")
+      .version("1.7.0")
+      .intConf
+      .checkValue(_ > 0, "must be positive number")
+      .createWithDefault(3)
 }
