aokolnychyi commented on a change in pull request #2141:
URL: https://github.com/apache/iceberg/pull/2141#discussion_r563973331



##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+  private SortOrderUtil() {
+  }
+
+  public static SortOrder buildSortOrder(Table table) {
+    return buildSortOrder(table.spec(), table.sortOrder());
+  }
+
+  public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder 
sortOrder) {
+    if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {

Review comment:
       That should work too. Will update.

##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+  private SortOrderUtil() {
+  }
+
+  public static SortOrder buildSortOrder(Table table) {
+    return buildSortOrder(table.spec(), table.sortOrder());
+  }
+
+  public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder 
sortOrder) {
+    if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
+      return sortOrder;
+    }
+
+    Schema schema = spec.schema();
+
+    Multimap<Integer, SortField> sortFieldIndex = 
Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+    // build a sort prefix of partition fields that are not already in the 
sort order
+    SortOrder.Builder builder = SortOrder.builderFor(schema);

Review comment:
       Yeah.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -19,49 +19,30 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.iceberg.DistributionMode
-import org.apache.iceberg.TableProperties
-import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED
-import 
org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT
-import org.apache.iceberg.spark.Spark3Util.toClusteredDistribution
-import org.apache.iceberg.spark.Spark3Util.toOrderedDistribution
+import org.apache.iceberg.TableProperties.{MERGE_CARDINALITY_CHECK_ENABLED, 
MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT}
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.iceberg.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.IsNull
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NullsFirst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.FullOuter
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.LeftAnti
 import org.apache.spark.sql.catalyst.plans.RightOuter
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
-import org.apache.spark.sql.catalyst.plans.logical.DeleteAction
-import org.apache.spark.sql.catalyst.plans.logical.InsertAction
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.catalyst.plans.logical.JoinHint
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.MergeAction
-import org.apache.spark.sql.catalyst.plans.logical.MergeInto
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoParams
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
-import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.catalyst.plans.logical.Repartition
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, 
InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeInto, 
MergeIntoParams, MergeIntoTable, Project, Repartition, ReplaceData, 
UpdateAction}

Review comment:
       Oops, I missed this one. Will revert back.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): 
LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, 
defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), 
table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), 
childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, 
childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, 
childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)

Review comment:
       Could you elaborate a bit more? The interface in Spark 3.2 is 
implemented by `Write`, not `Table`. Are you thinking of passing `SparkTable` 
as an arg?

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): 
LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, 
defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), 
table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), 
childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, 
childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, 
childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+        val ordering = Spark3Util.buildRequiredOrdering(distribution, 
iceberg.table)
+        val newChildPlan = distribution match {
+          case _: OrderedDistribution =>
+            // insert a round robin partitioning to avoid executing the join 
twice
+            val numShufflePartitions = conf.numShufflePartitions
+            Repartition(numShufflePartitions, shuffle = true, childPlan)

Review comment:
       Yes, we don't want to add a round-robin repartition during inserts, for 
example. I'll add more info.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName, 
int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
-  public static Distribution toOrderedDistribution(PartitionSpec spec, 
SortOrder sortOrder, boolean inferFromSpec) {
-    if (sortOrder.isUnsorted()) {
-      if (inferFromSpec) {
-        SortOrder specOrder = Partitioning.sortOrderFor(spec);
-        return Distributions.ordered(convert(specOrder));
-      }
-
-      return Distributions.unspecified();
-    }
-
-    Schema schema = spec.schema();
-    Multimap<Integer, SortField> sortFieldIndex = 
Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
-    // build a sort prefix of partition fields that are not already in the 
sort order
-    SortOrder.Builder builder = SortOrder.builderFor(schema);
-    for (PartitionField field : spec.fields()) {
-      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
-      boolean isSorted = sortFields.stream().anyMatch(sortField ->
-          field.transform().equals(sortField.transform()) || 
sortField.transform().satisfiesOrderOf(field.transform()));
-      if (!isSorted) {
-        String sourceName = schema.findColumnName(field.sourceId());
-        
builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, 
field.transform()));
-      }
+  public static Distribution 
buildRequiredDistribution(org.apache.iceberg.Table table) {
+    DistributionMode distributionMode = getDistributionMode(table);
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+      case HASH:
+        return Distributions.clustered(toTransforms(table.spec()));

Review comment:
       Good point, let me handle this.

##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+  private SortOrderUtil() {
+  }
+
+  public static SortOrder buildSortOrder(Table table) {
+    return buildSortOrder(table.spec(), table.sortOrder());
+  }
+
+  public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder 
sortOrder) {
+    if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {

Review comment:
       Done. Resolving this.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends 
Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): 
LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, 
defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), 
table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), 
childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, 
childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, 
childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+        val ordering = Spark3Util.buildRequiredOrdering(distribution, 
iceberg.table)
+        val newChildPlan = distribution match {
+          case _: OrderedDistribution =>
+            // insert a round robin partitioning to avoid executing the join 
twice
+            val numShufflePartitions = conf.numShufflePartitions
+            Repartition(numShufflePartitions, shuffle = true, childPlan)

Review comment:
       Done. Could you check it once, @rdblue?

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.spark.sql.{catalyst, AnalysisException}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{IcebergBucketTransform, 
IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, 
IcebergYearTransform, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
RepartitionByExpression, Sort}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
+import org.apache.spark.sql.connector.expressions.{BucketTransform, 
DaysTransform, HoursTransform, MonthsTransform, NamedReference, Transform, 
YearsTransform}
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
IdentityTransform}
+import 
org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, 
Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, 
SortDirection, SortOrder}
+import org.apache.spark.sql.internal.SQLConf

