rdblue commented on a change in pull request #2022:
URL: https://github.com/apache/iceberg/pull/2022#discussion_r561417740



##########
File path: 
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
##########
@@ -194,4 +220,84 @@ trait RewriteRowLevelOperationHelper extends 
PredicateHelper with Logging {
       }
     }
   }
+
+  private object BucketTransform {
+    def unapply(transform: Transform): Option[(Int, FieldReference)] = 
transform match {
+      case bt: BucketTransform => bt.columns match {
+        case Seq(nf: NamedReference) =>
+          Some(bt.numBuckets.value(), FieldReference(nf.fieldNames()))
+        case _ =>
+          None
+      }
+      case _ => None
+    }
+  }
+
+  protected def toCatalyst(
+      distribution: Distribution,
+      plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
+
+    distribution match {
+      case d: OrderedDistribution =>
+        d.ordering.map(e => toCatalyst(e, plan, resolver))
+      case d: ClusteredDistribution =>
+        d.clustering.map(e => toCatalyst(e, plan, resolver))
+      case _: UnspecifiedDistribution =>
+        Array.empty[catalyst.expressions.Expression]
+    }
+  }
+
+  private def toCatalyst(
+      expr: Expression,
+      query: LogicalPlan,
+      resolver: Resolver): catalyst.expressions.Expression = {
+
+    def resolve(parts: Seq[String]): NamedExpression = {
+      // this part is controversial as we perform resolution in the optimizer
+      // we cannot perform this step in the analyzer since we need to optimize 
expressions
+      // in nodes like OverwriteByExpression before constructing a logical 
write
+      query.resolve(parts, resolver) match {
+        case Some(attr) => attr
+        case None => throw new AnalysisException(s"Cannot resolve 
'${parts.map(quoteIfNeeded).mkString(".")}'" +
+          s" using ${query.output}")
+      }
+    }
+
+    expr match {
+      case s: SortOrder =>
+        val catalystChild = toCatalyst(s.expression(), query, resolver)
+        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), 
toCatalyst(s.nullOrdering), Set.empty)
+      case it: IdentityTransform =>
+        resolve(it.ref.fieldNames())
+      case BucketTransform(numBuckets, ref) =>
+        IcebergBucketTransform(numBuckets, resolve(ref.fieldNames))
+      case yt: YearsTransform =>
+        IcebergYearTransform(resolve(yt.ref.fieldNames))
+      case mt: MonthsTransform =>
+        IcebergMonthTransform(resolve(mt.ref.fieldNames))
+      case dt: DaysTransform =>
+        IcebergDayTransform(resolve(dt.ref.fieldNames))
+      case ht: HoursTransform =>
+        IcebergHourTransform(resolve(ht.ref.fieldNames))
+      case ref: FieldReference =>
+        resolve(ref.fieldNames)
+      case _ =>
+        throw new RuntimeException(s"$expr is not currently supported")
+    }
+  }
+

Review comment:
       Nit: extra newline.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to