This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new da392f259c Spark: Use SerializableTableWithSize when optimizing metadata (#8957)
da392f259c is described below

commit da392f259cd8fc4788453e309e3369a3a190bbb1
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Oct 31 10:59:49 2023 -0700

    Spark: Use SerializableTableWithSize when optimizing metadata (#8957)
---
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java       | 9 ++++++---
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java       | 9 ++++++---
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java       | 9 ++++++---
 .../iceberg/spark/actions/RewriteManifestsSparkAction.java       | 9 ++++++---
 4 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 57faa5fcd9..854232a62d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForPartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests, int targetNumManifestEntries) {
 
-    Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+    Broadcast<Table> tableBroadcast =
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
 

Reply via email to