Review comment:
       Done.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName, 
int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
-  public static Distribution toOrderedDistribution(PartitionSpec spec, 
SortOrder sortOrder, boolean inferFromSpec) {
-    if (sortOrder.isUnsorted()) {
-      if (inferFromSpec) {
-        SortOrder specOrder = Partitioning.sortOrderFor(spec);
-        return Distributions.ordered(convert(specOrder));
-      }
-
-      return Distributions.unspecified();
-    }
-
-    Schema schema = spec.schema();
-    Multimap<Integer, SortField> sortFieldIndex = 
Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
-    // build a sort prefix of partition fields that are not already in the 
sort order
-    SortOrder.Builder builder = SortOrder.builderFor(schema);
-    for (PartitionField field : spec.fields()) {
-      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
-      boolean isSorted = sortFields.stream().anyMatch(sortField ->
-          field.transform().equals(sortField.transform()) || 
sortField.transform().satisfiesOrderOf(field.transform()));
-      if (!isSorted) {
-        String sourceName = schema.findColumnName(field.sourceId());
-        
builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, 
field.transform()));
-      }
+  public static Distribution 
buildRequiredDistribution(org.apache.iceberg.Table table) {
+    DistributionMode distributionMode = getDistributionMode(table);
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+      case HASH:
+        return Distributions.clustered(toTransforms(table.spec()));

Review comment:
       Added a check at the beginning of this method. Could you check, @rdblue?

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -19,49 +19,30 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.iceberg.DistributionMode
-import org.apache.iceberg.TableProperties
-import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED
-import 
org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT
-import org.apache.iceberg.spark.Spark3Util.toClusteredDistribution
-import org.apache.iceberg.spark.Spark3Util.toOrderedDistribution
+import org.apache.iceberg.TableProperties.{MERGE_CARDINALITY_CHECK_ENABLED, 
MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT}
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.iceberg.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.IsNull
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NullsFirst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.FullOuter
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.LeftAnti
 import org.apache.spark.sql.catalyst.plans.RightOuter
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
-import org.apache.spark.sql.catalyst.plans.logical.DeleteAction
-import org.apache.spark.sql.catalyst.plans.logical.InsertAction
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.catalyst.plans.logical.JoinHint
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.MergeAction
-import org.apache.spark.sql.catalyst.plans.logical.MergeInto
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoParams
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
-import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.catalyst.plans.logical.Repartition
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, 
InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeInto, 
MergeIntoParams, MergeIntoTable, Project, Repartition, ReplaceData, 
UpdateAction}

Review comment:
       Done.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/IcebergImplicits.scala
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.Table
+
+object IcebergImplicits {
+  implicit class TableHelper(table: Table) {
+    def asIcebergTable: org.apache.iceberg.Table = {

Review comment:
       I think it looks cleaner with implicits.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName, 
int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
-  public static Distribution toOrderedDistribution(PartitionSpec spec, 
SortOrder sortOrder, boolean inferFromSpec) {
-    if (sortOrder.isUnsorted()) {
-      if (inferFromSpec) {
-        SortOrder specOrder = Partitioning.sortOrderFor(spec);
-        return Distributions.ordered(convert(specOrder));
-      }
-
-      return Distributions.unspecified();
-    }
-
-    Schema schema = spec.schema();
-    Multimap<Integer, SortField> sortFieldIndex = 
Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
-    // build a sort prefix of partition fields that are not already in the 
sort order
-    SortOrder.Builder builder = SortOrder.builderFor(schema);
-    for (PartitionField field : spec.fields()) {
-      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
-      boolean isSorted = sortFields.stream().anyMatch(sortField ->
-          field.transform().equals(sortField.transform()) || 
sortField.transform().satisfiesOrderOf(field.transform()));
-      if (!isSorted) {
-        String sourceName = schema.findColumnName(field.sourceId());
-        
builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, 
field.transform()));
-      }
+  public static Distribution 
buildRequiredDistribution(org.apache.iceberg.Table table) {
+    DistributionMode distributionMode = getDistributionMode(table);
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+      case HASH:
+        return Distributions.clustered(toTransforms(table.spec()));

Review comment:
       You are right. Updated.

##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/IcebergImplicits.scala
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.Table
+
+object IcebergImplicits {
+  implicit class TableHelper(table: Table) {
+    def asIcebergTable: org.apache.iceberg.Table = {

Review comment:
       I think this logic will be needed in a few places so I moved it to 
`Spark3Util` and got rid of implicits.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to