This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b0f55ce835 Spark 3.1: Add rewritten bytes to rewrite data files
procedure results (#6856)
b0f55ce835 is described below
commit b0f55ce83544ea802d3211cccc20dc297192ed9d
Author: Eduard Tudenhöfner <[email protected]>
AuthorDate: Thu Feb 16 14:50:18 2023 +0100
Spark 3.1: Add rewritten bytes to rewrite data files procedure results
(#6856)
---
.../extensions/TestRewriteDataFilesProcedure.java | 66 +++++++++++++++++-----
.../procedures/RewriteDataFilesProcedure.java | 7 ++-
.../apache/iceberg/spark/SparkCatalogConfig.java | 5 +-
.../org/apache/iceberg/spark/SparkTestBase.java | 2 +-
.../spark/actions/TestRewriteDataFilesAction.java | 49 ++++++++++++++++
.../apache/iceberg/spark/sql/TestRefreshTable.java | 4 +-
6 files changed, 114 insertions(+), 19 deletions(-)
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 5e9ace3679..d8c415b281 100644
---
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -18,10 +18,14 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -49,7 +53,7 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
public void testRewriteDataFilesInEmptyTable() {
createTable();
List<Object[]> output = sql("CALL %s.system.rewrite_data_files('%s')",
catalogName, tableIdent);
- assertEquals("Procedure output must match", ImmutableList.of(row(0, 0)),
output);
+ assertEquals("Procedure output must match", ImmutableList.of(row(0, 0,
0L)), output);
}
@Test
@@ -64,8 +68,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 10 data files and add 2 data files (one per
partition) ",
- ImmutableList.of(row(10, 2)),
- output);
+ row(10, 2),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -83,8 +92,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
- ImmutableList.of(row(10, 1)),
- output);
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -105,7 +119,7 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 0 data files and add 0 data files",
- ImmutableList.of(row(0, 0)),
+ ImmutableList.of(row(0, 0, 0L)),
output);
List<Object[]> actualRecords = currentData();
@@ -128,8 +142,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
- ImmutableList.of(row(10, 1)),
- output);
+ row(10, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -151,8 +170,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 5 data files (containing c1 = 1) and add 1 data
files",
- ImmutableList.of(row(5, 1)),
- output);
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -174,8 +198,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
- ImmutableList.of(row(5, 1)),
- output);
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -197,8 +226,13 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
assertEquals(
"Action should rewrite 5 data files from single matching partition"
+ "(containing c2 = bar) and add 1 data files",
- ImmutableList.of(row(5, 1)),
- output);
+ row(5, 1),
+ Arrays.copyOf(output.get(0), 2));
+ // verify rewritten bytes separately
+ assertThat(output.get(0)).hasSize(3);
+ assertThat(output.get(0)[2])
+ .isInstanceOf(Long.class)
+
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords,
actualRecords);
@@ -409,6 +443,10 @@ public class TestRewriteDataFilesProcedure extends
SparkExtensionsTestBase {
}
}
+ private Map<String, String> snapshotSummary() {
+ return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
+ }
+
private List<Object[]> currentData() {
return rowsToJava(
spark.sql("SELECT * FROM " + tableName + " order by c1, c2,
c3").collectAsList());
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
index d703ae6d81..52d9096812 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java
@@ -63,7 +63,8 @@ class RewriteDataFilesProcedure extends BaseProcedure {
new StructField(
"rewritten_data_files_count", DataTypes.IntegerType, false,
Metadata.empty()),
new StructField(
- "added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty())
+ "added_data_files_count", DataTypes.IntegerType, false,
Metadata.empty()),
+ new StructField("rewritten_bytes_count", DataTypes.LongType,
false, Metadata.empty())
});
public static ProcedureBuilder builder() {
@@ -187,8 +188,10 @@ class RewriteDataFilesProcedure extends BaseProcedure {
private InternalRow[] toOutputRows(RewriteDataFiles.Result result) {
int rewrittenDataFilesCount = result.rewrittenDataFilesCount();
+ long rewrittenBytesCount = result.rewrittenBytesCount();
int addedDataFilesCount = result.addedDataFilesCount();
- InternalRow row = newInternalRow(rewrittenDataFilesCount,
addedDataFilesCount);
+ InternalRow row =
+ newInternalRow(rewrittenDataFilesCount, addedDataFilesCount,
rewrittenBytesCount);
return new InternalRow[] {row};
}
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
index 1006ed380f..fc18ed3bb1 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
@@ -28,7 +28,10 @@ public enum SparkCatalogConfig {
ImmutableMap.of(
"type", "hive",
"default-namespace", "default")),
- HADOOP("testhadoop", SparkCatalog.class.getName(), ImmutableMap.of("type",
"hadoop")),
+ HADOOP(
+ "testhadoop",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of("type", "hadoop", "cache-enabled", "false")),
SPARK(
"spark_catalog",
SparkSessionCatalog.class.getName(),
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
index 7f6028596f..352ac403bb 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -165,7 +165,7 @@ public abstract class SparkTestBase {
}
}
- private void assertEquals(String context, Object[] expectedRow, Object[]
actualRow) {
+ protected void assertEquals(String context, Object[] expectedRow, Object[]
actualRow) {
Assert.assertEquals("Number of columns should match", expectedRow.length,
actualRow.length);
for (int col = 0; col < actualRow.length; col += 1) {
Object expectedValue = expectedRow[col];
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6d62fd2239..0d907569ee 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doAnswer;
@@ -151,10 +152,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Table table = createTable(4);
shouldHaveFiles(table, 4);
List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).execute();
Assert.assertEquals("Action should rewrite 4 data files", 4,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 1);
List<Object[]> actual = currentData();
@@ -167,10 +170,12 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).execute();
Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
@@ -183,6 +188,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Table table = createTablePartitioned(4, 2);
shouldHaveFiles(table, 8);
List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table)
@@ -192,6 +198,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
shouldHaveFiles(table, 7);
@@ -225,6 +232,8 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
+
Result result =
actions()
.rewriteDataFiles(table)
@@ -235,6 +244,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.option(BinPackStrategy.DELETE_FILE_THRESHOLD, "2")
.execute();
Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
+
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
@@ -261,12 +271,15 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
rowDelta.commit();
table.refresh();
List<Object[]> expectedRecords = currentData();
+ long dataSizeBefore = testDataSize(table);
+
Result result =
actions()
.rewriteDataFiles(table)
.option(BinPackStrategy.DELETE_FILE_THRESHOLD, "1")
.execute();
Assert.assertEquals("Action should rewrite 1 data files", 1,
result.rewrittenDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
List<Object[]> actualRecords = currentData();
assertEquals("Rows must match", expectedRecords, actualRecords);
@@ -292,11 +305,13 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER,
"true").execute();
Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
@@ -324,11 +339,13 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
table.refresh();
long oldSequenceNumber = table.currentSnapshot().sequenceNumber();
Assert.assertEquals("Table sequence number should be 0", 0,
oldSequenceNumber);
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table).option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER,
"true").execute();
Assert.assertEquals("Action should rewrite 8 data files", 8,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 4 data file", 4,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 4);
List<Object[]> actualRecords = currentData();
@@ -374,9 +391,11 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 2);
+ long dataSizeBefore = testDataSize(table);
Result result = basicRewrite(table).filter(Expressions.equal("c3",
"0")).execute();
Assert.assertEquals("Action should rewrite 2 data files", 2,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 1 data file", 1,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
List<Object[]> actualRecords = currentData();
@@ -391,6 +410,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
List<Object[]> expectedRecords = currentData();
long targetSize = testDataSize(table) / 2;
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table)
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Long.toString(targetSize))
@@ -399,6 +419,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertEquals("Action should delete 1 data files", 1,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 2 data files", 2,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 2);
@@ -420,6 +441,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int targetSize = averageFileSize(table);
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table)
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Integer.toString(targetSize + 1000))
@@ -431,6 +453,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
// Should Split the big files into 3 pieces, one of which should be
combined with the two
// smaller files
Assert.assertEquals("Action should add 3 data files", 3,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 3);
@@ -447,6 +470,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int targetSize = ((int) testDataSize(table) / 3);
// The test is to see if we can combine parts of files to make files of
the correct size
+ long dataSizeBefore = testDataSize(table);
Result result =
basicRewrite(table)
.option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES,
Integer.toString(targetSize))
@@ -458,6 +482,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
Assert.assertEquals("Action should delete 4 data files", 4,
result.rewrittenDataFilesCount());
Assert.assertEquals("Action should add 3 data files", 3,
result.addedDataFilesCount());
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
shouldHaveFiles(table, 3);
@@ -471,6 +496,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
// Perform a rewrite but only allow 2 files to be compacted at a time
RewriteDataFiles.Result result =
@@ -482,6 +508,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -498,6 +525,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
// Perform a rewrite but only allow 2 files to be compacted at a time
RewriteDataFiles.Result result =
@@ -508,6 +536,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -524,6 +553,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
// Perform a rewrite but only allow 2 files to be compacted at a time
RewriteDataFiles.Result result =
@@ -535,6 +565,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -663,6 +694,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
BaseRewriteDataFilesSparkAction realRewrite =
(org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
@@ -684,6 +716,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.Result result = spyRewrite.execute();
Assert.assertEquals("Should have 7 fileGroups",
result.rewriteResults().size(), 7);
+
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();
@@ -703,6 +736,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
BaseRewriteDataFilesSparkAction realRewrite =
(org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
@@ -725,6 +759,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
RewriteDataFiles.Result result = spyRewrite.execute();
Assert.assertEquals("Should have 7 fileGroups",
result.rewriteResults().size(), 7);
+
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();
@@ -744,6 +779,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
BaseRewriteDataFilesSparkAction realRewrite =
(org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction)
@@ -771,6 +807,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
// Commit 1: 4/4 + Commit 2 failed 0/4 + Commit 3: 2/2 == 6 out of 10
total groups committed
Assert.assertEquals("Should have 6 fileGroups", 6,
result.rewriteResults().size());
+
assertThat(result.rewrittenBytesCount()).isGreaterThan(0L).isLessThan(dataSizeBefore);
table.refresh();
@@ -824,6 +861,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
int fileSize = averageFileSize(table);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
// Perform a rewrite but only allow 2 files to be compacted at a time
RewriteDataFiles.Result result =
@@ -835,6 +873,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 10 fileGroups",
result.rewriteResults().size(), 10);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -853,6 +892,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitUnsorted(table, "c2");
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
basicRewrite(table)
@@ -864,6 +904,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -885,6 +926,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveLastCommitUnsorted(table, "c2");
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
basicRewrite(table)
@@ -899,6 +941,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
"Should have 1 fileGroup because all files were not correctly
partitioned",
result.rewriteResults().size(),
1);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -918,6 +961,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 20);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
basicRewrite(table)
@@ -928,6 +972,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -954,6 +999,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 20);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
basicRewrite(table)
@@ -965,6 +1011,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
table.refresh();
@@ -985,6 +1032,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
shouldHaveFiles(table, 20);
List<Object[]> originalData = currentData();
+ long dataSizeBefore = testDataSize(table);
RewriteDataFiles.Result result =
basicRewrite(table)
@@ -1000,6 +1048,7 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.execute();
Assert.assertEquals("Should have 1 fileGroups",
result.rewriteResults().size(), 1);
+ assertThat(result.rewrittenBytesCount()).isEqualTo(dataSizeBefore);
Assert.assertTrue(
"Should have written 40+ files",
Iterables.size(table.currentSnapshot().addedDataFiles(table.io())) >=
40);
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
index 3eaca63294..7da2dc0882 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestRefreshTable.java
@@ -23,6 +23,7 @@ import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.junit.After;
import org.junit.Before;
@@ -49,7 +50,8 @@ public class TestRefreshTable extends SparkCatalogTestBase {
public void testRefreshCommand() {
// We are not allowed to change the session catalog after it has been
initialized, so build a
// new one
- if (catalogName.equals("spark_catalog")) {
+ if (catalogName.equals(SparkCatalogConfig.SPARK.catalogName())
+ || catalogName.equals(SparkCatalogConfig.HADOOP.catalogName())) {
spark.conf().set("spark.sql.catalog." + catalogName + ".cache-enabled",
true);
spark = spark.cloneSession();
}