This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 49d833aa83 API,Core,Spark: Add rewritten bytes to rewrite data files 
procedure results (#6801)
49d833aa83 is described below

commit 49d833aa83f6d4681c6c9f8473080f6e78124a0e
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Wed Feb 15 22:46:53 2023 +0100

    API,Core,Spark: Add rewritten bytes to rewrite data files procedure results 
(#6801)
    
    Co-authored-by: Alex Reid <[email protected]>
---
 .../apache/iceberg/actions/RewriteDataFiles.java   |   8 ++
 .../actions/BaseFileGroupRewriteResult.java        |  18 +++
 .../apache/iceberg/actions/RewriteFileGroup.java   |   4 +-
 .../extensions/TestRewriteDataFilesProcedure.java  | 124 +++++++++++++++++----
 .../procedures/RewriteDataFilesProcedure.java      |   7 +-
 .../apache/iceberg/spark/SparkCatalogConfig.java   |   5 +-
 .../apache/iceberg/spark/SparkTestHelperBase.java  |   2 +-
 .../spark/actions/TestRewriteDataFilesAction.java  |  55 +++++++++
 .../apache/iceberg/spark/sql/TestRefreshTable.java |   4 +-
 9 files changed, 197 insertions(+), 30 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java 
b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index 39e2b9bc66..5f76528d60 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -187,6 +187,10 @@ public interface RewriteDataFiles
           .mapToInt(FileGroupRewriteResult::rewrittenDataFilesCount)
           .sum();
     }
+
+    default long rewrittenBytesCount() {
+      return 
rewriteResults().stream().mapToLong(FileGroupRewriteResult::rewrittenBytesCount).sum();
+    }
   }
 
   /**
@@ -199,6 +203,10 @@ public interface RewriteDataFiles
     int addedDataFilesCount();
 
     int rewrittenDataFilesCount();
+
+    default long rewrittenBytesCount() {
+      return 0L;
+    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java 
b/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java
index fd44f7f6a3..f730c4303f 100644
--- 
a/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java
+++ 
b/core/src/main/java/org/apache/iceberg/actions/BaseFileGroupRewriteResult.java
@@ -24,13 +24,26 @@ import 
org.apache.iceberg.actions.RewriteDataFiles.FileGroupRewriteResult;
 public class BaseFileGroupRewriteResult implements FileGroupRewriteResult {
   private final int addedDataFilesCount;
   private final int rewrittenDataFilesCount;
+  private final long rewrittenBytesCount;
   private final FileGroupInfo info;
 
+  /**
+   * @deprecated Will be removed in 1.3.0; use {@link
+   *     BaseFileGroupRewriteResult#BaseFileGroupRewriteResult(FileGroupInfo, 
int, int, long)}
+   *     instead.
+   */
+  @Deprecated
   public BaseFileGroupRewriteResult(
       FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount) {
+    this(info, addedFilesCount, rewrittenFilesCount, 0L);
+  }
+
+  public BaseFileGroupRewriteResult(
+      FileGroupInfo info, int addedFilesCount, int rewrittenFilesCount, long 
rewrittenBytesCount) {
     this.info = info;
     this.addedDataFilesCount = addedFilesCount;
     this.rewrittenDataFilesCount = rewrittenFilesCount;
+    this.rewrittenBytesCount = rewrittenBytesCount;
   }
 
   @Override
