This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d5b21d82e3 Spark 3.4: Add utility to load table state reliably (#11115)
d5b21d82e3 is described below

commit d5b21d82e3adb351c3295465e76c772e2dcb3a54
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Sep 16 14:46:24 2024 -0700

    Spark 3.4: Add utility to load table state reliably (#11115)
---
 .../java/org/apache/iceberg/spark/SparkTableUtil.java | 19 +++++++++++++++----
 .../apache/iceberg/spark/actions/NDVSketchUtil.java   | 12 ++++--------
 .../spark/actions/TestComputeTableStatsAction.java    | 14 ++++++++++++++
 3 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 7a96e97fb9..e103104171 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -851,6 +851,12 @@ public class SparkTableUtil {
         .run(item -> io.deleteFile(item.path()));
   }
 
+  public static Dataset<Row> loadTable(SparkSession spark, Table table, long snapshotId) {
+    SparkTable sparkTable = new SparkTable(table, snapshotId, false);
+    DataSourceV2Relation relation = createRelation(sparkTable, ImmutableMap.of());
+    return Dataset.ofRows(spark, relation);
+  }
+
   public static Dataset<Row> loadMetadataTable(
       SparkSession spark, Table table, MetadataTableType type) {
     return loadMetadataTable(spark, table, type, ImmutableMap.of());
@@ -858,11 +864,16 @@ public class SparkTableUtil {
 
   public static Dataset<Row> loadMetadataTable(
       SparkSession spark, Table table, MetadataTableType type, Map<String, String> extraOptions) {
-    SparkTable metadataTable =
-        new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
+    Table metadataTable = MetadataTableUtils.createMetadataTableInstance(table, type);
+    SparkTable sparkMetadataTable = new SparkTable(metadataTable, false);
+    DataSourceV2Relation relation = createRelation(sparkMetadataTable, extraOptions);
+    return Dataset.ofRows(spark, relation);
+  }
+
+  private static DataSourceV2Relation createRelation(
+      SparkTable sparkTable, Map<String, String> extraOptions) {
     CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
-    return Dataset.ofRows(
-        spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
+    return DataSourceV2Relation.create(sparkTable, Option.empty(), Option.empty(), options);
   }
 
   /**
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
index 22055a161e..c8a20d3cca 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
@@ -32,9 +32,10 @@ import org.apache.iceberg.puffin.StandardBlobTypes;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.stats.ThetaSketchAgg;
@@ -73,13 +74,8 @@ public class NDVSketchUtil {
 
   private static Row computeNDVSketches(
       SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
-    return spark
-        .read()
-        .format("iceberg")
-        .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
-        .load(table.name())
-        .select(toAggColumns(colNames))
-        .first();
+    Dataset<Row> inputDF = SparkTableUtil.loadTable(spark, table, snapshot.snapshotId());
+    return inputDF.select(toAggColumns(colNames)).first();
   }
 
   private static Column[] toAggColumns(List<String> colNames) {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
index 58703d4a90..88805a070c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
@@ -82,6 +82,20 @@ public class TestComputeTableStatsAction extends SparkCatalogTestBase {
     super(catalogName, implementation, config);
   }
 
+  @Test
+  public void testLoadingTableDirectly() {
+    sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+    sql("INSERT into %s values(1, 'abcd')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    SparkActions actions = SparkActions.get();
+    ComputeTableStats.Result results = actions.computeTableStats(table).execute();
+    StatisticsFile statisticsFile = results.statisticsFile();
+    assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+    assertThat(statisticsFile.blobMetadata().size()).isEqualTo(2);
+  }
+
   @Test
   public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
     sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);

Reply via email to