This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 8a7a1ed99b Spark: Remove deprecated AssertHelpers usage (#7483)
8a7a1ed99b is described below
commit 8a7a1ed99b80a74c4f0730a93e91809ca2eaa322
Author: Liu Xiao <[email protected]>
AuthorDate: Tue May 2 20:43:05 2023 +0800
Spark: Remove deprecated AssertHelpers usage (#7483)
---
.../apache/iceberg/spark/TestFunctionCatalog.java | 40 ++++----
.../actions/TestDeleteReachableFilesAction.java | 21 ++--
.../spark/actions/TestExpireSnapshotsAction.java | 20 ++--
.../spark/actions/TestRemoveOrphanFilesAction.java | 58 +++++------
.../spark/actions/TestRewriteDataFilesAction.java | 114 ++++++++++-----------
.../spark/actions/TestRewriteManifestsAction.java | 12 +--
.../vectorized/TestParquetVectorizedReads.java | 36 +++----
.../spark/source/TestDataFrameWriterV2.java | 34 ++----
.../spark/source/TestDataSourceOptions.java | 79 +++++++-------
.../spark/source/TestForwardCompatibility.java | 29 +++---
.../spark/source/TestIcebergSourceTablesBase.java | 22 ++--
.../TestMetadataTablesWithPartitionEvolution.java | 10 +-
.../TestRequiredDistributionAndOrdering.java | 27 +++--
.../iceberg/spark/source/TestSparkDataWrite.java | 24 ++---
.../spark/source/TestSparkMetadataColumns.java | 19 ++--
.../spark/source/TestStructuredStreamingRead3.java | 19 ++--
.../spark/source/TestTimestampWithoutZone.java | 67 +++++-------
.../spark/source/TestWriteMetricsConfig.java | 12 +--
18 files changed, 287 insertions(+), 356 deletions(-)
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
index 1db0fa41f7..be91c8d637 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestFunctionCatalog.java
@@ -18,7 +18,6 @@
*/
package org.apache.iceberg.spark;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.IcebergBuild;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.functions.IcebergVersionFunction;
@@ -68,11 +67,9 @@ public class TestFunctionCatalog extends
SparkTestBaseWithCatalog {
new Identifier[0],
asFunctionCatalog.listFunctions(DEFAULT_NAMESPACE));
- AssertHelpers.assertThrows(
- "Listing functions in a namespace that does not exist should throw",
- NoSuchNamespaceException.class,
- "The schema `db` cannot be found",
- () -> asFunctionCatalog.listFunctions(DB_NAMESPACE));
+ Assertions.assertThatThrownBy(() ->
asFunctionCatalog.listFunctions(DB_NAMESPACE))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageStartingWith("[SCHEMA_NOT_FOUND] The schema `db` cannot be
found.");
}
@Test
@@ -87,24 +84,23 @@ public class TestFunctionCatalog extends
SparkTestBaseWithCatalog {
.isExactlyInstanceOf(IcebergVersionFunction.class);
}
- AssertHelpers.assertThrows(
- "Cannot load a function if it's not used with the system namespace or
the empty namespace",
- NoSuchFunctionException.class,
- "The function default.iceberg_version cannot be found",
- () -> asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE,
"iceberg_version")));
+ Assertions.assertThatThrownBy(
+ () ->
+
asFunctionCatalog.loadFunction(Identifier.of(DEFAULT_NAMESPACE,
"iceberg_version")))
+ .isInstanceOf(NoSuchFunctionException.class)
+ .hasMessageStartingWith(
+ "[ROUTINE_NOT_FOUND] The function default.iceberg_version cannot
be found.");
Identifier undefinedFunction = Identifier.of(SYSTEM_NAMESPACE,
"undefined_function");
- AssertHelpers.assertThrows(
- "Cannot load a function that does not exist",
- NoSuchFunctionException.class,
- "The function system.undefined_function cannot be found",
- () -> asFunctionCatalog.loadFunction(undefinedFunction));
-
- AssertHelpers.assertThrows(
- "Using an undefined function from SQL should fail analysis",
- AnalysisException.class,
- "Cannot resolve function",
- () -> sql("SELECT undefined_function(1, 2)"));
+ Assertions.assertThatThrownBy(() ->
asFunctionCatalog.loadFunction(undefinedFunction))
+ .isInstanceOf(NoSuchFunctionException.class)
+ .hasMessageStartingWith(
+ "[ROUTINE_NOT_FOUND] The function system.undefined_function cannot
be found.");
+
+ Assertions.assertThatThrownBy(() -> sql("SELECT undefined_function(1, 2)"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith(
+ "[UNRESOLVED_ROUTINE] Cannot resolve function `undefined_function`
on search path");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
index 154e940519..18de71ce8e 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestDeleteReachableFilesAction.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
@@ -51,6 +50,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -366,22 +366,21 @@ public class TestDeleteReachableFilesAction extends
SparkTestBase {
public void testEmptyIOThrowsException() {
DeleteReachableFiles baseRemoveFilesSparkAction =
sparkActions().deleteReachableFiles(metadataLocation(table)).io(null);
- AssertHelpers.assertThrows(
- "FileIO can't be null in DeleteReachableFiles action",
- IllegalArgumentException.class,
- "File IO cannot be null",
- baseRemoveFilesSparkAction::execute);
+
+ Assertions.assertThatThrownBy(baseRemoveFilesSparkAction::execute)
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("File IO cannot be null");
}
@Test
public void testRemoveFilesActionWhenGarbageCollectionDisabled() {
table.updateProperties().set(TableProperties.GC_ENABLED, "false").commit();
- AssertHelpers.assertThrows(
- "Should complain about removing files when GC is disabled",
- ValidationException.class,
- "Cannot delete files: GC is disabled (deleting files may corrupt other
tables)",
- () ->
sparkActions().deleteReachableFiles(metadataLocation(table)).execute());
+ Assertions.assertThatThrownBy(
+ () ->
sparkActions().deleteReachableFiles(metadataLocation(table)).execute())
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "Cannot delete files: GC is disabled (deleting files may corrupt
other tables)");
}
private String metadataLocation(Table tbl) {
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
index 25e6f6486a..e90626cbda 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestExpireSnapshotsAction.java
@@ -29,7 +29,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
@@ -57,6 +56,7 @@ import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -447,11 +447,10 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
table.newAppend().appendFile(FILE_A).commit();
- AssertHelpers.assertThrows(
- "Should complain about expiring snapshots",
- ValidationException.class,
- "Cannot expire snapshots: GC is disabled",
- () -> SparkActions.get().expireSnapshots(table));
+ Assertions.assertThatThrownBy(() ->
SparkActions.get().expireSnapshots(table))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "Cannot expire snapshots: GC is disabled (deleting files may
corrupt other tables)");
}
@Test
@@ -529,11 +528,10 @@ public class TestExpireSnapshotsAction extends
SparkTestBase {
@Test
public void testRetainZeroSnapshots() {
- AssertHelpers.assertThrows(
- "Should fail retain 0 snapshots " + "because number of snapshots to
retain cannot be zero",
- IllegalArgumentException.class,
- "Number of snapshots to retain must be at least 1, cannot be: 0",
- () ->
SparkActions.get().expireSnapshots(table).retainLast(0).execute());
+ Assertions.assertThatThrownBy(
+ () ->
SparkActions.get().expireSnapshots(table).retainLast(0).execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Number of snapshots to retain must be at least 1, cannot
be: 0");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index d91ac3606d..032ccfb14d 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Files;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
@@ -751,11 +750,10 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
table.updateProperties().set(TableProperties.GC_ENABLED, "false").commit();
- AssertHelpers.assertThrows(
- "Should complain about removing orphan files",
- ValidationException.class,
- "Cannot delete orphan files: GC is disabled",
- () -> SparkActions.get().deleteOrphanFiles(table).execute());
+ Assertions.assertThatThrownBy(() ->
SparkActions.get().deleteOrphanFiles(table).execute())
+ .isInstanceOf(ValidationException.class)
+ .hasMessage(
+ "Cannot delete orphan files: GC is disabled (deleting files may
corrupt other tables)");
}
@Test
@@ -987,18 +985,18 @@ public abstract class TestRemoveOrphanFilesAction extends
SparkTestBase {
public void testPathsWithEqualSchemes() {
List<String> validFiles =
Lists.newArrayList("scheme1://bucket1/dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("scheme2://bucket1/dir1/dir2/file1");
- AssertHelpers.assertThrows(
- "Test remove orphan files with equal schemes",
- ValidationException.class,
- "Conflicting authorities/schemes: [(scheme1, scheme2)]",
- () ->
- executeTest(
- validFiles,
- actualFiles,
- Lists.newArrayList(),
- ImmutableMap.of(),
- ImmutableMap.of(),
- DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+ Assertions.assertThatThrownBy(
+ () ->
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Unable to determine whether certain files are
orphan")
+ .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1,
scheme2)].");
Map<String, String> equalSchemes = Maps.newHashMap();
equalSchemes.put("scheme1", "scheme");
@@ -1016,18 +1014,18 @@ public abstract class TestRemoveOrphanFilesAction
extends SparkTestBase {
public void testPathsWithEqualAuthorities() {
List<String> validFiles =
Lists.newArrayList("hdfs://servicename1/dir1/dir2/file1");
List<String> actualFiles =
Lists.newArrayList("hdfs://servicename2/dir1/dir2/file1");
- AssertHelpers.assertThrows(
- "Test remove orphan files with equal authorities",
- ValidationException.class,
- "Conflicting authorities/schemes: [(servicename1, servicename2)]",
- () ->
- executeTest(
- validFiles,
- actualFiles,
- Lists.newArrayList(),
- ImmutableMap.of(),
- ImmutableMap.of(),
- DeleteOrphanFiles.PrefixMismatchMode.ERROR));
+ Assertions.assertThatThrownBy(
+ () ->
+ executeTest(
+ validFiles,
+ actualFiles,
+ Lists.newArrayList(),
+ ImmutableMap.of(),
+ ImmutableMap.of(),
+ DeleteOrphanFiles.PrefixMismatchMode.ERROR))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith("Unable to determine whether certain files are
orphan")
+ .hasMessageEndingWith("Conflicting authorities/schemes:
[(servicename1, servicename2)].");
Map<String, String> equalAuthorities = Maps.newHashMap();
equalAuthorities.put("servicename1", "servicename");
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 3ecd7ce371..a638776033 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -47,7 +47,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -104,6 +103,7 @@ import org.apache.iceberg.util.Pair;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -647,10 +647,9 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));
- AssertHelpers.assertThrows(
- "Should fail entire rewrite if part fails",
- RuntimeException.class,
- () -> spyRewrite.execute());
+ Assertions.assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Rewrite Failed");
table.refresh();
@@ -682,10 +681,9 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
doReturn(util).when(spyRewrite).commitManager(table.currentSnapshot().snapshotId());
- AssertHelpers.assertThrows(
- "Should fail entire rewrite if commit fails",
- RuntimeException.class,
- () -> spyRewrite.execute());
+ Assertions.assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Commit Failure");
table.refresh();
@@ -718,10 +716,9 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
.when(spyRewrite)
.rewriteFiles(any(), argThat(failGroup));
- AssertHelpers.assertThrows(
- "Should fail entire rewrite if part fails",
- RuntimeException.class,
- () -> spyRewrite.execute());
+ Assertions.assertThatThrownBy(spyRewrite::execute)
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Rewrite Failed");
table.refresh();
@@ -863,32 +860,35 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
public void testInvalidOptions() {
Table table = createTable(20);
- AssertHelpers.assertThrows(
- "No negative values for partial progress max commits",
- IllegalArgumentException.class,
- () ->
- basicRewrite(table)
- .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
- .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "-5")
- .execute());
-
- AssertHelpers.assertThrows(
- "No negative values for max concurrent groups",
- IllegalArgumentException.class,
- () ->
- basicRewrite(table)
- .option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES,
"-5")
- .execute());
-
- AssertHelpers.assertThrows(
- "No unknown options allowed",
- IllegalArgumentException.class,
- () -> basicRewrite(table).option("foobarity", "-5").execute());
-
- AssertHelpers.assertThrows(
- "Cannot set rewrite-job-order to foo",
- IllegalArgumentException.class,
- () -> basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER,
"foo").execute());
+ Assertions.assertThatThrownBy(
+ () ->
+ basicRewrite(table)
+ .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
+ .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS,
"-5")
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot set partial-progress.max-commits to -5, "
+ + "the value must be positive when partial-progress.enabled is
true");
+
+ Assertions.assertThatThrownBy(
+ () ->
+ basicRewrite(table)
+
.option(RewriteDataFiles.MAX_CONCURRENT_FILE_GROUP_REWRITES, "-5")
+ .execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot set max-concurrent-file-group-rewrites to -5, the value
must be positive.");
+
+ Assertions.assertThatThrownBy(() ->
basicRewrite(table).option("foobarity", "-5").execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot use options [foobarity], they are not supported by the
action or the rewriter BIN-PACK");
+
+ Assertions.assertThatThrownBy(
+ () ->
basicRewrite(table).option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo").execute())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid rewrite job order name: foo");
}
@Test
@@ -1124,10 +1124,10 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
doReturn(util).when(spyAction).commitManager(table.currentSnapshot().snapshotId());
- AssertHelpers.assertThrows(
- "Should propagate CommitStateUnknown Exception",
- CommitStateUnknownException.class,
- () -> spyAction.execute());
+ Assertions.assertThatThrownBy(spyAction::execute)
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith(
+ "Unknown State\n" + "Cannot determine whether the commit was
successful or not");
List<Object[]> postRewriteData = currentData();
assertEquals("We shouldn't have changed the data", originalData,
postRewriteData);
@@ -1243,23 +1243,17 @@ public class TestRewriteDataFilesAction extends
SparkTestBase {
SortOrder sortOrder =
SortOrder.builderFor(table.schema()).asc("c2").build();
- AssertHelpers.assertThrows(
- "Should be unable to set Strategy more than once",
- IllegalArgumentException.class,
- "Must use only one rewriter type",
- () -> actions().rewriteDataFiles(table).binPack().sort());
-
- AssertHelpers.assertThrows(
- "Should be unable to set Strategy more than once",
- IllegalArgumentException.class,
- "Must use only one rewriter type",
- () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack());
-
- AssertHelpers.assertThrows(
- "Should be unable to set Strategy more than once",
- IllegalArgumentException.class,
- "Must use only one rewriter type",
- () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack());
+ Assertions.assertThatThrownBy(() ->
actions().rewriteDataFiles(table).binPack().sort())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Must use only one rewriter type (bin-pack, sort,
zorder)");
+
+ Assertions.assertThatThrownBy(() ->
actions().rewriteDataFiles(table).sort(sortOrder).binPack())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Must use only one rewriter type (bin-pack, sort,
zorder)");
+
+ Assertions.assertThatThrownBy(() ->
actions().rewriteDataFiles(table).sort(sortOrder).binPack())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Must use only one rewriter type (bin-pack, sort,
zorder)");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4aafb72ace..64dbf42d4c 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -33,7 +33,6 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
@@ -57,6 +56,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.TableIdentifier;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@@ -207,11 +207,11 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
Table spyTable = spy(table);
when(spyTable.rewriteManifests()).thenReturn(spyNewRewriteManifests);
- AssertHelpers.assertThrowsCause(
- "Should throw a Commit State Unknown Exception",
- RuntimeException.class,
- "Datacenter on Fire",
- () -> actions.rewriteManifests(spyTable).rewriteIf(manifest ->
true).execute());
+ Assertions.assertThatThrownBy(
+ () -> actions.rewriteManifests(spyTable).rewriteIf(manifest ->
true).execute())
+ .cause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage("Datacenter on Fire");
table.refresh();
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 56e9490b99..5f86332120 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -25,7 +25,6 @@ import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.generic.GenericData;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
@@ -46,6 +45,7 @@ import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Ignore;
@@ -220,17 +220,16 @@ public class TestParquetVectorizedReads extends
AvroDataTest {
@Test
@Override
public void testNestedStruct() {
- AssertHelpers.assertThrows(
- "Vectorized reads are not supported yet for struct fields",
- UnsupportedOperationException.class,
- "Vectorized reads are not supported yet for struct fields",
- () ->
- VectorizedSparkParquetReaders.buildReader(
- TypeUtil.assignIncreasingFreshIds(
- new Schema(required(1, "struct", SUPPORTED_PRIMITIVES))),
- new MessageType(
- "struct", new GroupType(Type.Repetition.OPTIONAL,
"struct").withId(1)),
- false));
+ Assertions.assertThatThrownBy(
+ () ->
+ VectorizedSparkParquetReaders.buildReader(
+ TypeUtil.assignIncreasingFreshIds(
+ new Schema(required(1, "struct",
SUPPORTED_PRIMITIVES))),
+ new MessageType(
+ "struct", new GroupType(Type.Repetition.OPTIONAL,
"struct").withId(1)),
+ false))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessage("Vectorized reads are not supported yet for struct
fields");
}
@Test
@@ -350,13 +349,10 @@ public class TestParquetVectorizedReads extends
AvroDataTest {
try (FileAppender<GenericData.Record> writer = getParquetV2Writer(schema,
dataFile)) {
writer.addAll(data);
}
- AssertHelpers.assertThrows(
- "Vectorized reads not supported",
- UnsupportedOperationException.class,
- "Cannot support vectorized reads for column",
- () -> {
- assertRecordsMatch(schema, 30000, data, dataFile, false, true,
BATCH_SIZE);
- return null;
- });
+ Assertions.assertThatThrownBy(
+ () -> assertRecordsMatch(schema, 30000, data, dataFile, false,
true, BATCH_SIZE))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageStartingWith("Cannot support vectorized reads for column")
+ .hasMessageEndingWith("Disable vectorized reads to read this
table/file");
}
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
index a9b4f0d3ad..76b138ced7 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java
@@ -19,7 +19,6 @@
package org.apache.iceberg.spark.source;
import java.util.List;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.Spark3Util;
@@ -33,6 +32,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.internal.SQLConf;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -76,18 +76,9 @@ public class TestDataFrameWriterV2 extends
SparkTestBaseWithCatalog {
// this has a different error message than the case without
accept-any-schema because it uses
// Iceberg checks
- AssertHelpers.assertThrows(
- "Should fail when merge-schema is not enabled on the writer",
- IllegalArgumentException.class,
- "Field new_col not found in source schema",
- () -> {
- try {
- threeColDF.writeTo(tableName).append();
- } catch (NoSuchTableException e) {
- // needed because append has checked exceptions
- throw new RuntimeException(e);
- }
- });
+ Assertions.assertThatThrownBy(() -> threeColDF.writeTo(tableName).append())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Field new_col not found in source schema");
}
@Test
@@ -111,18 +102,11 @@ public class TestDataFrameWriterV2 extends
SparkTestBaseWithCatalog {
"{ \"id\": 3, \"data\": \"c\", \"new_col\": 12.06 }",
"{ \"id\": 4, \"data\": \"d\", \"new_col\": 14.41 }");
- AssertHelpers.assertThrows(
- "Should fail when accept-any-schema is not enabled on the table",
- AnalysisException.class,
- "too many data columns",
- () -> {
- try {
- threeColDF.writeTo(tableName).option("merge-schema",
"true").append();
- } catch (NoSuchTableException e) {
- // needed because append has checked exceptions
- throw new RuntimeException(e);
- }
- });
+ Assertions.assertThatThrownBy(
+ () -> threeColDF.writeTo(tableName).option("merge-schema",
"true").append())
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageStartingWith(
+ "Cannot write to 'testhadoop.default.table', too many data
columns");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..44400b5ad4 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
@@ -52,6 +51,7 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -242,50 +242,47 @@ public class TestDataSourceOptions {
List<Long> snapshotIds = SnapshotUtil.currentAncestorIds(table);
// start-snapshot-id and snapshot-id are both configured.
- AssertHelpers.assertThrows(
- "Check both start-snapshot-id and snapshot-id are configured",
- IllegalArgumentException.class,
- "Cannot set start-snapshot-id and end-snapshot-id for incremental
scans",
- () -> {
- spark
- .read()
- .format("iceberg")
- .option("snapshot-id", snapshotIds.get(3).toString())
- .option("start-snapshot-id", snapshotIds.get(3).toString())
- .load(tableLocation)
- .explain();
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option("snapshot-id", snapshotIds.get(3).toString())
+ .option("start-snapshot-id", snapshotIds.get(3).toString())
+ .load(tableLocation)
+ .explain())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot set start-snapshot-id and end-snapshot-id for incremental
scans when either snapshot-id or as-of-timestamp is set");
// end-snapshot-id and as-of-timestamp are both configured.
- AssertHelpers.assertThrows(
- "Check both start-snapshot-id and snapshot-id are configured",
- IllegalArgumentException.class,
- "Cannot set start-snapshot-id and end-snapshot-id for incremental
scans",
- () -> {
- spark
- .read()
- .format("iceberg")
- .option(
- SparkReadOptions.AS_OF_TIMESTAMP,
-
Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
- .option("end-snapshot-id", snapshotIds.get(2).toString())
- .load(tableLocation)
- .explain();
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option(
+ SparkReadOptions.AS_OF_TIMESTAMP,
+
Long.toString(table.snapshot(snapshotIds.get(3)).timestampMillis()))
+ .option("end-snapshot-id", snapshotIds.get(2).toString())
+ .load(tableLocation)
+ .explain())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot set start-snapshot-id and end-snapshot-id for incremental
scans when either snapshot-id or as-of-timestamp is set");
// only end-snapshot-id is configured.
- AssertHelpers.assertThrows(
- "Check both start-snapshot-id and snapshot-id are configured",
- IllegalArgumentException.class,
- "Cannot set only end-snapshot-id for incremental scans",
- () -> {
- spark
- .read()
- .format("iceberg")
- .option("end-snapshot-id", snapshotIds.get(2).toString())
- .load(tableLocation)
- .explain();
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option("end-snapshot-id", snapshotIds.get(2).toString())
+ .load(tableLocation)
+ .explain())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot set only end-snapshot-id for incremental scans. Please,
set start-snapshot-id too.");
// test (1st snapshot, current snapshot] incremental scan.
List<SimpleRecord> result =
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
index fe44023590..73e572ecae 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
@@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -56,6 +55,7 @@ import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.MemoryStream;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -116,16 +116,15 @@ public class TestForwardCompatibility {
Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);
- AssertHelpers.assertThrows(
- "Should reject write with unsupported transform",
- UnsupportedOperationException.class,
- "Cannot write using unsupported transforms: zero",
- () ->
- df.select("id", "data")
- .write()
- .format("iceberg")
- .mode("append")
- .save(location.toString()));
+ Assertions.assertThatThrownBy(
+ () ->
+ df.select("id", "data")
+ .write()
+ .format("iceberg")
+ .mode("append")
+ .save(location.toString()))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageEndingWith("Cannot write using unsupported transforms:
zero");
}
@Test
@@ -155,11 +154,9 @@ public class TestForwardCompatibility {
List<Integer> batch1 = Lists.newArrayList(1, 2);
send(batch1, inputStream);
- AssertHelpers.assertThrows(
- "Should reject streaming write with unsupported transform",
- StreamingQueryException.class,
- "Cannot write using unsupported transforms: zero",
- query::processAllAvailable);
+ Assertions.assertThatThrownBy(query::processAllAvailable)
+ .isInstanceOf(StreamingQueryException.class)
+ .hasMessageEndingWith("Cannot write using unsupported transforms:
zero");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index fc023cdfdb..ac53f09511 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -36,7 +36,6 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
@@ -1113,17 +1112,16 @@ public abstract class TestIcebergSourceTablesBase
extends SparkTestBase {
if (!spark.version().startsWith("2")) {
// Spark 2 isn't able to actually push down nested struct projections so
this will not break
- AssertHelpers.assertThrows(
- "Can't prune struct inside list",
- SparkException.class,
- "Cannot project a partial list element struct",
- () ->
- spark
- .read()
- .format("iceberg")
- .load(loadLocation(tableIdentifier, "manifests"))
- .select("partition_spec_id", "path",
"partition_summaries.contains_null")
- .collectAsList());
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier, "manifests"))
+ .select("partition_spec_id", "path",
"partition_summaries.contains_null")
+ .collectAsList())
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining("Cannot project a partial list element
struct");
}
Dataset<Row> actualDf =
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 82c9a58e33..1b96245019 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -32,7 +32,6 @@ import static
org.apache.iceberg.TableProperties.FORMAT_VERSION;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataTableType;
@@ -53,6 +52,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -667,11 +667,9 @@ public class TestMetadataTablesWithPartitionEvolution
extends SparkCatalogTestBa
sql("REFRESH TABLE %s", tableName);
for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES,
ENTRIES, ALL_ENTRIES)) {
- AssertHelpers.assertThrows(
- "Should complain about the partition type",
- ValidationException.class,
- "Cannot build table partition type, unknown transforms",
- () -> loadMetadataTable(tableType));
+ Assertions.assertThatThrownBy(() -> loadMetadataTable(tableType))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Cannot build table partition type, unknown transforms:
[zero]");
}
}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index ac481ca473..6c96a33a25 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -20,7 +20,6 @@ package org.apache.iceberg.spark.source;
import java.util.List;
import java.util.Map;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -29,6 +28,7 @@ import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
@@ -162,20 +162,17 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
// should fail if ordering is disabled
- AssertHelpers.assertThrowsCause(
- "Should reject writes without ordering",
- IllegalStateException.class,
- "Incoming records violate the writer assumption",
- () -> {
- try {
- inputDF
- .writeTo(tableName)
- .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
- .append();
- } catch (NoSuchTableException e) {
- throw new RuntimeException(e);
- }
- });
+ Assertions.assertThatThrownBy(
+ () ->
+ inputDF
+ .writeTo(tableName)
+
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .append())
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith(
+ "Incoming records violate the writer assumption that records are
clustered by spec "
+ + "and by partition within each spec. Either cluster the
incoming records or switch to fanout writers.");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
index 991719d616..d6a235674d 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManifestFile;
@@ -690,18 +689,17 @@ public class TestSparkDataWrite {
ManualSource.setTable(manualTableName, sparkTable);
// Although an exception is thrown here, write and commit have succeeded
- AssertHelpers.assertThrows(
- "Should throw a Commit State Unknown Exception",
- CommitStateUnknownException.class,
- "Datacenter on Fire",
- () ->
- df2.select("id", "data")
- .sort("data")
- .write()
- .format("org.apache.iceberg.spark.source.ManualSource")
- .option(ManualSource.TABLE_NAME, manualTableName)
- .mode(SaveMode.Append)
- .save(targetLocation));
+ Assertions.assertThatThrownBy(
+ () ->
+ df2.select("id", "data")
+ .sort("data")
+ .write()
+ .format("org.apache.iceberg.spark.source.ManualSource")
+ .option(ManualSource.TABLE_NAME, manualTableName)
+ .mode(SaveMode.Append)
+ .save(targetLocation))
+ .isInstanceOf(CommitStateUnknownException.class)
+ .hasMessageStartingWith("Datacenter on Fire");
// Since write and commit succeeded, the rows should be readable
Dataset<Row> result = spark.read().format("iceberg").load(targetLocation);
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
index e399852285..1b46759590 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -29,7 +29,6 @@ import static org.apache.spark.sql.functions.lit;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.MetadataColumns;
@@ -52,6 +51,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -220,11 +220,9 @@ public class TestSparkMetadataColumns extends
SparkTestBase {
TableMetadata base = ops.current();
ops.commit(base, base.updatePartitionSpec(UNKNOWN_SPEC));
- AssertHelpers.assertThrows(
- "Should fail to query the partition metadata column",
- ValidationException.class,
- "Cannot build table partition type, unknown transforms",
- () -> sql("SELECT _partition FROM %s", TABLE_NAME));
+ Assertions.assertThatThrownBy(() -> sql("SELECT _partition FROM %s",
TABLE_NAME))
+ .isInstanceOf(ValidationException.class)
+ .hasMessage("Cannot build table partition type, unknown transforms:
[zero]");
}
@Test
@@ -242,11 +240,10 @@ public class TestSparkMetadataColumns extends
SparkTestBase {
ImmutableList.of(row(1L, "a1")),
sql("SELECT id, category FROM %s", TABLE_NAME));
- AssertHelpers.assertThrows(
- "Should fail to query conflicting columns",
- ValidationException.class,
- "column names conflict",
- () -> sql("SELECT * FROM %s", TABLE_NAME));
+ Assertions.assertThatThrownBy(() -> sql("SELECT * FROM %s", TABLE_NAME))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith(
+ "Table column names conflict with names reserved for Iceberg
metadata columns: [_spec_id, _file].");
table.refresh();
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index a7f18e1cb0..1c08744f56 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -29,7 +29,6 @@ import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
@@ -492,11 +491,10 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
StreamingQuery query = startStream();
- AssertHelpers.assertThrowsCause(
- "Streaming should fail with IllegalStateException, as the snapshot is
not of type APPEND",
- IllegalStateException.class,
- "Cannot process overwrite snapshot",
- () -> query.processAllAvailable());
+ Assertions.assertThatThrownBy(query::processAllAvailable)
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith("Cannot process overwrite snapshot");
}
@Test
@@ -533,11 +531,10 @@ public final class TestStructuredStreamingRead3 extends
SparkCatalogTestBase {
StreamingQuery query = startStream();
- AssertHelpers.assertThrowsCause(
- "Streaming should fail with IllegalStateException, as the snapshot is
not of type APPEND",
- IllegalStateException.class,
- "Cannot process delete snapshot",
- () -> query.processAllAvailable());
+ Assertions.assertThatThrownBy(query::processAllAvailable)
+ .cause()
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageStartingWith("Cannot process delete snapshot");
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
index 053f6dbaea..428e6b5a7f 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
@@ -28,7 +28,6 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
@@ -53,6 +52,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -159,21 +159,17 @@ public class TestTimestampWithoutZone extends
SparkTestBase {
@Test
public void testUnpartitionedTimestampWithoutZoneError() {
- AssertHelpers.assertThrows(
- String.format(
- "Read operation performed on a timestamp without timezone field
while "
- + "'%s' set to false should throw exception",
- SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE),
- IllegalArgumentException.class,
- SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
- () ->
- spark
- .read()
- .format("iceberg")
- .option(SparkReadOptions.VECTORIZATION_ENABLED,
String.valueOf(vectorized))
- .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE,
"false")
- .load(unpartitioned.toString())
- .collectAsList());
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+ .option(SparkReadOptions.VECTORIZATION_ENABLED,
String.valueOf(vectorized))
+
.option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
+ .load(unpartitioned.toString())
+ .collectAsList())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
}
@Test
@@ -198,30 +194,21 @@ public class TestTimestampWithoutZone extends
SparkTestBase {
@Test
public void testUnpartitionedTimestampWithoutZoneWriteError() {
- String errorMessage =
- String.format(
- "Write operation performed on a timestamp without timezone field
while "
- + "'%s' set to false should throw exception",
- SparkSQLProperties.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
- Runnable writeOperation =
- () ->
- spark
- .read()
- .format("iceberg")
- .option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE,
"true")
- .option(SparkReadOptions.VECTORIZATION_ENABLED,
String.valueOf(vectorized))
- .load(unpartitioned.toString())
- .write()
- .format("iceberg")
- .option(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE,
"false")
- .mode(SaveMode.Append)
- .save(unpartitioned.toString());
-
- AssertHelpers.assertThrows(
- errorMessage,
- IllegalArgumentException.class,
- SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
- writeOperation);
+ Assertions.assertThatThrownBy(
+ () ->
+ spark
+ .read()
+ .format("iceberg")
+
.option(SparkReadOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true")
+ .option(SparkReadOptions.VECTORIZATION_ENABLED,
String.valueOf(vectorized))
+ .load(unpartitioned.toString())
+ .write()
+ .format("iceberg")
+
.option(SparkWriteOptions.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "false")
+ .mode(SaveMode.Append)
+ .save(unpartitioned.toString()))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
index 9bf00f1b13..1e2a825d8e 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
@@ -27,7 +27,6 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
@@ -49,6 +48,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
+import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -231,11 +231,11 @@ public class TestWriteMetricsConfig {
properties.put(TableProperties.DEFAULT_WRITE_METRICS_MODE, "counts");
properties.put("write.metadata.metrics.column.ids", "full");
- AssertHelpers.assertThrows(
- "Creating a table with invalid metrics should fail",
- ValidationException.class,
- null,
- () -> tables.create(SIMPLE_SCHEMA, spec, properties, tableLocation));
+ Assertions.assertThatThrownBy(
+ () -> tables.create(SIMPLE_SCHEMA, spec, properties,
tableLocation))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageStartingWith(
+ "Invalid metrics config, could not find column ids from table prop
write.metadata.metrics.column.ids in schema table");
}
@Test