This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d5b21d82e3 Spark 3.4: Add utility to load table state reliably (#11115)
d5b21d82e3 is described below
commit d5b21d82e3adb351c3295465e76c772e2dcb3a54
Author: Hongyue/Steve Zhang <[email protected]>
AuthorDate: Mon Sep 16 14:46:24 2024 -0700
Spark 3.4: Add utility to load table state reliably (#11115)
---
.../java/org/apache/iceberg/spark/SparkTableUtil.java | 19 +++++++++++++++----
.../apache/iceberg/spark/actions/NDVSketchUtil.java | 12 ++++--------
.../spark/actions/TestComputeTableStatsAction.java | 14 ++++++++++++++
3 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 7a96e97fb9..e103104171 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -851,6 +851,12 @@ public class SparkTableUtil {
.run(item -> io.deleteFile(item.path()));
}
+ public static Dataset<Row> loadTable(SparkSession spark, Table table, long snapshotId) {
+ SparkTable sparkTable = new SparkTable(table, snapshotId, false);
+ DataSourceV2Relation relation = createRelation(sparkTable, ImmutableMap.of());
+ return Dataset.ofRows(spark, relation);
+ }
+
public static Dataset<Row> loadMetadataTable(
SparkSession spark, Table table, MetadataTableType type) {
return loadMetadataTable(spark, table, type, ImmutableMap.of());
@@ -858,11 +864,16 @@ public class SparkTableUtil {
public static Dataset<Row> loadMetadataTable(
SparkSession spark, Table table, MetadataTableType type, Map<String, String> extraOptions) {
- SparkTable metadataTable =
- new SparkTable(MetadataTableUtils.createMetadataTableInstance(table, type), false);
+ Table metadataTable = MetadataTableUtils.createMetadataTableInstance(table, type);
+ SparkTable sparkMetadataTable = new SparkTable(metadataTable, false);
+ DataSourceV2Relation relation = createRelation(sparkMetadataTable, extraOptions);
+ return Dataset.ofRows(spark, relation);
+ }
+
+ private static DataSourceV2Relation createRelation(
+ SparkTable sparkTable, Map<String, String> extraOptions) {
CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(extraOptions);
- return Dataset.ofRows(
- spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
+ return DataSourceV2Relation.create(sparkTable, Option.empty(), Option.empty(), options);
}
/**
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
index 22055a161e..c8a20d3cca 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/NDVSketchUtil.java
@@ -32,9 +32,10 @@ import org.apache.iceberg.puffin.StandardBlobTypes;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.stats.ThetaSketchAgg;
@@ -73,13 +74,8 @@ public class NDVSketchUtil {
private static Row computeNDVSketches(
SparkSession spark, Table table, Snapshot snapshot, List<String> colNames) {
- return spark
- .read()
- .format("iceberg")
- .option(SparkReadOptions.SNAPSHOT_ID, snapshot.snapshotId())
- .load(table.name())
- .select(toAggColumns(colNames))
- .first();
+ Dataset<Row> inputDF = SparkTableUtil.loadTable(spark, table, snapshot.snapshotId());
+ return inputDF.select(toAggColumns(colNames)).first();
}
private static Column[] toAggColumns(List<String> colNames) {
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
index 58703d4a90..88805a070c 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputeTableStatsAction.java
@@ -82,6 +82,20 @@ public class TestComputeTableStatsAction extends SparkCatalogTestBase {
super(catalogName, implementation, config);
}
+ @Test
+ public void testLoadingTableDirectly() {
+ sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);
+ sql("INSERT into %s values(1, 'abcd')", tableName);
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ SparkActions actions = SparkActions.get();
+ ComputeTableStats.Result results = actions.computeTableStats(table).execute();
+ StatisticsFile statisticsFile = results.statisticsFile();
+ assertThat(statisticsFile.fileSizeInBytes()).isNotEqualTo(0);
+ assertThat(statisticsFile.blobMetadata().size()).isEqualTo(2);
+ }
+
@Test
public void testComputeTableStatsAction() throws NoSuchTableException, ParseException {
sql("CREATE TABLE %s (id int, data string) USING iceberg", tableName);