This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 455a82a692 Spark 4.1: Refactor metadata column references to use asRef() method (#15376)
455a82a692 is described below

commit 455a82a6921a1cf40ff69f548f449d2748cfc09b
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 23:50:40 2026 -0800

    Spark 4.1: Refactor metadata column references to use asRef() method (#15376)
---
 .../spark/source/SparkCopyOnWriteOperation.java    | 16 +++++++---------
 .../iceberg/spark/source/SparkCopyOnWriteScan.java |  4 +---
 .../iceberg/spark/source/SparkMetadataColumn.java  |  6 ++++++
 .../spark/source/SparkPositionDeltaOperation.java  | 22 ++++++++++------------
 4 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
index dd3d785b3b..21dfb955db 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
@@ -23,12 +23,10 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
 
 import java.util.List;
 import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -98,18 +96,18 @@ class SparkCopyOnWriteOperation implements RowLevelOperation {
 
   @Override
   public NamedReference[] requiredMetadataAttributes() {
-    List<NamedReference> metadataAttributes = Lists.newArrayList();
-    metadataAttributes.add(Expressions.column(MetadataColumns.FILE_PATH.name()));
+    List<NamedReference> metaAttrs = Lists.newArrayList();
+    metaAttrs.add(SparkMetadataColumns.FILE_PATH.asRef());
+
     if (command == DELETE || command == UPDATE) {
-      metadataAttributes.add(Expressions.column(MetadataColumns.ROW_POSITION.name()));
+      metaAttrs.add(SparkMetadataColumns.ROW_POSITION.asRef());
     }
 
     if (TableUtil.supportsRowLineage(table)) {
-      metadataAttributes.add(Expressions.column(MetadataColumns.ROW_ID.name()));
-      metadataAttributes.add(
-          Expressions.column(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()));
+      metaAttrs.add(SparkMetadataColumns.ROW_ID.asRef());
+      metaAttrs.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.asRef());
     }
 
-    return metadataAttributes.toArray(NamedReference[]::new);
+    return metaAttrs.toArray(NamedReference[]::new);
   }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 0678236861..937c5bc834 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -37,7 +37,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
@@ -98,8 +97,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
 
   @Override
   public NamedReference[] filterAttributes() {
-    NamedReference file = Expressions.column(MetadataColumns.FILE_PATH.name());
-    return new NamedReference[] {file};
+    return new NamedReference[] {SparkMetadataColumns.FILE_PATH.asRef()};
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
index 54687a8f04..8f2ae73460 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.spark.source;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.MetadataBuilder;
 
@@ -127,4 +129,8 @@ public class SparkMetadataColumn implements MetadataColumn {
         .build()
         .json();
   }
+
+  public NamedReference asRef() {
+    return Expressions.column(name());
+  }
 }
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
index 97f9601ced..bc0a7c6478 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
@@ -20,12 +20,10 @@ package org.apache.iceberg.spark.source;
 
 import java.util.List;
 import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -99,23 +97,23 @@ class SparkPositionDeltaOperation implements RowLevelOperation, SupportsDelta {
 
   @Override
   public NamedReference[] requiredMetadataAttributes() {
-    List<NamedReference> metadataAttributes = Lists.newArrayList();
-    metadataAttributes.add(Expressions.column(MetadataColumns.SPEC_ID.name()));
-    metadataAttributes.add(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME));
+    List<NamedReference> metaAttrs = Lists.newArrayList();
+    metaAttrs.add(SparkMetadataColumns.SPEC_ID.asRef());
+    metaAttrs.add(SparkMetadataColumns.partition(table).asRef());
+
     if (TableUtil.supportsRowLineage(table)) {
-      metadataAttributes.add(Expressions.column(MetadataColumns.ROW_ID.name()));
-      metadataAttributes.add(
-          Expressions.column(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()));
+      metaAttrs.add(SparkMetadataColumns.ROW_ID.asRef());
+      metaAttrs.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.asRef());
     }
 
-    return metadataAttributes.toArray(new NamedReference[0]);
+    return metaAttrs.toArray(new NamedReference[0]);
   }
 
   @Override
   public NamedReference[] rowId() {
-    NamedReference file = Expressions.column(MetadataColumns.FILE_PATH.name());
-    NamedReference pos = Expressions.column(MetadataColumns.ROW_POSITION.name());
-    return new NamedReference[] {file, pos};
+    return new NamedReference[] {
+      SparkMetadataColumns.FILE_PATH.asRef(), SparkMetadataColumns.ROW_POSITION.asRef()
+    };
   }
 
   @Override

Reply via email to