@@ -47,4 +60,9 @@ public class BaseFileGroupRewriteResult implements 
FileGroupRewriteResult {
   public int rewrittenDataFilesCount() {
     return rewrittenDataFilesCount;
   }
+
+  @Override
+  public long rewrittenBytesCount() {
+    return rewrittenBytesCount;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java 
b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index dd4516be76..c26753c01d 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -65,7 +65,8 @@ public class RewriteFileGroup {
 
   public RewriteDataFiles.FileGroupRewriteResult asResult() {
     Preconditions.checkState(addedFiles != null, "Cannot get result, Group was 
never rewritten");
-    return new BaseFileGroupRewriteResult(info, addedFiles.size(), 
fileScanTasks.size());
+    return new BaseFileGroupRewriteResult(
+        info, addedFiles.size(), fileScanTasks.size(), sizeInBytes());
   }
 
   @Override
@@ -76,6 +77,7 @@ public class RewriteFileGroup {
         .add(
             "numAddedFiles",
             addedFiles == null ? "Rewrite Incomplete" : 
Integer.toString(addedFiles.size()))
+        .add("numRewrittenBytes", sizeInBytes())
         .toString();
   }
 
diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 5e80c85864..4126f35919 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -18,10 +18,15 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.NamedReference;
 import org.apache.iceberg.expressions.Zorder;
@@ -69,7 +74,7 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   public void testRewriteDataFilesInEmptyTable() {
     createTable();
     List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')", 
catalogName, tableIdent);
-    assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)), 
output);
+    assertEquals("Procedure output must match", ImmutableList.of(row(0, 0, 
0L)), output);
   }
 
   @Test
@@ -84,8 +89,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 2 data files (one per 
partition) ",
-        ImmutableList.of(row(10, 2)),
-        output);
+        row(10, 2),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -103,8 +113,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data files",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -125,7 +140,7 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 0 data files and add 0 data files",
-        ImmutableList.of(row(0, 0)),
+        ImmutableList.of(row(0, 0, 0L)),
         output);
 
     List<Object[]> actualRecords = currentData();
@@ -148,8 +163,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data files",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -170,8 +190,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data files",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     // Due to Z_order, the data written will be in the below order.
     // As there is only one small output file, we can validate the query 
