This is an automated email from the ASF dual-hosted git repository.
huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 455a82a692 Spark 4.1: Refactor metadata column references to use
asRef() method (#15376)
455a82a692 is described below
commit 455a82a6921a1cf40ff69f548f449d2748cfc09b
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 19 23:50:40 2026 -0800
Spark 4.1: Refactor metadata column references to use asRef() method
(#15376)
---
.../spark/source/SparkCopyOnWriteOperation.java | 16 +++++++---------
.../iceberg/spark/source/SparkCopyOnWriteScan.java | 4 +---
.../iceberg/spark/source/SparkMetadataColumn.java | 6 ++++++
.../spark/source/SparkPositionDeltaOperation.java | 22 ++++++++++------------
4 files changed, 24 insertions(+), 24 deletions(-)
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
index dd3d785b3b..21dfb955db 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java
@@ -23,12 +23,10 @@ import static org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPD
import java.util.List;
import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -98,18 +96,18 @@ class SparkCopyOnWriteOperation implements RowLevelOperation {
@Override
public NamedReference[] requiredMetadataAttributes() {
- List<NamedReference> metadataAttributes = Lists.newArrayList();
- metadataAttributes.add(Expressions.column(MetadataColumns.FILE_PATH.name()));
+ List<NamedReference> metaAttrs = Lists.newArrayList();
+ metaAttrs.add(SparkMetadataColumns.FILE_PATH.asRef());
+
if (command == DELETE || command == UPDATE) {
- metadataAttributes.add(Expressions.column(MetadataColumns.ROW_POSITION.name()));
+ metaAttrs.add(SparkMetadataColumns.ROW_POSITION.asRef());
}
if (TableUtil.supportsRowLineage(table)) {
- metadataAttributes.add(Expressions.column(MetadataColumns.ROW_ID.name()));
- metadataAttributes.add(
- Expressions.column(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()));
+ metaAttrs.add(SparkMetadataColumns.ROW_ID.asRef());
+ metaAttrs.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.asRef());
}
- return metadataAttributes.toArray(NamedReference[]::new);
+ return metaAttrs.toArray(NamedReference[]::new);
}
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
index 0678236861..937c5bc834 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java
@@ -37,7 +37,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsRuntimeFiltering;
@@ -98,8 +97,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
@Override
public NamedReference[] filterAttributes() {
- NamedReference file = Expressions.column(MetadataColumns.FILE_PATH.name());
- return new NamedReference[] {file};
+ return new NamedReference[] {SparkMetadataColumns.FILE_PATH.asRef()};
}
@Override
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
index 54687a8f04..8f2ae73460 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMetadataColumn.java
@@ -21,6 +21,8 @@ package org.apache.iceberg.spark.source;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.spark.sql.connector.catalog.MetadataColumn;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MetadataBuilder;
@@ -127,4 +129,8 @@ public class SparkMetadataColumn implements MetadataColumn {
.build()
.json();
}
+
+ public NamedReference asRef() {
+ return Expressions.column(name());
+ }
}
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
index 97f9601ced..bc0a7c6478 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java
@@ -20,12 +20,10 @@ package org.apache.iceberg.spark.source;
import java.util.List;
import org.apache.iceberg.IsolationLevel;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -99,23 +97,23 @@ class SparkPositionDeltaOperation implements RowLevelOperation, SupportsDelta {
@Override
public NamedReference[] requiredMetadataAttributes() {
- List<NamedReference> metadataAttributes = Lists.newArrayList();
- metadataAttributes.add(Expressions.column(MetadataColumns.SPEC_ID.name()));
- metadataAttributes.add(Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME));
+ List<NamedReference> metaAttrs = Lists.newArrayList();
+ metaAttrs.add(SparkMetadataColumns.SPEC_ID.asRef());
+ metaAttrs.add(SparkMetadataColumns.partition(table).asRef());
+
if (TableUtil.supportsRowLineage(table)) {
- metadataAttributes.add(Expressions.column(MetadataColumns.ROW_ID.name()));
- metadataAttributes.add(
- Expressions.column(MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.name()));
+ metaAttrs.add(SparkMetadataColumns.ROW_ID.asRef());
+ metaAttrs.add(SparkMetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.asRef());
}
- return metadataAttributes.toArray(new NamedReference[0]);
+ return metaAttrs.toArray(new NamedReference[0]);
}
@Override
public NamedReference[] rowId() {
- NamedReference file = Expressions.column(MetadataColumns.FILE_PATH.name());
- NamedReference pos = Expressions.column(MetadataColumns.ROW_POSITION.name());
- return new NamedReference[] {file, pos};
+ return new NamedReference[] {
+ SparkMetadataColumns.FILE_PATH.asRef(), SparkMetadataColumns.ROW_POSITION.asRef()
+ };
}
@Override