This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new ae7387e38  [lake/iceberg] Add iceberg it case (#1572)
ae7387e38 is described below

commit ae7387e387168fe97fd2cfb0a493aeb60bfdb5ca
Author: yuxia Luo <luoyu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Aug 22 17:31:15 2025 +0800

    [lake/iceberg] Add iceberg it case (#1572)

    * add iceberg it case

    * address comments
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |  83 +++
 .../testutils/FlinkIcebergTieringTestBase.java}    | 554 ++++++++++-----------
 .../lake/iceberg/tiering/IcebergTieringITCase.java | 205 ++++++++
 .../iceberg/{ => tiering}/IcebergTieringTest.java  |  21 +-
 .../testutils/FlinkPaimonTieringTestBase.java      |  14 -
 5 files changed, 568 insertions(+), 309 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml b/fluss-lake/fluss-lake-iceberg/pom.xml
index 274de4919..e0443c49b 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -68,6 +68,8 @@
             <artifactId>iceberg-bundled-guava</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+
+        <!-- test dependency -->
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-common</artifactId>
@@ -80,6 +82,33 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -130,6 +159,22 @@
                     <artifactId>commons-io</artifactId>
                     <groupId>commons-io</groupId>
                 </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-framework</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-recipes</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
@@ -140,6 +185,44 @@
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
     <build>

diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
similarity index 53%
copy from fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
copy to fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 8abb04d35..b4abe4ab5 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +16,7 @@
  * limitations under the License.
*/ -package com.alibaba.fluss.lake.paimon.testutils; +package com.alibaba.fluss.lake.iceberg.testutils; import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; @@ -43,40 +44,48 @@ import com.alibaba.fluss.types.DataTypes; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.paimon.Snapshot; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.CatalogContext; -import org.apache.paimon.catalog.CatalogFactory; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.options.Options; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.utils.CloseableIterator; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.parquet.Parquet; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; +import java.io.Closeable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.file.Files; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL; +import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg; +import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; import static com.alibaba.fluss.testutils.DataTestUtils.row; import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry; -import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil; import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue; +import static org.apache.iceberg.expressions.Expressions.equal; import static org.assertj.core.api.Assertions.assertThat; -/** Test base for sync to paimon by Flink. */ -public class FlinkPaimonTieringTestBase { - protected static final String DEFAULT_DB = "fluss"; +/** Test base for tiering to Iceberg by Flink. 
*/ +public class FlinkIcebergTieringTestBase { @RegisterExtension public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = @@ -85,31 +94,31 @@ public class FlinkPaimonTieringTestBase { .setNumOfTabletServers(3) .build(); - protected static final String CATALOG_NAME = "testcatalog"; protected StreamExecutionEnvironment execEnv; protected static Connection conn; protected static Admin admin; protected static Configuration clientConf; protected static String warehousePath; - protected static Catalog paimonCatalog; + protected static Catalog icebergCatalog; private static Configuration initConfig() { Configuration conf = new Configuration(); conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1)) - // not to clean snapshots for test purpose .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE); - conf.setString("datalake.format", "paimon"); - conf.setString("datalake.paimon.metastore", "filesystem"); + + // Configure the tiering sink to be Iceberg + conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG); + conf.setString("datalake.iceberg.type", "hadoop"); try { warehousePath = - Files.createTempDirectory("fluss-testing-datalake-tiered") + Files.createTempDirectory("fluss-testing-iceberg-tiered") .resolve("warehouse") .toString(); } catch (Exception e) { - throw new FlussRuntimeException("Failed to create warehouse path"); + throw new FlussRuntimeException("Failed to create Iceberg warehouse path", e); } - conf.setString("datalake.paimon.warehouse", warehousePath); + conf.setString("datalake.iceberg.warehouse", warehousePath); return conf; } @@ -118,33 +127,7 @@ public class FlinkPaimonTieringTestBase { clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig(); conn = ConnectionFactory.createConnection(clientConf); admin = conn.getAdmin(); - paimonCatalog = getPaimonCatalog(); - } - - @BeforeEach - public void beforeEach() { - execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); - execEnv.setParallelism(2); - execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - } - - protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { - Configuration flussConfig = new Configuration(clientConf); - flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); - return LakeTieringJobBuilder.newBuilder( - execEnv, - flussConfig, - Configuration.fromMap(getPaimonCatalogConf()), - DataLakeFormat.PAIMON.toString()) - .build(); - } - - protected static Map<String, String> getPaimonCatalogConf() { - Map<String, String> paimonConf = new HashMap<>(); - paimonConf.put("metastore", "filesystem"); - paimonConf.put("warehouse", warehousePath); - return paimonConf; + icebergCatalog = getIcebergCatalog(); } @AfterAll @@ -157,106 +140,48 @@ public class FlinkPaimonTieringTestBase { conn.close(); conn = null; } - } - - protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) - throws Exception { - admin.createTable(tablePath, tableDescriptor, true).get(); - return admin.getTableInfo(tablePath).get().getTableId(); - } - - protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) { - for (int i = 0; i < bucketNum; i++) { - TableBucket tableBucket = new TableBucket(tableId, i); - FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId); - } - } - - protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append) - throws Exception { - try (Table table = conn.getTable(tablePath)) { - 
TableWriter tableWriter; - if (append) { - tableWriter = table.newAppend().createWriter(); - } else { - tableWriter = table.newUpsert().createWriter(); - } - for (InternalRow row : rows) { - if (tableWriter instanceof AppendWriter) { - ((AppendWriter) tableWriter).append(row); - } else { - ((UpsertWriter) tableWriter).upsert(row); - } - } - tableWriter.flush(); + if (icebergCatalog instanceof Closeable) { + ((Closeable) icebergCatalog).close(); + icebergCatalog = null; } } - protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable( - TablePath tablePath, - TableDescriptor tableDescriptor, - Map<Long, String> partitionNameByIds) - throws Exception { - List<InternalRow> rows = new ArrayList<>(); - Map<String, List<InternalRow>> writtenRowsByPartition = new HashMap<>(); - for (String partitionName : partitionNameByIds.values()) { - List<InternalRow> partitionRows = - Arrays.asList( - row(11, "v1", partitionName), - row(12, "v2", partitionName), - row(13, "v3", partitionName)); - rows.addAll(partitionRows); - writtenRowsByPartition.put(partitionName, partitionRows); - } - - writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey()); - return writtenRowsByPartition; + @BeforeEach + public void beforeEach() { + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING); + execEnv.setParallelism(2); } - /** - * Wait until the default number of partitions is created. Return the map from partition id to - * partition name. . - */ - public static Map<Long, String> waitUntilPartitions( - ZooKeeperClient zooKeeperClient, TablePath tablePath) { - return waitUntilPartitions( - zooKeeperClient, - tablePath, - ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue()); + protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception { + Configuration flussConfig = new Configuration(clientConf); + flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L)); + return LakeTieringJobBuilder.newBuilder( + execEnv, + flussConfig, + Configuration.fromMap(getIcebergCatalogConf()), + DataLakeFormat.ICEBERG.toString()) + .build(); } - public static Map<Long, String> waitUntilPartitions(TablePath tablePath) { - return waitUntilPartitions( - FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), - tablePath, - ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue()); + protected static Map<String, String> getIcebergCatalogConf() { + Map<String, String> icebergConf = new HashMap<>(); + icebergConf.put("type", "hadoop"); + icebergConf.put("warehouse", warehousePath); + return icebergConf; } - /** - * Wait until the given number of partitions is created. Return the map from partition id to - * partition name. - */ - public static Map<Long, String> waitUntilPartitions( - ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) { - return waitValue( - () -> { - Map<Long, String> gotPartitions = - zooKeeperClient.getPartitionIdAndNames(tablePath); - return expectPartitions == gotPartitions.size() - ? 
Optional.of(gotPartitions) - : Optional.empty(); - }, - Duration.ofMinutes(1), - String.format("expect %d table partition has not been created", expectPartitions)); - } - - protected static Catalog getPaimonCatalog() { - Map<String, String> catalogOptions = getPaimonCatalogConf(); - return CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(catalogOptions))); + protected static Catalog getIcebergCatalog() { + HadoopCatalog catalog = new HadoopCatalog(); + catalog.setConf(new org.apache.hadoop.conf.Configuration()); + Map<String, String> properties = new HashMap<>(); + properties.put("warehouse", warehousePath); + catalog.initialize("hadoop", properties); + return catalog; } - protected Replica getLeaderReplica(TableBucket tableBucket) { - return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + protected long createPkTable(TablePath tablePath) throws Exception { + return createPkTable(tablePath, 1); } protected long createLogTable(TablePath tablePath) throws Exception { @@ -289,61 +214,6 @@ public class FlinkPaimonTieringTestBase { return createTable(tablePath, tableBuilder.build()); } - protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned) - throws Exception { - Schema.Builder schemaBuilder = - Schema.newBuilder() - .column("f_boolean", DataTypes.BOOLEAN()) - .column("f_byte", DataTypes.TINYINT()) - .column("f_short", DataTypes.SMALLINT()) - .column("f_int", DataTypes.INT()) - .column("f_long", DataTypes.BIGINT()) - .column("f_float", DataTypes.FLOAT()) - .column("f_double", DataTypes.DOUBLE()) - .column("f_string", DataTypes.STRING()) - .column("f_decimal1", DataTypes.DECIMAL(5, 2)) - .column("f_decimal2", DataTypes.DECIMAL(20, 0)) - .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3)) - .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6)) - .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3)) - .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6)) - .column("f_binary", DataTypes.BINARY(4)); - - TableDescriptor.Builder tableBuilder = - TableDescriptor.builder() - .distributedBy(bucketNum, "f_int") - .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); - - if (isPartitioned) { - schemaBuilder.column("p", DataTypes.STRING()); - tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true); - tableBuilder.partitionedBy("p"); - tableBuilder.property( - ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR); - } - tableBuilder.schema(schemaBuilder.build()); - return createTable(tablePath, tableBuilder.build()); - } - - protected long createPrimaryKeyTable( - TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception { - Schema.Builder schemaBuilder = - Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName()); - - TableDescriptor.Builder tableBuilder = - TableDescriptor.builder() - .distributedBy(bucketNum) - .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); - tableBuilder.schema(schemaBuilder.build()); - return createTable(tablePath, tableBuilder.build()); - } - - protected long createPkTable(TablePath tablePath) throws Exception { - return createPkTable(tablePath, 1); - } - protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception { TableDescriptor table1Descriptor = TableDescriptor.builder() @@ -360,41 +230,10 @@ public class 
FlinkPaimonTieringTestBase { return createTable(tablePath, table1Descriptor); } - protected void dropTable(TablePath tablePath) throws Exception { - admin.dropTable(tablePath, false).get(); - Identifier tableIdentifier = toPaimonIdentifier(tablePath); - try { - paimonCatalog.dropTable(tableIdentifier, false); - } catch (Catalog.TableNotExistException e) { - // do nothing, table not exists - } - } - - private Identifier toPaimonIdentifier(TablePath tablePath) { - return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); - } - - protected void assertReplicaStatus( - TablePath tablePath, - long tableId, - int bucketCount, - boolean isPartitioned, - Map<TableBucket, Long> expectedLogEndOffset) { - if (isPartitioned) { - Map<Long, String> partitionById = - waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath); - for (Long partitionId : partitionById.keySet()) { - for (int i = 0; i < bucketCount; i++) { - TableBucket tableBucket = new TableBucket(tableId, partitionId, i); - assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket)); - } - } - } else { - for (int i = 0; i < bucketCount; i++) { - TableBucket tableBucket = new TableBucket(tableId, i); - assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket)); - } - } + protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor) + throws Exception { + admin.createTable(tablePath, tableDescriptor, true).get(); + return admin.getTableInfo(tablePath).get().getTableId(); } protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) { @@ -409,70 +248,219 @@ public class FlinkPaimonTieringTestBase { }); } - protected void waitUntilBucketSynced( - TablePath tablePath, long tableId, int bucketCount, boolean isPartition) { - if (isPartition) { - Map<Long, String> partitionById = waitUntilPartitions(tablePath); - for (Long partitionId : partitionById.keySet()) { - for (int i = 0; i < bucketCount; i++) { - TableBucket tableBucket = new TableBucket(tableId, partitionId, i); - waitUntilBucketSynced(tableBucket); - } + public static Map<Long, String> waitUntilPartitions(TablePath tablePath) { + return waitUntilPartitions( + FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), + tablePath, + ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue()); + } + + /** + * Wait until the given number of partitions is created. Return the map from partition id to + * partition name. + */ + public static Map<Long, String> waitUntilPartitions( + ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) { + return waitValue( + () -> { + Map<Long, String> gotPartitions = + zooKeeperClient.getPartitionIdAndNames(tablePath); + return expectPartitions == gotPartitions.size() + ? 
Optional.of(gotPartitions) + : Optional.empty(); + }, + Duration.ofMinutes(1), + String.format("expect %d table partition has not been created", expectPartitions)); + } + + protected Replica getLeaderReplica(TableBucket tableBucket) { + return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket); + } + + protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append) + throws Exception { + try (Table table = conn.getTable(tablePath)) { + TableWriter tableWriter; + if (append) { + tableWriter = table.newAppend().createWriter(); + } else { + tableWriter = table.newUpsert().createWriter(); } - } else { - for (int i = 0; i < bucketCount; i++) { - TableBucket tableBucket = new TableBucket(tableId, i); - waitUntilBucketSynced(tableBucket); + for (InternalRow row : rows) { + if (tableWriter instanceof AppendWriter) { + ((AppendWriter) tableWriter).append(row); + } else { + ((UpsertWriter) tableWriter).upsert(row); + } } + tableWriter.flush(); } } - protected void waitUntilBucketSynced(TableBucket tb) { - waitUntil( - () -> { - Replica replica = getLeaderReplica(tb); - return replica.getLogTablet().getLakeTableSnapshotId() >= 0; - }, - Duration.ofMinutes(2), - "bucket " + tb + "not synced"); + protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) { + for (int i = 0; i < bucketNum; i++) { + TableBucket tableBucket = new TableBucket(tableId, i); + FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId); + } } - protected void checkDataInPaimonPrimayKeyTable( + protected void checkDataInIcebergPrimaryKeyTable( TablePath tablePath, List<InternalRow> expectedRows) throws Exception { - Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator = - getPaimonRowCloseableIterator(tablePath); - for (InternalRow expectedRow : expectedRows) { - org.apache.paimon.data.InternalRow row = paimonRowIterator.next(); - assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0)); - assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString()); + try (CloseableIterator<Record> records = getIcebergRows(tablePath)) { + for (InternalRow row : expectedRows) { + Record record = records.next(); + assertThat(record.get(0)).isEqualTo(row.getInt(0)); + assertThat(record.get(1)).isEqualTo(row.getString(1).toString()); + } + assertThat(records.hasNext()).isFalse(); } } - protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator( - TablePath tablePath) throws Exception { - Identifier tableIdentifier = - Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()); + protected void checkDataInIcebergAppendOnlyTable( + TablePath tablePath, List<InternalRow> expectedRows, long startingOffset) + throws Exception { + try (CloseableIterator<Record> records = getIcebergRows(tablePath)) { + Iterator<InternalRow> flussRowIterator = expectedRows.iterator(); + while (records.hasNext()) { + Record actualRecord = records.next(); + InternalRow flussRow = flussRowIterator.next(); + assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0)); + assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString()); + // the idx 2 is __bucket, so use 3 + assertThat(actualRecord.get(3)).isEqualTo(startingOffset++); + } + assertThat(flussRowIterator.hasNext()).isFalse(); + } + } - paimonCatalog = getPaimonCatalog(); + protected void checkDataInIcebergAppendOnlyPartitionedTable( + TablePath tablePath, + Map<String, String> partitionSpec, + List<InternalRow> expectedRows, + long startingOffset) + 
throws Exception { + try (CloseableIterator<Record> records = getIcebergRows(tablePath, partitionSpec)) { + Iterator<InternalRow> flussRowIterator = expectedRows.iterator(); + while (records.hasNext()) { + Record actualRecord = records.next(); + InternalRow flussRow = flussRowIterator.next(); + assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0)); + assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString()); + assertThat(actualRecord.get(2)).isEqualTo(flussRow.getString(2).toString()); + // the idx 3 is __bucket, so use 4 + assertThat(actualRecord.get(4)).isEqualTo(startingOffset++); + } + assertThat(flussRowIterator.hasNext()).isFalse(); + } + } - FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(tableIdentifier); + private CloseableIterator<Record> getIcebergRows(TablePath tablePath) { + return getIcebergRows(tablePath, Collections.emptyMap()); + } + + @SuppressWarnings("resource") + private CloseableIterator<Record> getIcebergRows( + TablePath tablePath, Map<String, String> partitionSpec) { + org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath)); + // is primary key, we don't care about records order, + // use iceberg read api directly + if (!table.schema().identifierFieldIds().isEmpty()) { + IcebergGenerics.ScanBuilder scanBuilder = + filterByPartition(IcebergGenerics.read(table), partitionSpec); + return scanBuilder.build().iterator(); + } else { + // is log table, we want to compare __offset column + // so sort data files by __offset according to the column stats + List<Record> records = new ArrayList<>(); + int fieldId = table.schema().findField(OFFSET_COLUMN_NAME).fieldId(); + SortedSet<DataFile> files = + new TreeSet<>( + (f1, f2) -> { + ByteBuffer buffer1 = + (ByteBuffer) + f1.lowerBounds() + .get(fieldId) + .order(ByteOrder.LITTLE_ENDIAN) + .rewind(); + long offset1 = buffer1.getLong(); + ByteBuffer buffer2 = + (ByteBuffer) + f2.lowerBounds() + .get(fieldId) + .order(ByteOrder.LITTLE_ENDIAN) + .rewind(); + long offset2 = buffer2.getLong(); + return Long.compare(offset1, offset2); + }); + + table.refresh(); + TableScan tableScan = filterByPartition(table.newScan(), partitionSpec); + tableScan + .includeColumnStats() + .planFiles() + .iterator() + .forEachRemaining(fileScanTask -> files.add(fileScanTask.file())); + + for (DataFile file : files) { + Iterable<Record> iterable = + Parquet.read(table.io().newInputFile(file.path().toString())) + .project(table.schema()) + .createReaderFunc( + fileSchema -> + GenericParquetReaders.buildReader( + table.schema(), fileSchema)) + .build(); + iterable.forEach(records::add); + } + + return CloseableIterator.withClose(records.iterator()); + } + } + + private IcebergGenerics.ScanBuilder filterByPartition( + IcebergGenerics.ScanBuilder scanBuilder, Map<String, String> partitionSpec) { + for (Map.Entry<String, String> partitionKeyAndValue : partitionSpec.entrySet()) { + String partitionCol = partitionKeyAndValue.getKey(); + String partitionValue = partitionKeyAndValue.getValue(); + scanBuilder = scanBuilder.where(equal(partitionCol, partitionValue)); + } + return scanBuilder; + } - RecordReader<org.apache.paimon.data.InternalRow> reader = - table.newRead().createReader(table.newReadBuilder().newScan().plan()); - return reader.toCloseableIterator(); + private TableScan filterByPartition(TableScan tableScan, Map<String, String> partitionSpec) { + for (Map.Entry<String, String> partitionKeyAndValue : partitionSpec.entrySet()) { + String partitionCol = partitionKeyAndValue.getKey(); + 
String partitionValue = partitionKeyAndValue.getValue(); + tableScan = tableScan.filter(equal(partitionCol, partitionValue)); + } + return tableScan; } - protected void checkSnapshotPropertyInPaimon( + protected void checkSnapshotPropertyInIceberg( TablePath tablePath, Map<String, String> expectedProperties) throws Exception { - FileStoreTable table = - (FileStoreTable) - getPaimonCatalog() - .getTable( - Identifier.create( - tablePath.getDatabaseName(), - tablePath.getTableName())); - Snapshot snapshot = table.snapshotManager().latestSnapshot(); - assertThat(snapshot).isNotNull(); - assertThat(snapshot.properties()).isEqualTo(expectedProperties); + org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath)); + Snapshot snapshot = table.currentSnapshot(); + assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties); + } + + protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable( + TablePath tablePath, + TableDescriptor tableDescriptor, + Map<Long, String> partitionNameByIds) + throws Exception { + List<InternalRow> rows = new ArrayList<>(); + Map<String, List<InternalRow>> writtenRowsByPartition = new HashMap<>(); + for (String partitionName : partitionNameByIds.values()) { + List<InternalRow> partitionRows = + Arrays.asList( + row(11, "v1", partitionName), + row(12, "v2", partitionName), + row(13, "v3", partitionName)); + rows.addAll(partitionRows); + writtenRowsByPartition.put(partitionName, partitionRows); + } + + writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey()); + return writtenRowsByPartition; } } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java new file mode 100644 index 000000000..0b3e5f2dd --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.lake.iceberg.tiering; + +import com.alibaba.fluss.config.AutoPartitionTimeUnit; +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.row.InternalRow; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.utils.types.Tuple2; + +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY; +import static com.alibaba.fluss.testutils.DataTestUtils.row; + +/** The ITCase for tiering into iceberg. */ +class IcebergTieringITCase extends FlinkIcebergTieringTestBase { + + protected static final String DEFAULT_DB = "fluss"; + + private static StreamExecutionEnvironment execEnv; + + @BeforeAll + protected static void beforeAll() { + FlinkIcebergTieringTestBase.beforeAll(); + execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + execEnv.setParallelism(2); + execEnv.enableCheckpointing(1000); + } + + @Test + void testTiering() throws Exception { + // create a pk table, write some records and wait until snapshot finished + TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable"); + long t1Id = createPkTable(t1); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + // write records + List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + writeRows(t1, rows, false); + waitUntilSnapshot(t1Id, 1, 0); + + // then start tiering job + JobClient jobClient = buildTieringJob(execEnv); + try { + // check the status of replica after synced + assertReplicaStatus(t1Bucket, 3); + + checkDataInIcebergPrimaryKeyTable(t1, rows); + // check snapshot property in iceberg + Map<String, String> properties = + new HashMap<String, String>() { + { + put( + FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, + "[{\"bucket_id\":0,\"log_offset\":3}]"); + } + }; + checkSnapshotPropertyInIceberg(t1, properties); + + // test log table + testLogTableTiering(); + + // then write data to the pk tables + // write records + rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333")); + // write records + writeRows(t1, rows, false); + + // check the status of replica of t1 after synced + // not check start offset since we won't + // update start log offset for primary key table + // 3 initial + (3 deletes + 3 inserts) = 9 + assertReplicaStatus(t1Bucket, 9); + + checkDataInIcebergPrimaryKeyTable(t1, rows); + + // then create partitioned table and wait partitions are ready + testPartitionedTableTiering(); + } finally { + jobClient.cancel().get(); + } + } + + private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath) + throws Exception { + TableDescriptor partitionedTableDescriptor = + TableDescriptor.builder() + .schema( + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("date", DataTypes.STRING()) + .build()) + .partitionedBy("date") + 
.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true) + .property( + ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, + AutoPartitionTimeUnit.YEAR) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)) + .build(); + return Tuple2.of( + createTable(partitionedTablePath, partitionedTableDescriptor), + partitionedTableDescriptor); + } + + private void testLogTableTiering() throws Exception { + // then, create another log table + TablePath t2 = TablePath.of(DEFAULT_DB, "logTable"); + long t2Id = createLogTable(t2); + TableBucket t2Bucket = new TableBucket(t2Id, 0); + List<InternalRow> flussRows = new ArrayList<>(); + List<InternalRow> rows; + // write records + for (int i = 0; i < 10; i++) { + rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + flussRows.addAll(rows); + // write records + writeRows(t2, rows, true); + } + // check the status of replica after synced; + // note: we can't update log start offset for unaware bucket mode log table + assertReplicaStatus(t2Bucket, 30); + + // check data in iceberg + checkDataInIcebergAppendOnlyTable(t2, flussRows, 0); + } + + private void testPartitionedTableTiering() throws Exception { + TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable"); + Tuple2<Long, TableDescriptor> tableIdAndDescriptor = + createPartitionedTable(partitionedTablePath); + Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath); + + // now, write rows into partitioned table + TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1; + Map<String, List<InternalRow>> writtenRowsByPartition = + writeRowsIntoPartitionedTable( + partitionedTablePath, partitionedTableDescriptor, partitionNameByIds); + long tableId = tableIdAndDescriptor.f0; + + // wait until synced to iceberg + for (Long partitionId : partitionNameByIds.keySet()) { + TableBucket tableBucket = new TableBucket(tableId, partitionId, 0); + assertReplicaStatus(tableBucket, 3); + } + + // now, let's check data in iceberg per partition + // check data in iceberg + String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0); + for (String partitionName : partitionNameByIds.values()) { + checkDataInIcebergAppendOnlyPartitionedTable( + partitionedTablePath, + Collections.singletonMap(partitionCol, partitionName), + writtenRowsByPartition.get(partitionName), + 0); + } + + Map<String, String> properties = + new HashMap<String, String>() { + { + put( + FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, + "[" + + "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3}," + + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}" + + "]"); + } + }; + + checkSnapshotPropertyInIceberg(partitionedTablePath, properties); + } +} diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java similarity index 95% rename from fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java rename to fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java index 8a666b677..c55b834f7 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java +++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -1,12 +1,13 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,14 +16,10 @@ * limitations under the License. */ -package com.alibaba.fluss.lake.iceberg; +package com.alibaba.fluss.lake.iceberg.tiering; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.lake.committer.LakeCommitter; -import com.alibaba.fluss.lake.iceberg.tiering.IcebergCatalogProvider; -import com.alibaba.fluss.lake.iceberg.tiering.IcebergCommittable; -import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory; -import com.alibaba.fluss.lake.iceberg.tiering.IcebergWriteResult; import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer; import com.alibaba.fluss.lake.writer.LakeWriter; import com.alibaba.fluss.lake.writer.WriterInitContext; diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java index 8abb04d35..3dcfc826d 100644 --- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java +++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java @@ -326,20 +326,6 @@ public class FlinkPaimonTieringTestBase { return createTable(tablePath, tableBuilder.build()); } - protected long createPrimaryKeyTable( - TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception { - Schema.Builder schemaBuilder = - Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName()); - - TableDescriptor.Builder tableBuilder = - TableDescriptor.builder() - .distributedBy(bucketNum) - .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") - .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); - tableBuilder.schema(schemaBuilder.build()); - return createTable(tablePath, tableBuilder.build()); - } - protected long createPkTable(TablePath tablePath) throws Exception { return createPkTable(tablePath, 1); }
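
For readers skimming the patch above, the least obvious piece is how the new FlinkIcebergTieringTestBase#getIcebergRows orders a log table's data files: it sorts them by the lower bound of the Fluss __offset column taken from each DataFile's column statistics, and those bounds are byte buffers that have to be read in little-endian order. The snippet below is a minimal, self-contained sketch of just that decoding step; the class name and sample value are illustrative and not part of the commit.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustrative only: decodes a long column bound the way the test base's comparator does. */
public class OffsetBoundDecodeSketch {

    static long decodeLowerBound(ByteBuffer lowerBound) {
        // Rewind and force little-endian order: a ByteBuffer defaults to BIG_ENDIAN,
        // which would yield a wrong __offset value; the cast keeps this valid on Java 8.
        return ((ByteBuffer) lowerBound.order(ByteOrder.LITTLE_ENDIAN).rewind()).getLong();
    }

    public static void main(String[] args) {
        // Simulate a serialized lower bound for __offset = 30, written little-endian
        // as Iceberg stores single-value column stats.
        ByteBuffer bound = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN);
        bound.putLong(30L);

        System.out.println(decodeLowerBound(bound)); // prints 30
    }
}

Sorting files by this decoded lower bound is what lets checkDataInIcebergAppendOnlyTable compare rows against the expected __offset sequence in commit order.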