This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new da392f259c Spark: Use SerializableTableWithSize when optimizing metadata (#8957)
da392f259c is described below
commit da392f259cd8fc4788453e309e3369a3a190bbb1
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Oct 31 10:59:49 2023 -0700
Spark: Use SerializableTableWithSize when optimizing metadata (#8957)
---
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 9 ++++++---
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 9 ++++++---
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 9 ++++++---
.../iceberg/spark/actions/RewriteManifestsSparkAction.java | 9 ++++++---
4 files changed, 24 insertions(+), 12 deletions(-)
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 57faa5fcd9..854232a62d 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int
targetNumManifestEntries) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int
targetNumManifestEntries) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int
targetNumManifestEntries) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index 6ecdfac822..87fbe2de2f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -36,7 +36,6 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
-import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -52,6 +51,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.spark.SparkDataFile;
+import org.apache.iceberg.spark.source.SerializableTableWithSize;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.Tasks;
@@ -220,7 +220,9 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
@@ -246,7 +248,8 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForPartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests, int
targetNumManifestEntries) {
- Broadcast<Table> tableBroadcast = sparkContext().broadcast(SerializableTable.copyOf(table));
+ Broadcast<Table> tableBroadcast =
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);