This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b20d30ca04 Spark: Create base classes for migration to JUnit5 (#9129)
b20d30ca04 is described below
commit b20d30ca04ddc19e440036b0c554f0b16ec61a1c
Author: Tom Tanaka <[email protected]>
AuthorDate: Fri Nov 24 20:30:04 2023 +0900
Spark: Create base classes for migration to JUnit5 (#9129)
---
.../org/apache/iceberg/spark/CatalogTestBase.java | 51 ++++
.../apache/iceberg/spark/SparkTestHelperBase.java | 19 +-
.../java/org/apache/iceberg/spark/TestBase.java | 287 +++++++++++++++++++++
.../apache/iceberg/spark/TestBaseWithCatalog.java | 131 ++++++++++
.../spark/actions/TestSparkFileRewriter.java | 109 ++++----
5 files changed, 535 insertions(+), 62 deletions(-)
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
new file mode 100644
index 0000000000..dbb839eacc
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/CatalogTestBase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.junit.jupiter.params.provider.Arguments;
+
+public abstract class CatalogTestBase extends TestBaseWithCatalog {
+
+  // these parameters are broken out to avoid changes that need to modify lots of test suites
+ public static Stream<Arguments> parameters() {
+ return Stream.of(
+ Arguments.of(
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ SparkCatalogConfig.HIVE.properties()),
+ Arguments.of(
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties()),
+ Arguments.of(
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ SparkCatalogConfig.SPARK.properties()));
+ }
+
+ public CatalogTestBase(SparkCatalogConfig config) {
+ super(config);
+ }
+
+  public CatalogTestBase(String catalogName, String implementation, Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
index 97484702ca..6fa8da8411 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestHelperBase.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.spark.sql.Row;
-import org.junit.Assert;
+import org.assertj.core.api.Assertions;
public class SparkTestHelperBase {
protected static final Object ANY = new Object();
@@ -55,12 +55,13 @@ public class SparkTestHelperBase {
protected void assertEquals(
String context, List<Object[]> expectedRows, List<Object[]> actualRows) {
- Assert.assertEquals(
- context + ": number of results should match", expectedRows.size(),
actualRows.size());
+ Assertions.assertThat(actualRows)
+ .as(context + ": number of results should match")
+ .hasSameSizeAs(expectedRows);
for (int row = 0; row < expectedRows.size(); row += 1) {
Object[] expected = expectedRows.get(row);
Object[] actual = actualRows.get(row);
- Assert.assertEquals("Number of columns should match", expected.length,
actual.length);
+      Assertions.assertThat(actual).as("Number of columns should match").hasSameSizeAs(expected);
for (int col = 0; col < actualRows.get(row).length; col += 1) {
String newContext = String.format("%s: row %d col %d", context, row +
1, col + 1);
assertEquals(newContext, expected, actual);
@@ -69,19 +70,23 @@ public class SparkTestHelperBase {
}
protected void assertEquals(String context, Object[] expectedRow, Object[]
actualRow) {
- Assert.assertEquals("Number of columns should match", expectedRow.length,
actualRow.length);
+ Assertions.assertThat(actualRow)
+ .as("Number of columns should match")
+ .hasSameSizeAs(expectedRow);
for (int col = 0; col < actualRow.length; col += 1) {
Object expectedValue = expectedRow[col];
Object actualValue = actualRow[col];
if (expectedValue != null && expectedValue.getClass().isArray()) {
String newContext = String.format("%s (nested col %d)", context, col +
1);
if (expectedValue instanceof byte[]) {
- Assert.assertArrayEquals(newContext, (byte[]) expectedValue,
(byte[]) actualValue);
+
Assertions.assertThat(actualValue).as(newContext).isEqualTo(expectedValue);
} else {
assertEquals(newContext, (Object[]) expectedValue, (Object[])
actualValue);
}
} else if (expectedValue != ANY) {
- Assert.assertEquals(context + " contents should match", expectedValue,
actualValue);
+ Assertions.assertThat(actualValue)
+ .as(context + " contents should match")
+ .isEqualTo(expectedValue);
}
}
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
new file mode 100644
index 0000000000..a456fdcf44
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.QueryExecutionListener;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+public abstract class TestBase extends SparkTestHelperBase {
+
+ protected static TestHiveMetastore metastore = null;
+ protected static HiveConf hiveConf = null;
+ protected static SparkSession spark = null;
+ protected static JavaSparkContext sparkContext = null;
+ protected static HiveCatalog catalog = null;
+
+ @BeforeAll
+ public static void startMetastoreAndSpark() {
+ TestBase.metastore = new TestHiveMetastore();
+ metastore.start();
+ TestBase.hiveConf = metastore.hiveConf();
+
+ TestBase.spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic")
+          .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname))
+          .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+ .enableHiveSupport()
+ .getOrCreate();
+
+    TestBase.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+ TestBase.catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+
+ try {
+ catalog.createNamespace(Namespace.of("default"));
+ } catch (AlreadyExistsException ignored) {
+ // the default namespace already exists. ignore the create error
+ }
+ }
+
+ @AfterAll
+ public static void stopMetastoreAndSpark() throws Exception {
+ TestBase.catalog = null;
+ if (metastore != null) {
+ metastore.stop();
+ TestBase.metastore = null;
+ }
+ if (spark != null) {
+ spark.stop();
+ TestBase.spark = null;
+ TestBase.sparkContext = null;
+ }
+ }
+
+ protected long waitUntilAfter(long timestampMillis) {
+ long current = System.currentTimeMillis();
+ while (current <= timestampMillis) {
+ current = System.currentTimeMillis();
+ }
+ return current;
+ }
+
+ protected List<Object[]> sql(String query, Object... args) {
+ List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
+ if (rows.size() < 1) {
+ return ImmutableList.of();
+ }
+
+ return rowsToJava(rows);
+ }
+
+ protected Object scalarSql(String query, Object... args) {
+ List<Object[]> rows = sql(query, args);
+    Assertions.assertThat(rows.size()).as("Scalar SQL should return one row").isEqualTo(1);
+    Object[] row = Iterables.getOnlyElement(rows);
+    Assertions.assertThat(row.length).as("Scalar SQL should return one value").isEqualTo(1);
+ return row[0];
+ }
+
+ protected Object[] row(Object... values) {
+ return values;
+ }
+
+ protected static String dbPath(String dbName) {
+ return metastore.getDatabasePath(dbName);
+ }
+
+ protected void withUnavailableFiles(Iterable<? extends ContentFile<?>>
files, Action action) {
+ Iterable<String> fileLocations = Iterables.transform(files, file ->
file.path().toString());
+ withUnavailableLocations(fileLocations, action);
+ }
+
+ private void move(String location, String newLocation) {
+ Path path = Paths.get(URI.create(location));
+ Path tempPath = Paths.get(URI.create(newLocation));
+
+ try {
+ Files.move(path, tempPath);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to move: " + location, e);
+ }
+ }
+
+ protected void withUnavailableLocations(Iterable<String> locations, Action
action) {
+ for (String location : locations) {
+ move(location, location + "_temp");
+ }
+
+ try {
+ action.invoke();
+ } finally {
+ for (String location : locations) {
+ move(location + "_temp", location);
+ }
+ }
+ }
+
+ protected void withDefaultTimeZone(String zoneId, Action action) {
+ TimeZone currentZone = TimeZone.getDefault();
+ try {
+ TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
+ action.invoke();
+ } finally {
+ TimeZone.setDefault(currentZone);
+ }
+ }
+
+ protected void withSQLConf(Map<String, String> conf, Action action) {
+ SQLConf sqlConf = SQLConf.get();
+
+ Map<String, String> currentConfValues = Maps.newHashMap();
+ conf.keySet()
+ .forEach(
+ confKey -> {
+ if (sqlConf.contains(confKey)) {
+ String currentConfValue = sqlConf.getConfString(confKey);
+ currentConfValues.put(confKey, currentConfValue);
+ }
+ });
+
+ conf.forEach(
+ (confKey, confValue) -> {
+ if (SQLConf.isStaticConfigKey(confKey)) {
+            throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
+ }
+ sqlConf.setConfString(confKey, confValue);
+ });
+
+ try {
+ action.invoke();
+ } finally {
+ conf.forEach(
+ (confKey, confValue) -> {
+ if (currentConfValues.containsKey(confKey)) {
+ sqlConf.setConfString(confKey, currentConfValues.get(confKey));
+ } else {
+ sqlConf.unsetConf(confKey);
+ }
+ });
+ }
+ }
+
+ protected Dataset<Row> jsonToDF(String schema, String... records) {
+ Dataset<String> jsonDF =
spark.createDataset(ImmutableList.copyOf(records), Encoders.STRING());
+ return spark.read().schema(schema).json(jsonDF);
+ }
+
+ protected void append(String table, String... jsonRecords) {
+ try {
+ String schema = spark.table(table).schema().toDDL();
+ Dataset<Row> df = jsonToDF(schema, jsonRecords);
+ df.coalesce(1).writeTo(table).append();
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException("Failed to write data", e);
+ }
+ }
+
+ protected String tablePropsAsString(Map<String, String> tableProps) {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ for (Map.Entry<String, String> property : tableProps.entrySet()) {
+ if (stringBuilder.length() > 0) {
+ stringBuilder.append(", ");
+ }
+ stringBuilder.append(String.format("'%s' '%s'", property.getKey(),
property.getValue()));
+ }
+
+ return stringBuilder.toString();
+ }
+
+ protected SparkPlan executeAndKeepPlan(String query, Object... args) {
+ return executeAndKeepPlan(() -> sql(query, args));
+ }
+
+ protected SparkPlan executeAndKeepPlan(Action action) {
+ AtomicReference<SparkPlan> executedPlanRef = new AtomicReference<>();
+
+ QueryExecutionListener listener =
+ new QueryExecutionListener() {
+ @Override
+ public void onSuccess(String funcName, QueryExecution qe, long
durationNs) {
+ executedPlanRef.set(qe.executedPlan());
+ }
+
+ @Override
+ public void onFailure(String funcName, QueryExecution qe, Exception
exception) {}
+ };
+
+ spark.listenerManager().register(listener);
+
+ action.invoke();
+
+ try {
+ spark.sparkContext().listenerBus().waitUntilEmpty();
+ } catch (TimeoutException e) {
+      throw new RuntimeException("Timeout while waiting for processing events", e);
+ }
+
+ SparkPlan executedPlan = executedPlanRef.get();
+ if (executedPlan instanceof AdaptiveSparkPlanExec) {
+ return ((AdaptiveSparkPlanExec) executedPlan).executedPlan();
+ } else {
+ return executedPlan;
+ }
+ }
+
+ @FunctionalInterface
+ protected interface Action {
+ void invoke();
+ }
+}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
new file mode 100644
index 0000000000..83767039bb
--- /dev/null
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBaseWithCatalog.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.util.PropertyUtil;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class TestBaseWithCatalog extends TestBase {
+ protected static File warehouse = null;
+
+ @BeforeAll
+ public static void createWarehouse() throws IOException {
+ TestBaseWithCatalog.warehouse = File.createTempFile("warehouse", null);
+ Assertions.assertThat(warehouse.delete()).isTrue();
+ }
+
+ @AfterAll
+ public static void dropWarehouse() throws IOException {
+ if (warehouse != null && warehouse.exists()) {
+ Path warehousePath = new Path(warehouse.getAbsolutePath());
+ FileSystem fs = warehousePath.getFileSystem(hiveConf);
+ Assertions.assertThat(fs.delete(warehousePath, true))
+ .as("Failed to delete " + warehousePath)
+ .isTrue();
+ }
+ }
+
+ @TempDir protected File temp;
+
+ protected final String catalogName;
+ protected final Map<String, String> catalogConfig;
+ protected final Catalog validationCatalog;
+ protected final SupportsNamespaces validationNamespaceCatalog;
+  protected final TableIdentifier tableIdent = TableIdentifier.of(Namespace.of("default"), "table");
+ protected final String tableName;
+
+ public TestBaseWithCatalog() {
+ this(SparkCatalogConfig.HADOOP);
+ }
+
+ public TestBaseWithCatalog(SparkCatalogConfig config) {
+ this(config.catalogName(), config.implementation(), config.properties());
+ }
+
+ public TestBaseWithCatalog(
+ String catalogName, String implementation, Map<String, String> config) {
+ this.catalogName = catalogName;
+ this.catalogConfig = config;
+ this.validationCatalog =
+ catalogName.equals("testhadoop")
+            ? new HadoopCatalog(spark.sessionState().newHadoopConf(), "file:" + warehouse)
+ : catalog;
+ this.validationNamespaceCatalog = (SupportsNamespaces) validationCatalog;
+
+ spark.conf().set("spark.sql.catalog." + catalogName, implementation);
+ config.forEach(
+ (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName +
"." + key, value));
+
+ if (config.get("type").equalsIgnoreCase("hadoop")) {
+      spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse);
+ }
+
+ this.tableName =
+ (catalogName.equals("spark_catalog") ? "" : catalogName + ".") +
"default.table";
+
+ sql("CREATE NAMESPACE IF NOT EXISTS default");
+ }
+
+ protected String tableName(String name) {
+ return (catalogName.equals("spark_catalog") ? "" : catalogName + ".") +
"default." + name;
+ }
+
+ protected String commitTarget() {
+ return tableName;
+ }
+
+ protected String selectTarget() {
+ return tableName;
+ }
+
+ protected boolean cachingCatalogEnabled() {
+ return PropertyUtil.propertyAsBoolean(
+ catalogConfig, CatalogProperties.CACHE_ENABLED,
CatalogProperties.CACHE_ENABLED_DEFAULT);
+ }
+
+ protected void configurePlanningMode(PlanningMode planningMode) {
+ configurePlanningMode(tableName, planningMode);
+ }
+
+ protected void configurePlanningMode(String table, PlanningMode
planningMode) {
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' '%s', '%s' '%s')",
+ table,
+ TableProperties.DATA_PLANNING_MODE,
+ planningMode.modeName(),
+ TableProperties.DELETE_PLANNING_MODE,
+ planningMode.modeName());
+ }
+}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
index 055e5be681..0da6bdb362 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
@@ -33,16 +33,15 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.TestBase;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.StringType;
import org.assertj.core.api.Assertions;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
-public class TestSparkFileRewriter extends SparkTestBase {
+public class TestSparkFileRewriter extends TestBase {
private static final TableIdentifier TABLE_IDENT =
TableIdentifier.of("default", "tbl");
private static final Schema SCHEMA =
@@ -53,7 +52,7 @@ public class TestSparkFileRewriter extends SparkTestBase {
PartitionSpec.builderFor(SCHEMA).identity("dep").build();
private static final SortOrder SORT_ORDER =
SortOrder.builderFor(SCHEMA).asc("id").build();
- @After
+ @AfterEach
public void removeTable() {
catalog.dropTable(TABLE_IDENT);
}
@@ -110,9 +109,9 @@ public class TestSparkFileRewriter extends SparkTestBase {
rewriter.init(options);
Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
- Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+ Assertions.assertThat(groups).as("Must have 1 group").hasSize(1);
List<FileScanTask> group = Iterables.getOnlyElement(groups);
- Assert.assertEquals("Must rewrite 2 files", 2, group.size());
+ Assertions.assertThat(group).as("Must rewrite 2 files").hasSize(2);
}
private void checkDataFilesDeleteThreshold(SizeBasedDataRewriter rewriter) {
@@ -129,9 +128,9 @@ public class TestSparkFileRewriter extends SparkTestBase {
rewriter.init(options);
Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
- Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+ Assertions.assertThat(groups).as("Must have 1 group").hasSize(1);
List<FileScanTask> group = Iterables.getOnlyElement(groups);
- Assert.assertEquals("Must rewrite 1 file", 1, group.size());
+ Assertions.assertThat(group).as("Must rewrite 1 file").hasSize(1);
}
private void checkDataFileGroupWithEnoughFiles(SizeBasedDataRewriter
rewriter) {
@@ -152,9 +151,9 @@ public class TestSparkFileRewriter extends SparkTestBase {
rewriter.init(options);
Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
- Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+ Assertions.assertThat(groups).as("Must have 1 group").hasSize(1);
List<FileScanTask> group = Iterables.getOnlyElement(groups);
- Assert.assertEquals("Must rewrite 4 files", 4, group.size());
+ Assertions.assertThat(group).as("Must rewrite 4 files").hasSize(4);
}
private void checkDataFileGroupWithEnoughData(SizeBasedDataRewriter
rewriter) {
@@ -172,9 +171,9 @@ public class TestSparkFileRewriter extends SparkTestBase {
rewriter.init(options);
Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
- Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+ Assertions.assertThat(groups).as("Must have 1 group").hasSize(1);
List<FileScanTask> group = Iterables.getOnlyElement(groups);
- Assert.assertEquals("Must rewrite 3 files", 3, group.size());
+ Assertions.assertThat(group).as("Must rewrite 3 files").hasSize(3);
}
private void checkDataFileGroupWithTooMuchData(SizeBasedDataRewriter
rewriter) {
@@ -190,9 +189,9 @@ public class TestSparkFileRewriter extends SparkTestBase {
rewriter.init(options);
Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
- Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+ Assertions.assertThat(groups).as("Must have 1 group").hasSize(1);
List<FileScanTask> group = Iterables.getOnlyElement(groups);
- Assert.assertEquals("Must rewrite big file", 1, group.size());
+ Assertions.assertThat(group).as("Must rewrite big file").hasSize(1);
}
@Test
@@ -240,17 +239,17 @@ public class TestSparkFileRewriter extends SparkTestBase {
Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
SparkBinPackDataRewriter rewriter = new SparkBinPackDataRewriter(spark,
table);
- Assert.assertEquals(
- "Rewriter must report all supported options",
- ImmutableSet.of(
- SparkBinPackDataRewriter.TARGET_FILE_SIZE_BYTES,
- SparkBinPackDataRewriter.MIN_FILE_SIZE_BYTES,
- SparkBinPackDataRewriter.MAX_FILE_SIZE_BYTES,
- SparkBinPackDataRewriter.MIN_INPUT_FILES,
- SparkBinPackDataRewriter.REWRITE_ALL,
- SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
- SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD),
- rewriter.validOptions());
+ Assertions.assertThat(rewriter.validOptions())
+ .as("Rewriter must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ SparkBinPackDataRewriter.TARGET_FILE_SIZE_BYTES,
+ SparkBinPackDataRewriter.MIN_FILE_SIZE_BYTES,
+ SparkBinPackDataRewriter.MAX_FILE_SIZE_BYTES,
+ SparkBinPackDataRewriter.MIN_INPUT_FILES,
+ SparkBinPackDataRewriter.REWRITE_ALL,
+ SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+ SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD));
}
@Test
@@ -258,19 +257,19 @@ public class TestSparkFileRewriter extends SparkTestBase {
Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
SparkSortDataRewriter rewriter = new SparkSortDataRewriter(spark, table,
SORT_ORDER);
- Assert.assertEquals(
- "Rewriter must report all supported options",
- ImmutableSet.of(
- SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
- SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
- SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
- SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
- SparkSortDataRewriter.MIN_INPUT_FILES,
- SparkSortDataRewriter.REWRITE_ALL,
- SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
- SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
- SparkSortDataRewriter.COMPRESSION_FACTOR),
- rewriter.validOptions());
+ Assertions.assertThat(rewriter.validOptions())
+ .as("Rewriter must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ SparkSortDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
+ SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
+ SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
+ SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
+ SparkSortDataRewriter.MIN_INPUT_FILES,
+ SparkSortDataRewriter.REWRITE_ALL,
+ SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+ SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
+ SparkSortDataRewriter.COMPRESSION_FACTOR));
}
@Test
@@ -279,21 +278,21 @@ public class TestSparkFileRewriter extends SparkTestBase {
ImmutableList<String> zOrderCols = ImmutableList.of("id");
SparkZOrderDataRewriter rewriter = new SparkZOrderDataRewriter(spark,
table, zOrderCols);
- Assert.assertEquals(
- "Rewriter must report all supported options",
- ImmutableSet.of(
- SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
- SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
- SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
- SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,
- SparkZOrderDataRewriter.MIN_INPUT_FILES,
- SparkZOrderDataRewriter.REWRITE_ALL,
- SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
- SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
- SparkZOrderDataRewriter.COMPRESSION_FACTOR,
- SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
- SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION),
- rewriter.validOptions());
+ Assertions.assertThat(rewriter.validOptions())
+ .as("Rewriter must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ SparkZOrderDataRewriter.SHUFFLE_PARTITIONS_PER_FILE,
+ SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
+ SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
+ SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,
+ SparkZOrderDataRewriter.MIN_INPUT_FILES,
+ SparkZOrderDataRewriter.REWRITE_ALL,
+ SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+ SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
+ SparkZOrderDataRewriter.COMPRESSION_FACTOR,
+ SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
+ SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION));
}
@Test