rdblue commented on a change in pull request #2141:
URL: https://github.com/apache/iceberg/pull/2141#discussion_r563949252



##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)

Review comment:
       Can we expose the same interface as in Spark 3.2 from `Table` and use 
that here instead? Then `Spark3Util` calls remain in `SparkTable`.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -19,49 +19,30 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.iceberg.DistributionMode
-import org.apache.iceberg.TableProperties
-import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED
-import org.apache.iceberg.TableProperties.MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT
-import org.apache.iceberg.spark.Spark3Util.toClusteredDistribution
-import org.apache.iceberg.spark.Spark3Util.toOrderedDistribution
+import org.apache.iceberg.TableProperties.{MERGE_CARDINALITY_CHECK_ENABLED, MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT}
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.iceberg.util.PropertyUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.expressions.Ascending
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.IsNull
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.expressions.NullsFirst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
 import org.apache.spark.sql.catalyst.plans.FullOuter
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.LeftAnti
 import org.apache.spark.sql.catalyst.plans.RightOuter
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
-import org.apache.spark.sql.catalyst.plans.logical.DeleteAction
-import org.apache.spark.sql.catalyst.plans.logical.InsertAction
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.catalyst.plans.logical.JoinHint
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.MergeAction
-import org.apache.spark.sql.catalyst.plans.logical.MergeInto
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoParams
-import org.apache.spark.sql.catalyst.plans.logical.MergeIntoTable
-import org.apache.spark.sql.catalyst.plans.logical.Project
-import org.apache.spark.sql.catalyst.plans.logical.Repartition
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.ReplaceData
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.catalyst.plans.logical.UpdateAction
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeInto, MergeIntoParams, MergeIntoTable, Project, Repartition, ReplaceData, UpdateAction}

Review comment:
      Can you revert this change? I'll open a PR to update the Scala style checks.

##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+  private SortOrderUtil() {
+  }
+
+  public static SortOrder buildSortOrder(Table table) {
+    return buildSortOrder(table.spec(), table.sortOrder());
+  }
+
+  public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder sortOrder) {
+    if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {

Review comment:
       Should this return `SortOrder.unsorted()` instead?

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+        val ordering = Spark3Util.buildRequiredOrdering(distribution, iceberg.table)
+        val newChildPlan = distribution match {
+          case _: OrderedDistribution =>
+            // insert a round robin partitioning to avoid executing the join twice
+            val numShufflePartitions = conf.numShufflePartitions
+            Repartition(numShufflePartitions, shuffle = true, childPlan)

Review comment:
       This is here and not in `prepareQuery` because we don't want to assume 
that a global ordering always requires an extra round-robin repartition?

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)
+        val ordering = Spark3Util.buildRequiredOrdering(distribution, iceberg.table)
+        val newChildPlan = distribution match {
+          case _: OrderedDistribution =>
+            // insert a round robin partitioning to avoid executing the join twice
+            val numShufflePartitions = conf.numShufflePartitions
+            Repartition(numShufflePartitions, shuffle = true, childPlan)

Review comment:
       This is here and not in `prepareQuery` because we don't want to assume 
that a global ordering always requires an extra round-robin repartition? If so, 
it would be good to move the comment above `newChildPlan` and make it a bit 
more clear why this extra step is happening.
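
If that is the rationale, one way to address the second point is to hoist the comment above the match and spell the reasoning out. A minimal sketch, assuming the reviewer's reading is correct; the standalone helper name is hypothetical, and the non-ordered cases are elided in the quoted hunk:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Repartition
import org.apache.spark.sql.connector.iceberg.distributions.Distribution
import org.apache.spark.sql.connector.iceberg.distributions.OrderedDistribution

object RoundRobinSketch {
  // An ordered distribution leads to a global sort, and a global sort
  // range-partitions its input by sampling it before the shuffle; without an
  // intermediate exchange, that sampling pass would re-execute the join.
  // Round-robin repartitioning materializes the join output in shuffle files
  // once, so the sampling pass and the sort both reuse those files.
  def repartitionBeforeGlobalSort(
      distribution: Distribution,
      childPlan: LogicalPlan,
      numShufflePartitions: Int): LogicalPlan = distribution match {
    case _: OrderedDistribution =>
      Repartition(numShufflePartitions, shuffle = true, childPlan)
    case _ =>
      childPlan // the hash/unspecified cases are cut off in the quoted hunk
  }
}
```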

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.spark.sql.{catalyst, AnalysisException}
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{IcebergBucketTransform, IcebergDayTransform, IcebergHourTransform, IcebergMonthTransform, IcebergYearTransform, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, RepartitionByExpression, Sort}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.quoteIfNeeded
+import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, MonthsTransform, NamedReference, Transform, YearsTransform}
+import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, IdentityTransform}
+import org.apache.spark.sql.connector.iceberg.distributions.{ClusteredDistribution, Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.connector.iceberg.expressions.{NullOrdering, SortDirection, SortOrder}
+import org.apache.spark.sql.internal.SQLConf

Review comment:
       Could you rewrite imports to one per line?
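
For reference, the grouped statements at the top of the hunk would unroll to the one-per-line style like so (the remaining groups follow the same pattern):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.IcebergBucketTransform
import org.apache.spark.sql.catalyst.expressions.IcebergDayTransform
import org.apache.spark.sql.catalyst.expressions.IcebergHourTransform
import org.apache.spark.sql.catalyst.expressions.IcebergMonthTransform
import org.apache.spark.sql.catalyst.expressions.IcebergYearTransform
import org.apache.spark.sql.catalyst.expressions.NamedExpression
```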

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
-  public static Distribution toOrderedDistribution(PartitionSpec spec, SortOrder sortOrder, boolean inferFromSpec) {
-    if (sortOrder.isUnsorted()) {
-      if (inferFromSpec) {
-        SortOrder specOrder = Partitioning.sortOrderFor(spec);
-        return Distributions.ordered(convert(specOrder));
-      }
-
-      return Distributions.unspecified();
-    }
-
-    Schema schema = spec.schema();
-    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
-    // build a sort prefix of partition fields that are not already in the sort order
-    SortOrder.Builder builder = SortOrder.builderFor(schema);
-    for (PartitionField field : spec.fields()) {
-      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
-      boolean isSorted = sortFields.stream().anyMatch(sortField ->
-          field.transform().equals(sortField.transform()) || sortField.transform().satisfiesOrderOf(field.transform()));
-      if (!isSorted) {
-        String sourceName = schema.findColumnName(field.sourceId());
-        builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, field.transform()));
-      }
+  public static Distribution buildRequiredDistribution(org.apache.iceberg.Table table) {
+    DistributionMode distributionMode = getDistributionMode(table);
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+      case HASH:
+        return Distributions.clustered(toTransforms(table.spec()));

Review comment:
       Is it correct to return a clustered distribution with no expressions if 
the spec is unpartitioned? I think I would rather return 
`Distributions.unspecified` just to be safe when passing this back to Spark.

##########
File path: core/src/main/java/org/apache/iceberg/util/SortOrderUtil.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimap;
+import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+
+public class SortOrderUtil {
+
+  private SortOrderUtil() {
+  }
+
+  public static SortOrder buildSortOrder(Table table) {
+    return buildSortOrder(table.spec(), table.sortOrder());
+  }
+
+  public static SortOrder buildSortOrder(PartitionSpec spec, SortOrder sortOrder) {
+    if (sortOrder.isUnsorted() && spec.isUnpartitioned()) {
+      return sortOrder;
+    }
+
+    Schema schema = spec.schema();
+
+    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
+
+    // build a sort prefix of partition fields that are not already in the sort order
+    SortOrder.Builder builder = SortOrder.builderFor(schema);

Review comment:
       The logic here is unchanged from what was in `Spark3Util`, right?

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -233,47 +215,23 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
     !(actions.size == 1 && hasUnconditionalDelete(actions.headOption))
   }
 
-  def buildWritePlan(
-     childPlan: LogicalPlan,
-     table: Table): LogicalPlan = {
-    val defaultDistributionMode = table match {
-      case iceberg: SparkTable if !iceberg.table.sortOrder.isUnsorted =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_RANGE
-      case _ =>
-        TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT
-    }
-
+  private def buildWritePlan(childPlan: LogicalPlan, table: Table): LogicalPlan = {
     table match {
-      case iceTable: SparkTable =>
-        val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
-        val table = iceTable.table()
-        val distributionMode: String = table.properties
-          .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, defaultDistributionMode)
-        val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
-        DistributionMode.fromName(distributionMode) match {
-          case DistributionMode.NONE =>
-            Sort(buildSortOrder(order), global = false, childPlan)
-          case DistributionMode.HASH =>
-            val clustering = toCatalyst(toClusteredDistribution(table.spec()), childPlan)
-            val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
-            Sort(buildSortOrder(order), global = false, hashPartitioned)
-          case DistributionMode.RANGE =>
-            val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
-            Sort(buildSortOrder(order), global = true, roundRobin)
+      case iceberg: SparkTable =>
+        val distribution = Spark3Util.buildRequiredDistribution(iceberg.table)

Review comment:
       You're right. I was thinking it was on Table instead. Let's go with this 
then.

##########
File path: spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
##########
@@ -279,46 +280,39 @@ public Transform unknown(int fieldId, String sourceName, int sourceId, String tr
     return transforms.toArray(new Transform[0]);
   }
 
-  public static Distribution toOrderedDistribution(PartitionSpec spec, SortOrder sortOrder, boolean inferFromSpec) {
-    if (sortOrder.isUnsorted()) {
-      if (inferFromSpec) {
-        SortOrder specOrder = Partitioning.sortOrderFor(spec);
-        return Distributions.ordered(convert(specOrder));
-      }
-
-      return Distributions.unspecified();
-    }
-
-    Schema schema = spec.schema();
-    Multimap<Integer, SortField> sortFieldIndex = Multimaps.index(sortOrder.fields(), SortField::sourceId);
-
-    // build a sort prefix of partition fields that are not already in the sort order
-    SortOrder.Builder builder = SortOrder.builderFor(schema);
-    for (PartitionField field : spec.fields()) {
-      Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
-      boolean isSorted = sortFields.stream().anyMatch(sortField ->
-          field.transform().equals(sortField.transform()) || sortField.transform().satisfiesOrderOf(field.transform()));
-      if (!isSorted) {
-        String sourceName = schema.findColumnName(field.sourceId());
-        builder.asc(org.apache.iceberg.expressions.Expressions.transform(sourceName, field.transform()));
-      }
+  public static Distribution buildRequiredDistribution(org.apache.iceberg.Table table) {
+    DistributionMode distributionMode = getDistributionMode(table);
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+      case HASH:
+        return Distributions.clustered(toTransforms(table.spec()));

Review comment:
       A sorted table may not be partitioned, but it would pass the check you 
added. Then if the distribution mode is hash, it would return an empty 
clustered distribution. I think it would be more correct and easier to reason 
about if the check was done here.

##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/IcebergImplicits.scala
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.utils
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.Table
+
+object IcebergImplicits {
+  implicit class TableHelper(table: Table) {
+    def asIcebergTable: org.apache.iceberg.Table = {

Review comment:
      Should this be named `toIcebergTable` or `icebergTable`? This is doing more than just a cast; it is accessing the underlying table.
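
A minimal sketch of the `toIcebergTable` variant; the hunk cuts off before the method body, so the unwrap-or-fail structure below is an assumption:

```scala
package org.apache.spark.sql.catalyst.utils

import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.Table

object IcebergImplicits {
  implicit class TableHelper(table: Table) {
    // "to" signals a conversion: this unwraps the Iceberg table backing a
    // SparkTable rather than merely downcasting the connector Table
    def toIcebergTable: org.apache.iceberg.Table = table match {
      case sparkTable: SparkTable => sparkTable.table
      // hypothetical error path; the original body is not shown in the hunk
      case _ => throw new AnalysisException(s"Not an Iceberg table: $table")
    }
  }
}
```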




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


