This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new 231f1f7ee [lake/iceberg] Add IT for Iceberg files compaction for primary key table (#1658)
231f1f7ee is described below
commit 231f1f7ee93c6aa915de6676ca8642063c77f558
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Sep 9 19:07:12 2025 +0800
    [lake/iceberg] Add IT for Iceberg files compaction for primary key table (#1658)
---------
Co-authored-by: maxcwang <[email protected]>
---
.../maintenance/IcebergRewriteDataFiles.java | 20 ++-
.../iceberg/maintenance/RewriteDataFileResult.java | 13 +-
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 20 ++-
.../iceberg/maintenance/IcebergRewriteITCase.java | 150 +++++++++++++++++++--
.../testutils/FlinkIcebergTieringTestBase.java | 133 ++++--------------
.../lake/iceberg/tiering/IcebergTieringITCase.java | 86 +++++++++++-
.../tiering/IcebergWriteResultSerializerTest.java | 4 +-
7 files changed, 293 insertions(+), 133 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
index 742a808a3..4f93bf983 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenericReader;
import org.apache.iceberg.data.Record;
@@ -82,10 +83,15 @@ public class IcebergRewriteDataFiles {
return this;
}
-    private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
+    private List<CombinedScanTask> planRewriteFileGroups(long snapshotId) throws IOException {
         List<FileScanTask> fileScanTasks = new ArrayList<>();
         try (CloseableIterable<FileScanTask> tasks =
-                table.newScan().includeColumnStats().filter(filter).ignoreResiduals().planFiles()) {
+                table.newScan()
+                        .useSnapshot(snapshotId)
+                        .includeColumnStats()
+                        .filter(filter)
+                        .ignoreResiduals()
+                        .planFiles()) {
             tasks.forEach(fileScanTasks::add);
         }
@@ -137,7 +143,12 @@ public class IcebergRewriteDataFiles {
     public RewriteDataFileResult execute() {
         try {
             // plan the file groups to be rewritten
-            List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups();
+            Snapshot snapshot = table.currentSnapshot();
+            // if no snapshot, just return
+            if (snapshot == null) {
+                return null;
+            }
+            List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups(snapshot.snapshotId());
             if (tasksToRewrite.isEmpty()) {
                 return null;
             }
@@ -152,7 +163,8 @@ public class IcebergRewriteDataFiles {
.collect(Collectors.toList()));
}
LOG.info("Finish rewriting files from {} to {}.",
deletedDataFiles, addedDataFiles);
- return new RewriteDataFileResult(deletedDataFiles, addedDataFiles);
+ return new RewriteDataFileResult(
+ snapshot.snapshotId(), deletedDataFiles, addedDataFiles);
} catch (Exception e) {
throw new RuntimeException(
String.format("Fail to compact bucket %s of table %s.",
bucket, table.name()),
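
For context, the change above pins both file planning and the eventual commit to one snapshot. Below is a minimal sketch of that handshake against the bare Iceberg API, assuming `table` is an already-loaded org.apache.iceberg.Table; everything besides the Iceberg calls is illustrative, not code from this commit:

    import java.io.IOException;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.RewriteFiles;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    static void rewritePinnedToSnapshot(Table table) throws IOException {
        // Nothing to compact before the first snapshot exists.
        Snapshot pinned = table.currentSnapshot();
        if (pinned == null) {
            return;
        }
        long pinnedId = pinned.snapshotId();

        // 1) Plan input files against the pinned snapshot, so a concurrent
        //    commit cannot change the set of files this rewrite reads.
        try (CloseableIterable<FileScanTask> tasks =
                table.newScan().useSnapshot(pinnedId).planFiles()) {
            // ... rewrite the data files behind these tasks, collecting the
            //     deleted and added DataFile handles ...
        }

        // 2) Anchor conflict detection at the same snapshot: commit() fails if
        //    an incompatible change (e.g. a new delete file on a rewritten
        //    data file) landed after pinnedId.
        RewriteFiles rewriteFiles = table.newRewrite().validateFromSnapshot(pinnedId);
        // rewriteFiles.deleteFile(...) for each compacted-away file,
        // rewriteFiles.addFile(...) for each replacement, then:
        rewriteFiles.commit();
    }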
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
index e24580cab..79e0dfe21 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
@@ -28,10 +28,13 @@ public class RewriteDataFileResult implements Serializable {
private static final long serialVersionUID = 1L;
+ private final long snapshotId;
private final List<DataFile> deletedDataFiles;
private final List<DataFile> addedDataFiles;
-    public RewriteDataFileResult(List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
+    public RewriteDataFileResult(
+            long snapshotId, List<DataFile> deletedDataFiles, List<DataFile> addedDataFiles) {
+ this.snapshotId = snapshotId;
this.deletedDataFiles = deletedDataFiles;
this.addedDataFiles = addedDataFiles;
}
@@ -44,10 +47,16 @@ public class RewriteDataFileResult implements Serializable {
return addedDataFiles;
}
+ public long snapshotId() {
+ return snapshotId;
+ }
+
@Override
public String toString() {
return "RewriteDataFileResult{"
- + "deletedDataFiles="
+ + "snapshotId="
+ + snapshotId
+ + ", deletedDataFiles="
+ deletedDataFiles
+ ", addedDataFiles="
+ addedDataFiles
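
The new field is what ties a rewrite back to the snapshot it planned from. A hypothetical construction site, with all variable names illustrative:

    // Sketch: carry the pinned snapshot id alongside the file changes so the
    // committer can later call validateFromSnapshot() with the same id.
    RewriteDataFileResult result =
            new RewriteDataFileResult(
                    snapshot.snapshotId(), // snapshot the rewrite planned against
                    deletedDataFiles,      // data files compacted away
                    addedDataFiles);       // compacted replacement files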
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 604f6cc6b..80b22e4d9 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -28,6 +28,7 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
@@ -162,11 +163,20 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
Map<String, String> snapshotProperties) {
icebergTable.refresh();
RewriteFiles rewriteFiles = icebergTable.newRewrite();
-        for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
-            rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
-            rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
-        }
         try {
+            if (rewriteDataFileResults.stream()
+                            .map(RewriteDataFileResult::snapshotId)
+                            .distinct()
+                            .count()
+                    > 1) {
+                throw new IllegalArgumentException(
+                        "Rewrite data file results must have the same snapshot id.");
+            }
+            rewriteFiles.validateFromSnapshot(rewriteDataFileResults.get(0).snapshotId());
+            for (RewriteDataFileResult rewriteDataFileResult : rewriteDataFileResults) {
+                rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+                rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+            }
return commit(rewriteFiles, snapshotProperties);
} catch (Exception e) {
List<String> rewriteAddedDataFiles =
@@ -174,7 +184,7 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
.flatMap(
rewriteDataFileResult ->
rewriteDataFileResult.addedDataFiles().stream())
- .map(dataFile -> dataFile.path().toString())
+ .map(ContentFile::location)
.collect(Collectors.toList());
LOG.error(
"Failed to commit rewrite files to iceberg, delete rewrite
added files {}.",
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
index a1a28b2d0..4b6d9de71 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -19,20 +19,27 @@
package org.apache.fluss.lake.iceberg.maintenance;
import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.data.Record;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
/** Integration test for Iceberg compaction. */
class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
@@ -40,6 +47,19 @@ class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
private static StreamExecutionEnvironment execEnv;
+ private static final Schema pkSchema =
+ Schema.newBuilder()
+ .column("f_int", DataTypes.INT())
+ .column("f_string", DataTypes.STRING())
+ .primaryKey("f_int")
+ .build();
+
+ private static final Schema logSchema =
+ Schema.newBuilder()
+ .column("f_int", DataTypes.INT())
+ .column("f_string", DataTypes.STRING())
+ .build();
+
@BeforeAll
protected static void beforeAll() {
FlinkIcebergTieringTestBase.beforeAll();
@@ -48,42 +68,148 @@ class IcebergRewriteITCase extends FlinkIcebergTieringTestBase {
execEnv.enableCheckpointing(1000);
}
+ @Test
+ void testPkTableCompaction() throws Exception {
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_1");
+ long t1Id = createPkTable(t1, 1, true, pkSchema);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+
+ List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+ writeIcebergTableRecords(t1, t1Bucket, 1, false, rows);
+ flussRows.addAll(rows);
+
+ rows = Collections.singletonList(row(2, "v1"));
+ writeIcebergTableRecords(t1, t1Bucket, 2, false, rows);
+ flussRows.addAll(rows);
+
+ // add pos-delete
+ rows = Arrays.asList(row(3, "v1"), row(3, "v2"));
+ writeIcebergTableRecords(t1, t1Bucket, 5, false, rows);
+ // one UPDATE_BEFORE and one UPDATE_AFTER
+ checkFileStatusInIcebergTable(t1, 3, true);
+ flussRows.add(rows.get(1));
+
+ // trigger compaction
+ rows = Collections.singletonList(row(4, "v1"));
+ writeIcebergTableRecords(t1, t1Bucket, 6, false, rows);
+ checkFileStatusInIcebergTable(t1, 2, false);
+ flussRows.addAll(rows);
+
+ checkRecords(getIcebergRecords(t1), flussRows);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+    private void checkRecords(List<Record> actualRows, List<InternalRow> expectedRows) {
+ // check records size
+ assertThat(actualRows.size()).isEqualTo(expectedRows.size());
+
+ // check records content
+ Iterator<Record> actualIterator =
+ actualRows.stream()
+                        .sorted(Comparator.comparingInt((Record r) -> (int) r.get(0)))
+                        .iterator();
+        Iterator<InternalRow> expectedIterator =
+                expectedRows.stream().sorted(Comparator.comparingInt(r -> r.getInt(0))).iterator();
+ while (actualIterator.hasNext() && expectedIterator.hasNext()) {
+ Record record = actualIterator.next();
+ InternalRow row = expectedIterator.next();
+ assertThat(record.get(0)).isEqualTo(row.getInt(0));
+ assertThat(record.get(1)).isEqualTo(row.getString(1).toString());
+ }
+ assertThat(actualIterator.hasNext()).isFalse();
+ assertThat(expectedIterator.hasNext()).isFalse();
+ }
+
+ @Test
+ void testPkTableCompactionWithConflict() throws Exception {
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
+ long t1Id = createPkTable(t1, 1, true, pkSchema);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+
+ List<InternalRow> rows = Collections.singletonList(row(1, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 1, false, rows));
+ checkFileStatusInIcebergTable(t1, 1, false);
+
+ rows = Collections.singletonList(row(2, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 2, false, rows));
+
+ rows = Collections.singletonList(row(3, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 3, false, rows));
+
+ // add pos-delete and trigger compaction
+ rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
+            flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1));
+            // the rewritten files should fail to commit due to the conflict; check records here
+ checkRecords(getIcebergRecords(t1), flussRows);
+            // 4 data files and 1 delete file
+ checkFileStatusInIcebergTable(t1, 4, true);
+
+            // previous compaction conflicts won't prevent further compaction; check iceberg
+            // records
+ rows = Collections.singletonList(row(5, "v1"));
+            flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows));
+ checkRecords(getIcebergRecords(t1), flussRows);
+ checkFileStatusInIcebergTable(t1, 2, false);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
@Test
void testLogTableCompaction() throws Exception {
JobClient jobClient = buildTieringJob(execEnv);
try {
TablePath t1 = TablePath.of(DEFAULT_DB, "log_table");
- long t1Id = createLogTable(t1, true);
+ long t1Id = createLogTable(t1, 1, true, logSchema);
TableBucket t1Bucket = new TableBucket(t1Id, 0);
int i = 0;
List<InternalRow> flussRows = new ArrayList<>();
- flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+ flussRows.addAll(
+ writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
- flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+ flussRows.addAll(
+ writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
- flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
- checkFileCountInIcebergTable(t1, 3);
+ flussRows.addAll(
+ writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
+ checkFileStatusInIcebergTable(t1, 3, false);
            // Write should trigger compaction now since the current data file count is greater
            // than or equal to MIN_FILES_TO_COMPACT
- flussRows.addAll(writeLogTableRecords(t1, t1Bucket, ++i));
+ flussRows.addAll(
+ writeIcebergTableRecords(
+                            t1, t1Bucket, ++i, true, Collections.singletonList(row(1, "v1"))));
            // Should only have two files now: one for the newly written data, one for the
            // target compacted file
- checkFileCountInIcebergTable(t1, 2);
+ checkFileStatusInIcebergTable(t1, 2, false);
            // check data in iceberg to make sure compaction won't lose data or duplicate data
- checkDataInIcebergAppendOnlyTable(t1, flussRows, 0);
+ checkRecords(getIcebergRecords(t1), flussRows);
} finally {
jobClient.cancel().get();
}
}
- private List<InternalRow> writeLogTableRecords(
-            TablePath tablePath, TableBucket tableBucket, long expectedLogEndOffset)
+ private List<InternalRow> writeIcebergTableRecords(
+ TablePath tablePath,
+ TableBucket tableBucket,
+ long expectedLogEndOffset,
+ boolean append,
+ List<InternalRow> rows)
throws Exception {
- List<InternalRow> rows = Arrays.asList(row(1, "v1"));
- writeRows(tablePath, rows, true);
+ writeRows(tablePath, rows, append);
assertReplicaStatus(tableBucket, expectedLogEndOffset);
return rows;
}
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index fdf5b02c5..2a216ddea 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -24,7 +24,6 @@ import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.writer.AppendWriter;
import org.apache.fluss.client.table.writer.TableWriter;
import org.apache.fluss.client.table.writer.UpsertWriter;
-import org.apache.fluss.config.AutoPartitionTimeUnit;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
@@ -38,8 +37,6 @@ import org.apache.fluss.row.InternalRow;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.server.zk.ZooKeeperClient;
-import org.apache.fluss.types.DataTypes;
-import org.apache.fluss.utils.DateTimeUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
@@ -67,10 +64,6 @@ import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.time.Duration;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -187,85 +180,36 @@ public class FlinkIcebergTieringTestBase {
return catalog;
}
- protected long createPkTable(TablePath tablePath) throws Exception {
- return createPkTable(tablePath, false);
- }
-
-    protected long createPkTable(TablePath tablePath, boolean enableAutoCompaction)
- throws Exception {
- return createPkTable(tablePath, 1, enableAutoCompaction);
- }
-
- protected long createLogTable(TablePath tablePath) throws Exception {
- return createLogTable(tablePath, false);
- }
-
-    protected long createLogTable(TablePath tablePath, boolean enableAutoCompaction)
- throws Exception {
- return createLogTable(tablePath, 1, false, enableAutoCompaction);
- }
-
- protected long createLogTable(
-            TablePath tablePath, int bucketNum, boolean isPartitioned, boolean enableAutoCompaction)
+    protected long createPkTable(
+            TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema)
throws Exception {
-        Schema.Builder schemaBuilder =
-                Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
-
-        TableDescriptor.Builder tableBuilder =
+        TableDescriptor.Builder pkTableBuilder =
                 TableDescriptor.builder()
-                        .distributedBy(bucketNum, "a")
+                        .schema(schema)
+                        .distributedBy(bucketNum)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
- if (isPartitioned) {
- schemaBuilder.column("c", DataTypes.STRING());
-            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
-            tableBuilder.partitionedBy("c");
-            tableBuilder.property(
-                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
- }
if (enableAutoCompaction) {
-            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
}
- tableBuilder.schema(schemaBuilder.build());
- return createTable(tablePath, tableBuilder.build());
+ return createTable(tablePath, pkTableBuilder.build());
}
-    protected long createPkTable(TablePath tablePath, int bucketNum, boolean enableAutoCompaction)
+    protected long createLogTable(
+            TablePath tablePath, int bucketNum, boolean enableAutoCompaction, Schema schema)
throws Exception {
- TableDescriptor.Builder pkTableBuilder =
+ TableDescriptor.Builder tableBuilder =
TableDescriptor.builder()
-                        .schema(
-                                Schema.newBuilder()
-                                        .column("f_boolean", DataTypes.BOOLEAN())
-                                        .column("f_byte", DataTypes.TINYINT())
-                                        .column("f_short", DataTypes.SMALLINT())
-                                        .column("f_int", DataTypes.INT())
-                                        .column("f_long", DataTypes.BIGINT())
-                                        .column("f_float", DataTypes.FLOAT())
-                                        .column("f_double", DataTypes.DOUBLE())
-                                        .column("f_string", DataTypes.STRING())
-                                        .column("f_decimal1", DataTypes.DECIMAL(5, 2))
-                                        .column("f_decimal2", DataTypes.DECIMAL(20, 0))
-                                        .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
-                                        .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
-                                        .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
-                                        .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
-                                        .column("f_binary", DataTypes.BINARY(4))
-                                        .column("f_date", DataTypes.DATE())
-                                        .column("f_time", DataTypes.TIME())
-                                        .column("f_char", DataTypes.CHAR(3))
-                                        .column("f_bytes", DataTypes.BYTES())
-                                        .primaryKey("f_int")
-                                        .build())
-                        .distributedBy(bucketNum)
+                        .distributedBy(bucketNum, "f_int")
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
if (enableAutoCompaction) {
-            pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
+            tableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(), "true");
}
- return createTable(tablePath, pkTableBuilder.build());
+ tableBuilder.schema(schema);
+ return createTable(tablePath, tableBuilder.build());
}
    protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
@@ -342,45 +286,14 @@ public class FlinkIcebergTieringTestBase {
}
}
- protected void checkDataInIcebergPrimaryKeyTable(
-            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
+    protected List<Record> getIcebergRecords(TablePath tablePath) throws IOException {
+ List<Record> icebergRecords = new ArrayList<>();
try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
-            for (InternalRow row : expectedRows) {
-                Record record = records.next();
-                assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
-                assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
-                assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
-                assertThat(record.get(3)).isEqualTo(row.getInt(3));
-                assertThat(record.get(4)).isEqualTo(row.getLong(4));
-                assertThat(record.get(5)).isEqualTo(row.getFloat(5));
-                assertThat(record.get(6)).isEqualTo(row.getDouble(6));
-                assertThat(record.get(7)).isEqualTo(row.getString(7).toString());
-                // Iceberg expects BigDecimal for decimal types.
-                assertThat(record.get(8)).isEqualTo(row.getDecimal(8, 5, 2).toBigDecimal());
-                assertThat(record.get(9)).isEqualTo(row.getDecimal(9, 20, 0).toBigDecimal());
-                assertThat(record.get(10))
-                        .isEqualTo(
-                                OffsetDateTime.ofInstant(
-                                        row.getTimestampLtz(10, 3).toInstant(), ZoneOffset.UTC));
-                assertThat(record.get(11))
-                        .isEqualTo(
-                                OffsetDateTime.ofInstant(
-                                        row.getTimestampLtz(11, 6).toInstant(), ZoneOffset.UTC));
-                assertThat(record.get(12)).isEqualTo(row.getTimestampNtz(12, 6).toLocalDateTime());
-                assertThat(record.get(13)).isEqualTo(row.getTimestampNtz(13, 6).toLocalDateTime());
-                // Iceberg's Record interface expects ByteBuffer for binary types.
-                assertThat(record.get(14)).isEqualTo(ByteBuffer.wrap(row.getBinary(14, 4)));
-                assertThat(record.get(15))
-                        .isEqualTo(DateTimeUtils.toLocalDate(row.getInt(15)))
-                        .isEqualTo(LocalDate.of(2023, 10, 25));
-                assertThat(record.get(16))
-                        .isEqualTo(DateTimeUtils.toLocalTime(row.getInt(16)))
-                        .isEqualTo(LocalTime.of(9, 30, 0, 0));
-                assertThat(record.get(17)).isEqualTo(row.getChar(17, 3).toString());
-                assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
+            while (records.hasNext()) {
+                icebergRecords.add(records.next());
             }
-            assertThat(records.hasNext()).isFalse();
         }
+        return icebergRecords;
     }
protected void checkDataInIcebergAppendOnlyTable(
@@ -400,12 +313,18 @@ public class FlinkIcebergTieringTestBase {
}
}
-    protected void checkFileCountInIcebergTable(TablePath tablePath, int expectedFileCount)
+    protected void checkFileStatusInIcebergTable(
+            TablePath tablePath, int expectedFileCount, boolean shouldDeleteFileExist)
throws IOException {
        org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
int count = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
for (FileScanTask ignored : tasks) {
+ if (shouldDeleteFileExist) {
+ assertThat(ignored.deletes()).isNotEmpty();
+ } else {
+ assertThat(ignored.deletes()).isEmpty();
+ }
count++;
}
}
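
For reference, the rows returned by getIcebergRows() could also be materialized with the iceberg-data generics reader. A self-contained sketch, assuming the iceberg-data module is on the classpath (this is not the helper the test base actually uses):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.IcebergGenerics;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.CloseableIterable;

    // Read every row of the table's current snapshot into memory for assertions.
    static List<Record> readAll(Table table) throws IOException {
        List<Record> rows = new ArrayList<>();
        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
            records.forEach(rows::add);
        }
        return rows;
    }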
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 9561c3a71..a8dbe76e3 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -30,24 +30,33 @@ import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
import org.apache.fluss.types.DataTypes;
+import org.apache.fluss.utils.DateTimeUtils;
import org.apache.fluss.utils.TypeUtils;
import org.apache.fluss.utils.types.Tuple2;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.data.Record;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.nio.ByteBuffer;
import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
/** The ITCase for tiering into iceberg. */
class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
@@ -56,6 +65,36 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
private static StreamExecutionEnvironment execEnv;
+ private static final Schema pkSchema =
+ Schema.newBuilder()
+ .column("f_boolean", DataTypes.BOOLEAN())
+ .column("f_byte", DataTypes.TINYINT())
+ .column("f_short", DataTypes.SMALLINT())
+ .column("f_int", DataTypes.INT())
+ .column("f_long", DataTypes.BIGINT())
+ .column("f_float", DataTypes.FLOAT())
+ .column("f_double", DataTypes.DOUBLE())
+ .column("f_string", DataTypes.STRING())
+ .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+ .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+ .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
+ .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
+ .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+ .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+ .column("f_binary", DataTypes.BINARY(4))
+ .column("f_date", DataTypes.DATE())
+ .column("f_time", DataTypes.TIME())
+ .column("f_char", DataTypes.CHAR(3))
+ .column("f_bytes", DataTypes.BYTES())
+ .primaryKey("f_int")
+ .build();
+
+ private static final Schema logSchema =
+ Schema.newBuilder()
+ .column("f_int", DataTypes.INT())
+ .column("f_str", DataTypes.STRING())
+ .build();
+
@BeforeAll
protected static void beforeAll() {
FlinkIcebergTieringTestBase.beforeAll();
@@ -68,7 +107,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
void testTiering() throws Exception {
        // create a pk table, write some records and wait until snapshot finished
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
- long t1Id = createPkTable(t1);
+ long t1Id = createPkTable(t1, 1, false, pkSchema);
TableBucket t1Bucket = new TableBucket(t1Id, 0);
// write records
List<InternalRow> rows =
@@ -239,6 +278,49 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
}
}
+ private void checkDataInIcebergPrimaryKeyTable(
+            TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
+        Iterator<Record> actualIterator = getIcebergRecords(tablePath).iterator();
+ Iterator<InternalRow> iterator = expectedRows.iterator();
+        while (iterator.hasNext() && actualIterator.hasNext()) {
+ InternalRow row = iterator.next();
+            Record record = actualIterator.next();
+ assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
+ assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
+ assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
+ assertThat(record.get(3)).isEqualTo(row.getInt(3));
+ assertThat(record.get(4)).isEqualTo(row.getLong(4));
+ assertThat(record.get(5)).isEqualTo(row.getFloat(5));
+ assertThat(record.get(6)).isEqualTo(row.getDouble(6));
+ assertThat(record.get(7)).isEqualTo(row.getString(7).toString());
+ // Iceberg expects BigDecimal for decimal types.
+            assertThat(record.get(8)).isEqualTo(row.getDecimal(8, 5, 2).toBigDecimal());
+            assertThat(record.get(9)).isEqualTo(row.getDecimal(9, 20, 0).toBigDecimal());
+ assertThat(record.get(10))
+ .isEqualTo(
+ OffsetDateTime.ofInstant(
+                                    row.getTimestampLtz(10, 3).toInstant(), ZoneOffset.UTC));
+ assertThat(record.get(11))
+ .isEqualTo(
+ OffsetDateTime.ofInstant(
+                                    row.getTimestampLtz(11, 6).toInstant(), ZoneOffset.UTC));
+            assertThat(record.get(12)).isEqualTo(row.getTimestampNtz(12, 6).toLocalDateTime());
+            assertThat(record.get(13)).isEqualTo(row.getTimestampNtz(13, 6).toLocalDateTime());
+ // Iceberg's Record interface expects ByteBuffer for binary types.
+            assertThat(record.get(14)).isEqualTo(ByteBuffer.wrap(row.getBinary(14, 4)));
+ assertThat(record.get(15))
+ .isEqualTo(DateTimeUtils.toLocalDate(row.getInt(15)))
+ .isEqualTo(LocalDate.of(2023, 10, 25));
+ assertThat(record.get(16))
+ .isEqualTo(DateTimeUtils.toLocalTime(row.getInt(16)))
+ .isEqualTo(LocalTime.of(9, 30, 0, 0));
+            assertThat(record.get(17)).isEqualTo(row.getChar(17, 3).toString());
+            assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
+ }
+        assertThat(actualIterator.hasNext()).isFalse();
+ assertThat(iterator.hasNext()).isFalse();
+ }
+
    private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
throws Exception {
TableDescriptor partitionedTableDescriptor =
@@ -265,7 +347,7 @@ class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
private void testLogTableTiering() throws Exception {
// then, create another log table
TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
- long t2Id = createLogTable(t2);
+ long t2Id = createLogTable(t2, 1, false, logSchema);
TableBucket t2Bucket = new TableBucket(t2Id, 0);
List<InternalRow> flussRows = new ArrayList<>();
List<InternalRow> rows;
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
index 207714305..d44125857 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
@@ -79,7 +79,9 @@ class IcebergWriteResultSerializerTest {
// with rewrite result
RewriteDataFileResult rewriteDataFileResult =
new RewriteDataFileResult(
-                        Collections.singletonList(dataFile), Collections.singletonList(dataFile));
+ 1L,
+ Collections.singletonList(dataFile),
+ Collections.singletonList(dataFile));
        originalResult = new IcebergWriteResult(writeResult, rewriteDataFileResult);
serializedData = serializer.serialize(originalResult);
        deserializedResult = serializer.deserialize(serializer.getVersion(), serializedData);