This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new a4d47567e1 Core: Schema for a branch should return table schema (#9131)
a4d47567e1 is described below
commit a4d47567e1fef44f4443250537f09dc73a1f7583
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Dec 5 12:03:43 2023 +0100
Core: Schema for a branch should return table schema (#9131)
When retrieving the schema for branch we should always return the table
schema instead of the snapshot schema. This is because the table schema
is the schema that will be used when the branch will be created.
We should only return the schema of the snapshot when we have a tag.
---
.../java/org/apache/iceberg/util/SnapshotUtil.java | 40 ++++----
.../org/apache/iceberg/util/TestSnapshotUtil.java | 65 +++++++++++++
.../spark/source/TestSnapshotSelection.java | 106 +++++++++++++++++++--
.../spark/source/TestSnapshotSelection.java | 106 +++++++++++++++++++--
.../spark/source/TestSnapshotSelection.java | 106 +++++++++++++++++++--
5 files changed, 380 insertions(+), 43 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index 75d4493691..b0ec879bda 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -409,49 +409,51 @@ public class SnapshotUtil {
}
/**
- * Return the schema of the snapshot at a given branch.
+ * Return the schema of the snapshot at a given ref.
*
- * <p>If branch does not exist, the table schema is returned because it will
be the schema when
- * the new branch is created.
+ * <p>If the ref does not exist or the ref is a branch, the table schema is
returned because it
+ * will be the schema when the new branch is created. If the ref is a tag,
then the snapshot
+ * schema is returned.
*
* @param table a {@link Table}
- * @param branch branch name of the table (nullable)
- * @return schema of the specific snapshot at the given branch
+ * @param ref ref name of the table (nullable)
+ * @return schema of the specific snapshot at the given ref
*/
- public static Schema schemaFor(Table table, String branch) {
- if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ public static Schema schemaFor(Table table, String ref) {
+ if (ref == null || ref.equals(SnapshotRef.MAIN_BRANCH)) {
return table.schema();
}
- Snapshot ref = table.snapshot(branch);
- if (ref == null) {
+ SnapshotRef snapshotRef = table.refs().get(ref);
+ if (null == snapshotRef || snapshotRef.isBranch()) {
return table.schema();
}
- return schemaFor(table, ref.snapshotId());
+ return schemaFor(table, snapshotRef.snapshotId());
}
/**
- * Return the schema of the snapshot at a given branch.
+ * Return the schema of the snapshot at a given ref.
*
- * <p>If branch does not exist, the table schema is returned because it will
be the schema when
- * the new branch is created.
+ * <p>If the ref does not exist or the ref is a branch, the table schema is
returned because it
+ * will be the schema when the new branch is created. If the ref is a tag,
then the snapshot
+ * schema is returned.
*
* @param metadata a {@link TableMetadata}
- * @param branch branch name of the table (nullable)
+ * @param ref ref name of the table (nullable)
* @return schema of the specific snapshot at the given branch
*/
- public static Schema schemaFor(TableMetadata metadata, String branch) {
- if (branch == null || branch.equals(SnapshotRef.MAIN_BRANCH)) {
+ public static Schema schemaFor(TableMetadata metadata, String ref) {
+ if (ref == null || ref.equals(SnapshotRef.MAIN_BRANCH)) {
return metadata.schema();
}
- SnapshotRef ref = metadata.ref(branch);
- if (ref == null) {
+ SnapshotRef snapshotRef = metadata.ref(ref);
+ if (snapshotRef == null || snapshotRef.isBranch()) {
return metadata.schema();
}
- Snapshot snapshot = metadata.snapshot(ref.snapshotId());
+ Snapshot snapshot = metadata.snapshot(snapshotRef.snapshotId());
return metadata.schemas().get(snapshot.schemaId());
}
diff --git a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
index 576df47e7b..db6be5bcfe 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestSnapshotUtil.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.util;
+import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
@@ -187,4 +188,68 @@ public class TestSnapshotUtil {
.toArray();
assertThat(actualSnapshots).isEqualTo(snapshotIdExpected);
}
+
+ @Test
+ public void schemaForRef() {
+ Schema initialSchema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()));
+ assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+ assertThat(SnapshotUtil.schemaFor(table,
null).asStruct()).isEqualTo(initialSchema.asStruct());
+ assertThat(SnapshotUtil.schemaFor(table, "non-existing-ref").asStruct())
+ .isEqualTo(initialSchema.asStruct());
+ assertThat(SnapshotUtil.schemaFor(table,
SnapshotRef.MAIN_BRANCH).asStruct())
+ .isEqualTo(initialSchema.asStruct());
+ }
+
+ @Test
+ public void schemaForBranch() {
+ Schema initialSchema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()));
+ assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+ String branch = "branch";
+ table.manageSnapshots().createBranch(branch).commit();
+
+ assertThat(SnapshotUtil.schemaFor(table, branch).asStruct())
+ .isEqualTo(initialSchema.asStruct());
+
+ table.updateSchema().addColumn("zip", Types.IntegerType.get()).commit();
+ Schema expected =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()),
+ optional(3, "zip", Types.IntegerType.get()));
+
+ assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
+ assertThat(SnapshotUtil.schemaFor(table,
branch).asStruct()).isEqualTo(expected.asStruct());
+ }
+
+ @Test
+ public void schemaForTag() {
+ Schema initialSchema =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()));
+ assertThat(table.schema().asStruct()).isEqualTo(initialSchema.asStruct());
+
+ String tag = "tag";
+ table.manageSnapshots().createTag(tag,
table.currentSnapshot().snapshotId()).commit();
+
+ assertThat(SnapshotUtil.schemaFor(table,
tag).asStruct()).isEqualTo(initialSchema.asStruct());
+
+ table.updateSchema().addColumn("zip", Types.IntegerType.get()).commit();
+ Schema expected =
+ new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()),
+ optional(3, "zip", Types.IntegerType.get()));
+
+ assertThat(table.schema().asStruct()).isEqualTo(expected.asStruct());
+ assertThat(SnapshotUtil.schemaFor(table,
tag).asStruct()).isEqualTo(initialSchema.asStruct());
+ }
}
diff --git
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index 276fbcd592..a161224275 100644
---
a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++
b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -31,10 +31,12 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
@@ -402,16 +404,104 @@ public class TestSnapshotSelection {
// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();
- // The data should have the deleted column as it was captured in an
earlier snapshot.
- Dataset<Row> deletedColumnBranchSnapshotResult =
+ // The data should not have the deleted column
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(RowFactory.create(1), RowFactory.create(2),
RowFactory.create(3));
+
+ // re-introducing the column should not let the data re-appear
+ table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList())
+ .containsExactly(
+ new SimpleRecord(1, null), new SimpleRecord(2, null), new
SimpleRecord(3, null));
+ }
+
+ @Test
+ public void testWritingToBranchAfterSchemaChange() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new
SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords,
SimpleRecord.class);
+ firstDf.select("id",
"data").write().format("iceberg").mode("append").save(tableLocation);
+
+ table.manageSnapshots().createBranch("branch",
table.currentSnapshot().snapshotId()).commit();
+
+ Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch",
"branch").load(tableLocation);
- List<SimpleRecord> deletedColumnBranchSnapshotRecords =
- deletedColumnBranchSnapshotResult
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ List<SimpleRecord> branchSnapshotRecords =
+
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
- "Current snapshot rows should match", expectedRecords,
deletedColumnBranchSnapshotRecords);
+ "Current snapshot rows should match", expectedRecords,
branchSnapshotRecords);
+
+ // Deleting and add a new column of the same type to indicate schema change
+ table.updateSchema().deleteColumn("data").addColumn("zip",
Types.IntegerType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null));
+
+ // writing new records into the branch should work with the new column
+ List<Row> records =
+ Lists.newArrayList(
+ RowFactory.create(4, 12345), RowFactory.create(5, 54321),
RowFactory.create(6, 67890));
+
+ Dataset<Row> dataFrame =
+ spark.createDataFrame(
+ records,
+ SparkSchemaUtil.convert(
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "zip", Types.IntegerType.get()))));
+ dataFrame
+ .select("id", "zip")
+ .write()
+ .format("iceberg")
+ .option("branch", "branch")
+ .mode("append")
+ .save(tableLocation);
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .collectAsList())
+ .hasSize(6)
+ .contains(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null))
+ .containsAll(records);
}
@Test
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index f1374c050d..645afd4542 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -37,10 +37,12 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
@@ -425,16 +427,104 @@ public class TestSnapshotSelection {
// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();
- // The data should have the deleted column as it was captured in an
earlier snapshot.
- Dataset<Row> deletedColumnBranchSnapshotResult =
+ // The data should not have the deleted column
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(RowFactory.create(1), RowFactory.create(2),
RowFactory.create(3));
+
+ // re-introducing the column should not let the data re-appear
+ table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList())
+ .containsExactly(
+ new SimpleRecord(1, null), new SimpleRecord(2, null), new
SimpleRecord(3, null));
+ }
+
+ @Test
+ public void testWritingToBranchAfterSchemaChange() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new
SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords,
SimpleRecord.class);
+ firstDf.select("id",
"data").write().format("iceberg").mode("append").save(tableLocation);
+
+ table.manageSnapshots().createBranch("branch",
table.currentSnapshot().snapshotId()).commit();
+
+ Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch",
"branch").load(tableLocation);
- List<SimpleRecord> deletedColumnBranchSnapshotRecords =
- deletedColumnBranchSnapshotResult
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ List<SimpleRecord> branchSnapshotRecords =
+
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
- "Current snapshot rows should match", expectedRecords,
deletedColumnBranchSnapshotRecords);
+ "Current snapshot rows should match", expectedRecords,
branchSnapshotRecords);
+
+ // Deleting and add a new column of the same type to indicate schema change
+ table.updateSchema().deleteColumn("data").addColumn("zip",
Types.IntegerType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null));
+
+ // writing new records into the branch should work with the new column
+ List<Row> records =
+ Lists.newArrayList(
+ RowFactory.create(4, 12345), RowFactory.create(5, 54321),
RowFactory.create(6, 67890));
+
+ Dataset<Row> dataFrame =
+ spark.createDataFrame(
+ records,
+ SparkSchemaUtil.convert(
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "zip", Types.IntegerType.get()))));
+ dataFrame
+ .select("id", "zip")
+ .write()
+ .format("iceberg")
+ .option("branch", "branch")
+ .mode("append")
+ .save(tableLocation);
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .collectAsList())
+ .hasSize(6)
+ .contains(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null))
+ .containsAll(records);
}
@Test
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
index f1374c050d..645afd4542 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
@@ -37,10 +37,12 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
@@ -425,16 +427,104 @@ public class TestSnapshotSelection {
// Deleting a column to indicate schema change
table.updateSchema().deleteColumn("data").commit();
- // The data should have the deleted column as it was captured in an
earlier snapshot.
- Dataset<Row> deletedColumnBranchSnapshotResult =
+ // The data should not have the deleted column
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(RowFactory.create(1), RowFactory.create(2),
RowFactory.create(3));
+
+ // re-introducing the column should not let the data re-appear
+ table.updateSchema().addColumn("data", Types.StringType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList())
+ .containsExactly(
+ new SimpleRecord(1, null), new SimpleRecord(2, null), new
SimpleRecord(3, null));
+ }
+
+ @Test
+ public void testWritingToBranchAfterSchemaChange() throws IOException {
+ String tableLocation = temp.newFolder("iceberg-table").toString();
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Table table = tables.create(SCHEMA, spec, properties, tableLocation);
+
+ // produce the first snapshot
+ List<SimpleRecord> firstBatchRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new
SimpleRecord(3, "c"));
+ Dataset<Row> firstDf = spark.createDataFrame(firstBatchRecords,
SimpleRecord.class);
+ firstDf.select("id",
"data").write().format("iceberg").mode("append").save(tableLocation);
+
+ table.manageSnapshots().createBranch("branch",
table.currentSnapshot().snapshotId()).commit();
+
+ Dataset<Row> branchSnapshotResult =
spark.read().format("iceberg").option("branch",
"branch").load(tableLocation);
- List<SimpleRecord> deletedColumnBranchSnapshotRecords =
- deletedColumnBranchSnapshotResult
- .orderBy("id")
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
+ List<SimpleRecord> branchSnapshotRecords =
+
branchSnapshotResult.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ List<SimpleRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(firstBatchRecords);
Assert.assertEquals(
- "Current snapshot rows should match", expectedRecords,
deletedColumnBranchSnapshotRecords);
+ "Current snapshot rows should match", expectedRecords,
branchSnapshotRecords);
+
+ // Deleting and add a new column of the same type to indicate schema change
+ table.updateSchema().deleteColumn("data").addColumn("zip",
Types.IntegerType.get()).commit();
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .orderBy("id")
+ .collectAsList())
+ .containsExactly(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null));
+
+ // writing new records into the branch should work with the new column
+ List<Row> records =
+ Lists.newArrayList(
+ RowFactory.create(4, 12345), RowFactory.create(5, 54321),
RowFactory.create(6, 67890));
+
+ Dataset<Row> dataFrame =
+ spark.createDataFrame(
+ records,
+ SparkSchemaUtil.convert(
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "zip", Types.IntegerType.get()))));
+ dataFrame
+ .select("id", "zip")
+ .write()
+ .format("iceberg")
+ .option("branch", "branch")
+ .mode("append")
+ .save(tableLocation);
+
+ Assertions.assertThat(
+ spark
+ .read()
+ .format("iceberg")
+ .option("branch", "branch")
+ .load(tableLocation)
+ .collectAsList())
+ .hasSize(6)
+ .contains(
+ RowFactory.create(1, null), RowFactory.create(2, null),
RowFactory.create(3, null))
+ .containsAll(records);
}
@Test