ordering (as it will not
@@ -207,8 +232,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 5 data files (containing c1 = 1) and add 1 data 
files",
-        ImmutableList.of(row(5, 1)),
-        output);
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -230,8 +260,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals(
         "Action should rewrite 5 data files from single matching partition"
             + "(containing c2 = bar) and add 1 data files",
-        ImmutableList.of(row(5, 1)),
-        output);
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -253,8 +288,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     assertEquals(
         "Action should rewrite 5 data files from single matching partition"
             + "(containing c2 = bar) and add 1 data files",
-        ImmutableList.of(row(5, 1)),
-        output);
+        row(5, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -471,6 +511,8 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   public void testBinPackTableWithSpecialChars() {
     
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
 
+    TableIdentifier identifier =
+        TableIdentifier.of("default", 
QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
     sql(
         "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
         tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -486,8 +528,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data file",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isEqualTo(
+            
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -499,6 +546,8 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   public void testSortTableWithSpecialChars() {
     
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
 
+    TableIdentifier identifier =
+        TableIdentifier.of("default", 
QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
     sql(
         "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
         tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -518,8 +567,14 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data file",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(
+            
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -531,6 +586,8 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
   public void testZOrderTableWithSpecialChars() {
     
Assume.assumeTrue(catalogName.equals(SparkCatalogConfig.HADOOP.catalogName()));
 
+    TableIdentifier identifier =
+        TableIdentifier.of("default", 
QUOTED_SPECIAL_CHARS_TABLE_NAME.replaceAll("`", ""));
     sql(
         "CREATE TABLE %s (c1 int, c2 string, c3 string) USING iceberg",
         tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
@@ -550,8 +607,14 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 10 data files and add 1 data file",
-        ImmutableList.of(row(10, 1)),
-        output);
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(
+            
Long.valueOf(snapshotSummary(identifier).get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = 
currentData(tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -580,8 +643,13 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
 
     assertEquals(
         "Action should rewrite 2 data files and add 1 data files",
-        ImmutableList.of(row(2, 1)),
-        output);
+        row(2, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Data after compaction should not change", expectedRecords, 
actualRecords);
@@ -622,6 +690,14 @@ public class TestRewriteDataFilesProcedure extends 
SparkExtensionsTestBase {
     }
   }
 
+  private Map<String, String> snapshotSummary() {
+    return snapshotSummary(tableIdent);
+  }
+
+  private Map<String, String> snapshotSummary(TableIdentifier tableIdentifier) 
{
+    return 
validationCatalog.loadTable(tableIdentifier).currentSnapshot().summary();
+  }
+
   private List<Object[]> currentData() {
     return currentData(tableName);
   }
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index 39b8f7f30a..1aea61e747 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -67,7 +67,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
             new StructField(
                 "rewritten_data_files_count", DataTypes.IntegerType, false, 
Metadata.empty()),
             new StructField(
-                "added_data_files_count", DataTypes.IntegerType, false, 
Metadata.empty())
+                "added_data_files_count", DataTypes.IntegerType, false, 
Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, 
false, Metadata.empty())
           });
 
   public static ProcedureBuilder builder() {
@@ -213,8 +214,10 @@ class RewriteDataFilesProcedure extends BaseProcedure {
 
   private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
     int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
+    long rewrittenBytesCount = result.rewrittenBytesCount();
     int addedDataFilesCount = result.addedDataFilesCount();
-    InternalRow row = newInternalRow(rewrittenDataFilesCount, 
addedDataFilesCount);
+    InternalRow row =
+        newInternalRow(rewrittenDataFilesCount, addedDataFilesCount, 
rewrittenBytesCount);
     return new InternalRow[] {row};
   }
 
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
index 1006ed380f..fc18ed3bb1 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
@@ -28,7 +28,10 @@ public enum SparkCatalogConfig {
       ImmutableMap.of(
           "type", "hive",
           "default-namespace", "default")),
-  HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type", 
"hadoop")),
+  HADOOP(
+      "testhadoop",
+      SparkCatalog.class.getName(),
+      ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
   SPARK(
       "spark_catalog",
       SparkSessionCatalog.class.getName(),
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
index 9d21f392a8..97484702ca 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
@@ -68,7 +68,7 @@ public class SparkTestHelperBase {
     }
   }
 
-  private void assertEquals(String context, Object[] expectedRow, Object[] 
actualRow) {
+  protected void assertEquals(String context, Object[] expectedRow, Object[] 
actualRow) {
     Assert.assertEquals("Number of columns should match", expectedRow.length, 
actualRow.length);
     for (int col = 0; col < actualRow.length; col += 1) {
       Object expectedValue = expectedRow[col];
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index ebe02bb98d..61a855b1dd 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -23,6 +23,7 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 import static org.apache.spark.sql.functions.current_date;
 import static org.apache.spark.sql.functions.date_add;
 import static org.apache.spark.sql.functions.expr;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.doAnswer;
@@ -155,10 +156,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Table table = createTable(4);
     shouldHaveFiles(table, 4);
     List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     Result result = basicRewrite(table).execute();
     Assert.assertEquals("Action should rewrite 4 data files", 4, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 1);
     List<Object[]> actual = currentData();
@@ -171,10 +174,12 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Table table = createTablePartitioned(4, 2);
     shouldHaveFiles(table, 8);
     List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     Result result = basicRewrite(table).execute();
     Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
     List<Object[]> actualRecords = currentData();
@@ -187,6 +192,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Table table = createTablePartitioned(4, 2);
     shouldHaveFiles(table, 8);
     List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     Result result =
         basicRewrite(table)
@@ -196,6 +202,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     shouldHaveFiles(table, 7);
 
@@ -212,6 +219,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     table.updateSpec().addField(Expressions.ref("c1")).commit();
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -227,6 +235,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         "Should have 1 fileGroup because all files were not correctly 
partitioned",
         1,
         result.rewriteResults().size());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> postRewriteData = currentData();
     assertEquals("We shouldn't have changed the data", originalData, 
postRewriteData);
@@ -262,6 +271,8 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
+
     Result result =
         actions()
             .rewriteDataFiles(table)
@@ -272,6 +283,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2")
             .execute();
     Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Rows must match", expectedRecords, actualRecords);
@@ -298,12 +310,15 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     rowDelta.commit();
     table.refresh();
     List<Object[]> expectedRecords = currentData();
+    long dataSizeBefore = testDataSize(table);
+
     Result result =
         actions()
             .rewriteDataFiles(table)
             .option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
             .execute();
     Assert.assertEquals("Action should rewrite 1 data files", 1, 
result.rewrittenDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
     assertEquals("Rows must match", expectedRecords, actualRecords);
@@ -329,11 +344,13 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
     table.refresh();
     long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+    long dataSizeBefore = testDataSize(table);
 
     Result result =
         
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, 
"true").execute();
     Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
     List<Object[]> actualRecords = currentData();
@@ -361,11 +378,13 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     table.refresh();
     long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
     Assert.assertEquals("Table sequence number should be 0", 0, 
oldSequenceNumber);
+    long dataSizeBefore = testDataSize(table);
 
     Result result =
         
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, 
"true").execute();
     Assert.assertEquals("Action should rewrite 8 data files", 8, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 4 data file", 4, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 4);
     List<Object[]> actualRecords = currentData();
@@ -411,9 +430,11 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     shouldHaveFiles(table, 2);
 
+    long dataSizeBefore = testDataSize(table);
     Result result = basicRewrite(table).filter(Expressions.equal("c3", 
"0")).execute();
     Assert.assertEquals("Action should rewrite 2 data files", 2, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 1 data file", 1, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     List<Object[]> actualRecords = currentData();
 
@@ -428,6 +449,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     List<Object[]> expectedRecords = currentData();
     long targetSize = testDataSize(table) / 2;
 
+    long dataSizeBefore = testDataSize(table);
     Result result =
         basicRewrite(table)
             .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Long.toString(targetSize))
@@ -436,6 +458,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     Assert.assertEquals("Action should delete 1 data files", 1, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 2 data files", 2, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 2);
 
@@ -457,6 +480,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     int targetSize = averageFileSize(table);
 
+    long dataSizeBefore = testDataSize(table);
     Result result =
         basicRewrite(table)
             .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Integer.toString(targetSize + 1000))
@@ -468,6 +492,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     // Should Split the big files into 3 pieces, one of which should be 
combined with the two
     // smaller files
     Assert.assertEquals("Action should add 3 data files", 3, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 3);
 
@@ -484,6 +509,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int targetSize = ((int) testDataSize(table) / 3);
     // The test is to see if we can combine parts of files to make files of 
the correct size
 
+    long dataSizeBefore = testDataSize(table);
     Result result =
         basicRewrite(table)
             .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, 
Integer.toString(targetSize))
@@ -495,6 +521,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     Assert.assertEquals("Action should delete 4 data files", 4, 
result.rewrittenDataFilesCount());
     Assert.assertEquals("Action should add 3 data files", 3, 
result.addedDataFilesCount());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     shouldHaveFiles(table, 3);
 
@@ -508,6 +535,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     // Perform a rewrite but only allow 2 files to be compacted at a time
     RewriteDataFiles.Result result =
@@ -519,6 +547,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -535,6 +564,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     // Perform a rewrite but only allow 2 files to be compacted at a time
     RewriteDataFiles.Result result =
@@ -545,6 +575,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -561,6 +592,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     // Perform a rewrite but only allow 2 files to be compacted at a time
     RewriteDataFiles.Result result =
@@ -572,6 +604,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -694,6 +727,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFilesSparkAction realRewrite =
         basicRewrite(table)
@@ -713,6 +747,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     RewriteDataFiles.Result result = spyRewrite.execute();
 
     Assert.assertEquals("Should have 7 fileGroups", 
result.rewriteResults().size(), 7);
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();
 
@@ -732,6 +767,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFilesSparkAction realRewrite =
         basicRewrite(table)
@@ -752,6 +788,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     RewriteDataFiles.Result result = spyRewrite.execute();
 
     Assert.assertEquals("Should have 7 fileGroups", 
result.rewriteResults().size(), 7);
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();
 
@@ -771,6 +808,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFilesSparkAction realRewrite =
         basicRewrite(table)
@@ -796,6 +834,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
 
     // Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10 
total groups committed
     Assert.assertEquals("Should have 6 fileGroups", 6, 
result.rewriteResults().size());
+    
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
 
     table.refresh();
 
@@ -849,6 +888,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     int fileSize = averageFileSize(table);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     // Perform a rewrite but only allow 2 files to be compacted at a time
     RewriteDataFiles.Result result =
@@ -860,6 +900,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 10 fileGroups", 
result.rewriteResults().size(), 10);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -878,6 +919,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitUnsorted(table, "c2");
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -889,6 +931,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -910,6 +953,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveLastCommitUnsorted(table, "c2");
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -924,6 +968,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
         "Should have 1 fileGroup because all files were not correctly 
partitioned",
         result.rewriteResults().size(),
         1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -943,6 +988,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveFiles(table, 20);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -953,6 +999,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -979,6 +1026,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveFiles(table, 20);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -990,6 +1038,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
 
     table.refresh();
 
@@ -1010,6 +1059,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     shouldHaveFiles(table, 20);
 
     List<Object[]> originalData = currentData();
+    long dataSizeBefore = testDataSize(table);
 
     RewriteDataFiles.Result result =
         basicRewrite(table)
@@ -1025,6 +1075,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 
result.rewriteResults().size(), 1);
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
     Assert.assertTrue(
         "Should have written 40+ files",
         Iterables.size(table.currentSnapshot().addedDataFiles(table.io())) >= 
40);
@@ -1088,6 +1139,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     Assert.assertTrue("Should require all files to scan c2", originalFilesC2 > 
0.99);
     Assert.assertTrue("Should require all files to scan c3", originalFilesC3 > 
0.99);
 
+    long dataSizeBefore = testDataSize(table);
     RewriteDataFiles.Result result =
         basicRewrite(table)
             .zOrder("c2", "c3")
@@ -1102,6 +1154,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 1, 
result.rewriteResults().size());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
     int zOrderedFilesTotal = 
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
     Assert.assertTrue("Should have written 40+ files", zOrderedFilesTotal >= 
40);
 
@@ -1137,6 +1190,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
     List<Row> originalRaw =
         
spark.read().format("iceberg").load(tableLocation).sort("longCol").collectAsList();
     List<Object[]> originalData = rowsToJava(originalRaw);
+    long dataSizeBefore = testDataSize(table);
 
     // TODO add in UUID when it is supported in Spark
     RewriteDataFiles.Result result =
@@ -1156,6 +1210,7 @@ public class TestRewriteDataFilesAction extends 
SparkTestBase {
             .execute();
 
     Assert.assertEquals("Should have 1 fileGroups", 1, 
result.rewriteResults().size());
+    assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
     int zOrderedFilesTotal = 
Iterables.size(table.currentSnapshot().addedDataFiles(table.io()));
     Assert.assertEquals("Should have written 1 file", 1, zOrderedFilesTotal);
 
diff --git 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 3eaca63294..7da2dc0882 100644
--- 
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++ 
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.junit.After;
 import org.junit.Before;
@@ -49,7 +50,8 @@ public class TestRefreshTable extends SparkCatalogTestBase {
   public void testRefreshCommand() {
     // We are not allowed to change the session catalog after it has been 
initialized, so build a
     // new one
-    if (catalogName.equals("spark_catalog")) {
+    if (catalogName.equals(SparkCatalogConfig.SPARK.catalogName())
+        || catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())) {
       spark.conf().set("spark.sql.catalog." + catalogName + ".cache-enabled", 
true);
       spark = spark.cloneSession();
     }


Reply via email to