This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 57db6815bc Core, Spark: Adds a Table Property for Relying on 
Identifier Fields (#15372)
57db6815bc is described below

commit 57db6815bc85a741a188f6d387d721dcb3153f2d
Author: Russell Spitzer <[email protected]>
AuthorDate: Mon Mar 2 12:23:46 2026 -0600

    Core, Spark: Adds a Table Property for Relying on Identifier Fields (#15372)
    
    * Core, Spark: Adds a Table Property for Relying on Identifier Fields
    
    Previously, while we could set table identifiers, there was no way of letting
    an engine know that it could rely on these fields for optimizations. While
    we do not enforce or require validation of identifier fields, some users
    may know that their identifier fields are in fact unique. This PR gives
    users an option of letting compatible engines know that they can take
    advantage of the uniqueness of their identifiers.
    
    * SpotlessApply
    
    * Reviewer Suggestions
    
    * SpotlessApply
---
 .../java/org/apache/iceberg/TableProperties.java   |   9 ++
 .../java/org/apache/iceberg/spark/Spark3Util.java  |   4 +
 .../org/apache/iceberg/spark/SparkReadConf.java    |   9 ++
 .../apache/iceberg/spark/SparkSQLProperties.java   |   3 +
 .../apache/iceberg/spark/source/SparkTable.java    |  22 ++++
 .../iceberg/spark/source/TestSparkTable.java       | 118 +++++++++++++++++++--
 6 files changed, 158 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java 
b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 64be3db498..05f284079e 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -247,6 +247,15 @@ public class TableProperties {
   public static final String DELETE_PLANNING_MODE = 
"read.delete-planning-mode";
   public static final String PLANNING_MODE_DEFAULT = 
PlanningMode.AUTO.modeName();
 
+  /**
+   * When true, declares that the table's identifier fields can be relied upon 
as a primary key by
+   * query engines for optimization purposes (e.g. eliminating redundant joins 
or distinct). This is
+   * not enforced at write time and does not validate existing data.
+   */
+  public static final String READ_IDENTIFIER_FIELDS_RELY = 
"read.identifier-fields.rely";
+
+  public static final boolean READ_IDENTIFIER_FIELDS_RELY_DEFAULT = false;
+
   public static final String OBJECT_STORE_ENABLED = 
"write.object-storage.enabled";
   public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index df42175c34..fbaffb2961 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -369,6 +369,10 @@ public class Spark3Util {
     return Expressions.column(name);
   }
 
+  public static NamedReference[] toNamedReferences(Set<String> names) {
+    return 
names.stream().map(Spark3Util::toNamedReference).toArray(NamedReference[]::new);
+  }
+
   public static Term toIcebergTerm(Expression expr) {
     if (expr instanceof Transform) {
       Transform transform = (Transform) expr;
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index ec56e5f239..0b54e1a691 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -356,4 +356,13 @@ public class SparkReadConf {
         .defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
         .parse();
   }
+
+  public boolean identifierFieldsRely() {
+    return confParser
+        .booleanConf()
+        .sessionConf(SparkSQLProperties.IDENTIFIER_FIELDS_RELY)
+        .tableProperty(TableProperties.READ_IDENTIFIER_FIELDS_RELY)
+        .defaultValue(TableProperties.READ_IDENTIFIER_FIELDS_RELY_DEFAULT)
+        .parse();
+  }
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 81139969f7..14b2b8c958 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -107,6 +107,9 @@ public class SparkSQLProperties {
   public static final String REPORT_COLUMN_STATS = 
"spark.sql.iceberg.report-column-stats";
   public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
 
+  // Rely on identifier fields as a PRIMARY KEY constraint for query 
optimization (not enforced)
+  public static final String IDENTIFIER_FIELDS_RELY = 
"spark.sql.iceberg.identifier-fields-rely";
+
   // Prefix for custom snapshot properties
   public static final String SNAPSHOT_PROPERTY_PREFIX = 
"spark.sql.iceberg.snapshot-property.";
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 24915a1bfc..6dcf1c9a15 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -55,6 +55,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
@@ -70,6 +71,8 @@ import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
+import 
org.apache.spark.sql.connector.catalog.constraints.Constraint.ValidationStatus;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.ScanBuilder;
@@ -286,6 +289,25 @@ public class SparkTable
     return cols.toArray(SparkMetadataColumn[]::new);
   }
 
+  @Override
+  public Constraint[] constraints() {
+    List<Constraint> constraints = Lists.newArrayList();
+
+    SparkReadConf readConf = new SparkReadConf(sparkSession(), icebergTable);
+    Set<String> identifierFieldNames = 
icebergTable.schema().identifierFieldNames();
+
+    if (readConf.identifierFieldsRely() && !identifierFieldNames.isEmpty()) {
+      constraints.add(
+          Constraint.primaryKey("iceberg_pk", 
Spark3Util.toNamedReferences(identifierFieldNames))
+              .enforced(false)
+              .validationStatus(ValidationStatus.UNVALIDATED)
+              .rely(true)
+              .build());
+    }
+
+    return constraints.toArray(new Constraint[0]);
+  }
+
   @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
     if (refreshEagerly) {
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
index d14b1a52cf..92a6426002 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTable.java
@@ -20,12 +20,22 @@ package org.apache.iceberg.spark.source;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.CatalogManager;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.constraints.Constraint;
+import org.apache.spark.sql.connector.catalog.constraints.PrimaryKey;
+import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -36,7 +46,9 @@ public class TestSparkTable extends CatalogTestBase {
 
   @BeforeEach
   public void createTable() {
-    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
tableName);
+    sql(
+        "CREATE TABLE %s (id bigint NOT NULL, name string NOT NULL, data 
string) USING iceberg",
+        tableName);
   }
 
   @AfterEach
@@ -45,15 +57,107 @@ public class TestSparkTable extends CatalogTestBase {
   }
 
   @TestTemplate
-  public void testTableEquality() throws NoSuchTableException {
-    CatalogManager catalogManager = spark.sessionState().catalogManager();
-    TableCatalog catalog = (TableCatalog) catalogManager.catalog(catalogName);
-    Identifier identifier = Identifier.of(tableIdent.namespace().levels(), 
tableIdent.name());
-    SparkTable table1 = (SparkTable) catalog.loadTable(identifier);
-    SparkTable table2 = (SparkTable) catalog.loadTable(identifier);
+  public void testTableEquality() {
+    SparkTable table1 = loadSparkTable();
+    SparkTable table2 = loadSparkTable();
 
     // different instances pointing to the same table must be equivalent
     assertThat(table1).as("References must be different").isNotSameAs(table2);
     assertThat(table1).as("Tables must be equivalent").isEqualTo(table2);
   }
+
+  @TestTemplate
+  public void testNoIdentifierFieldsRelyByDefault() {
+    SparkTable sparkTable = loadSparkTable();
+    assertThat(primaryKeys(sparkTable)).isEmpty();
+
+    // enabling rely without identifier fields still produces no primary key
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.READ_IDENTIFIER_FIELDS_RELY);
+    sparkTable = loadSparkTable();
+    assertThat(primaryKeys(sparkTable)).isEmpty();
+  }
+
+  @TestTemplate
+  public void testIdentifierFieldsRelyViaTableProperty() {
+    SparkTable sparkTable = loadSparkTable();
+    sparkTable
+        .table()
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .setIdentifierFields("id", "name")
+        .commit();
+
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.READ_IDENTIFIER_FIELDS_RELY);
+
+    sparkTable = loadSparkTable();
+    List<PrimaryKey> pks = primaryKeys(sparkTable);
+    assertThat(pks).hasSize(1);
+
+    PrimaryKey pk = pks.get(0);
+    assertThat(pk.name()).isEqualTo("iceberg_pk");
+    assertThat(pk.enforced()).isFalse();
+    assertThat(pk.rely()).isTrue();
+    
assertThat(pk.validationStatus()).isEqualTo(Constraint.ValidationStatus.UNVALIDATED);
+
+    Set<String> columnNames =
+        
Arrays.stream(pk.columns()).map(NamedReference::toString).collect(Collectors.toSet());
+    assertThat(columnNames).containsExactlyInAnyOrder("id", "name");
+
+    // disabling rely removes the primary key
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'false')",
+        tableName, TableProperties.READ_IDENTIFIER_FIELDS_RELY);
+    sparkTable = loadSparkTable();
+    assertThat(primaryKeys(sparkTable)).isEmpty();
+  }
+
+  @TestTemplate
+  public void testIdentifierFieldsRelyViaSessionConf() {
+    SparkTable sparkTable = loadSparkTable();
+    
sparkTable.table().updateSchema().allowIncompatibleChanges().setIdentifierFields("id").commit();
+
+    // session conf enables rely without a table property
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.IDENTIFIER_FIELDS_RELY, "true"),
+        () -> {
+          List<PrimaryKey> pks = primaryKeys(loadSparkTable());
+          assertThat(pks).hasSize(1);
+
+          Set<String> columnNames =
+              Arrays.stream(pks.get(0).columns())
+                  .map(NamedReference::toString)
+                  .collect(Collectors.toSet());
+          assertThat(columnNames).containsExactly("id");
+        });
+
+    // session conf rely=false overrides table property rely=true
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.READ_IDENTIFIER_FIELDS_RELY);
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.IDENTIFIER_FIELDS_RELY, "false"),
+        () -> assertThat(primaryKeys(loadSparkTable())).isEmpty());
+  }
+
+  private static List<PrimaryKey> primaryKeys(SparkTable table) {
+    return Arrays.stream(table.constraints())
+        .filter(c -> c instanceof PrimaryKey)
+        .map(c -> (PrimaryKey) c)
+        .collect(Collectors.toList());
+  }
+
+  private SparkTable loadSparkTable() {
+    try {
+      CatalogManager catalogManager = spark.sessionState().catalogManager();
+      TableCatalog catalog = (TableCatalog) 
catalogManager.catalog(catalogName);
+      Identifier identifier = Identifier.of(tableIdent.namespace().levels(), 
tableIdent.name());
+      return (SparkTable) catalog.loadTable(identifier);
+    } catch (NoSuchTableException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }

Reply via email to