Copilot commented on code in PR #1572:
URL: https://github.com/apache/fluss/pull/1572#discussion_r2290261020


##########
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for tiering into iceberg. */
+class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkIcebergTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create a pk table, write some records and wait until snapshot 
finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+        long t1Id = createPkTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1Bucket, 3);
+
+        checkDataInIcebergPrimayKeyTable(t1, rows);
+        // check snapshot property in paimon
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "[{\"bucket_id\":0,\"log_offset\":3}]");
+                    }
+                };
+        checkSnapshotPropertyInIceberg(t1, properties);
+
+        // then, create another log table
+        TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+        long t2Id = createLogTable(t2);
+        TableBucket t2Bucket = new TableBucket(t2Id, 0);
+        List<InternalRow> flussRows = new ArrayList<>();
+        // write records
+        for (int i = 0; i < 10; i++) {
+            rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+            flussRows.addAll(rows);
+            // write records
+            writeRows(t2, rows, true);
+        }
+        // check the status of replica after synced;
+        // note: we can't update log start offset for unaware bucket mode log 
table
+        assertReplicaStatus(t2Bucket, 30);
+
+        // check data in paimon
+        checkDataInIcebergAppendOnlyTable(t2, flussRows, 0);
+
+        // then write data to the pk tables
+        // write records
+        rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+        // write records
+        writeRows(t1, rows, false);
+
+        // check the status of replica of t2 after synced
+        // not check start offset since we won't
+        // update start log offset for primary key table
+        assertReplicaStatus(t1Bucket, 9);
+
+        checkDataInIcebergPrimayKeyTable(t1, rows);

Review Comment:
   Method name has a typo: 'PrimayKeyTable' should be 'PrimaryKeyTable'
   ```suggestion
           checkDataInIcebergPrimaryKeyTable(t1, rows);
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/** The ITCase for tiering into iceberg. */
+class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
+
+    protected static final String DEFAULT_DB = "fluss";
+
+    private static StreamExecutionEnvironment execEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkIcebergTieringTestBase.beforeAll();
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setParallelism(2);
+        execEnv.enableCheckpointing(1000);
+    }
+
+    @Test
+    void testTiering() throws Exception {
+        // create a pk table, write some records and wait until snapshot 
finished
+        TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+        long t1Id = createPkTable(t1);
+        TableBucket t1Bucket = new TableBucket(t1Id, 0);
+        // write records
+        List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), 
row(3, "v3"));
+        writeRows(t1, rows, false);
+        waitUntilSnapshot(t1Id, 1, 0);
+
+        // then start tiering job
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1Bucket, 3);
+
+        checkDataInIcebergPrimayKeyTable(t1, rows);

Review Comment:
   Method name has a typo: 'PrimayKeyTable' should be 'PrimaryKeyTable'
   ```suggestion
           checkDataInIcebergPrimaryKeyTable(t1, rows);
   ```



##########
fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java:
##########
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.testutils;
+
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.writer.AppendWriter;
+import com.alibaba.fluss.client.table.writer.TableWriter;
+import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.flink.tiering.LakeTieringJobBuilder;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.server.replica.Replica;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.server.zk.ZooKeeperClient;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.parquet.Parquet;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.Closeable;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for tiering to Iceberg by Flink. */
+public class FlinkIcebergTieringTestBase {
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(initConfig())
+                    .setNumOfTabletServers(3)
+                    .build();
+
+    protected StreamExecutionEnvironment execEnv;
+
+    protected static Connection conn;
+    protected static Admin admin;
+    protected static Configuration clientConf;
+    protected static String warehousePath;
+    protected static Catalog icebergCatalog;
+
+    private static Configuration initConfig() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 
Integer.MAX_VALUE);
+
+        // Configure the tiering sink to be Iceberg
+        conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
+        conf.setString("datalake.iceberg.type", "hadoop");
+        try {
+            warehousePath =
+                    Files.createTempDirectory("fluss-testing-iceberg-tiered")
+                            .resolve("warehouse")
+                            .toString();
+        } catch (Exception e) {
+            throw new FlussRuntimeException("Failed to create Iceberg 
warehouse path", e);
+        }
+        conf.setString("datalake.iceberg.warehouse", warehousePath);
+        return conf;
+    }
+
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+        icebergCatalog = getIcebergCatalog();
+    }
+
+    @AfterAll
+    static void afterAll() throws Exception {
+        if (admin != null) {
+            admin.close();
+            admin = null;
+        }
+        if (conn != null) {
+            conn.close();
+            conn = null;
+        }
+        if (icebergCatalog instanceof Closeable) {
+            ((Closeable) icebergCatalog).close();
+            icebergCatalog = null;
+        }
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        execEnv.setParallelism(2);
+    }
+
+    protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) 
throws Exception {
+        Configuration flussConfig = new Configuration(clientConf);
+        flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+        return LakeTieringJobBuilder.newBuilder(
+                        execEnv,
+                        flussConfig,
+                        Configuration.fromMap(getIcebergCatalogConf()),
+                        DataLakeFormat.ICEBERG.toString())
+                .build();
+    }
+
+    protected static Map<String, String> getIcebergCatalogConf() {
+        Map<String, String> icebergConf = new HashMap<>();
+        icebergConf.put("type", "hadoop");
+        icebergConf.put("warehouse", warehousePath);
+        return icebergConf;
+    }
+
+    protected static Catalog getIcebergCatalog() {
+        HadoopCatalog catalog = new HadoopCatalog();
+        catalog.setConf(new org.apache.hadoop.conf.Configuration());
+        Map<String, String> properties = new HashMap<>();
+        properties.put("warehouse", warehousePath);
+        catalog.initialize("hadoop", properties);
+        return catalog;
+    }
+
+    protected long createPkTable(TablePath tablePath) throws Exception {
+        return createPkTable(tablePath, 1);
+    }
+
+    protected long createLogTable(TablePath tablePath) throws Exception {
+        return createLogTable(tablePath, 1);
+    }
+
+    protected long createLogTable(TablePath tablePath, int bucketNum) throws 
Exception {
+        return createLogTable(tablePath, bucketNum, false);
+    }
+
+    protected long createLogTable(TablePath tablePath, int bucketNum, boolean 
isPartitioned)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder().column("a", DataTypes.INT()).column("b", 
DataTypes.STRING());
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(bucketNum, "a")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        if (isPartitioned) {
+            schemaBuilder.column("c", DataTypes.STRING());
+            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
+            tableBuilder.partitionedBy("c");
+            tableBuilder.property(
+                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
+        }
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
+    protected long createPkTable(TablePath tablePath, int bucketNum) throws 
Exception {
+        TableDescriptor table1Descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT())
+                                        .column("b", DataTypes.STRING())
+                                        .primaryKey("a")
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500))
+                        .build();
+        return createTable(tablePath, table1Descriptor);
+    }
+
+    protected long createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
+            throws Exception {
+        admin.createTable(tablePath, tableDescriptor, true).get();
+        return admin.getTableInfo(tablePath).get().getTableId();
+    }
+
+    protected void assertReplicaStatus(TableBucket tb, long 
expectedLogEndOffset) {
+        retry(
+                Duration.ofMinutes(1),
+                () -> {
+                    Replica replica = getLeaderReplica(tb);
+                    // datalake snapshot id should be updated
+                    assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+                            .isGreaterThanOrEqualTo(0);
+                    
assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+                });
+    }
+
+    public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+        return waitUntilPartitions(
+                FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+                tablePath,
+                
ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+    }
+
+    /**
+     * Wait until the given number of partitions is created. Return the map 
from partition id to
+     * partition name.
+     */
+    public static Map<Long, String> waitUntilPartitions(
+            ZooKeeperClient zooKeeperClient, TablePath tablePath, int 
expectPartitions) {
+        return waitValue(
+                () -> {
+                    Map<Long, String> gotPartitions =
+                            zooKeeperClient.getPartitionIdAndNames(tablePath);
+                    return expectPartitions == gotPartitions.size()
+                            ? Optional.of(gotPartitions)
+                            : Optional.empty();
+                },
+                Duration.ofMinutes(1),
+                String.format("expect %d table partition has not been 
created", expectPartitions));
+    }
+
+    protected Replica getLeaderReplica(TableBucket tableBucket) {
+        return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+    }
+
+    protected void writeRows(TablePath tablePath, List<InternalRow> rows, 
boolean append)
+            throws Exception {
+        try (Table table = conn.getTable(tablePath)) {
+            TableWriter tableWriter;
+            if (append) {
+                tableWriter = table.newAppend().createWriter();
+            } else {
+                tableWriter = table.newUpsert().createWriter();
+            }
+            for (InternalRow row : rows) {
+                if (tableWriter instanceof AppendWriter) {
+                    ((AppendWriter) tableWriter).append(row);
+                } else {
+                    ((UpsertWriter) tableWriter).upsert(row);
+                }
+            }
+            tableWriter.flush();
+        }
+    }
+
+    protected void waitUntilSnapshot(long tableId, int bucketNum, long 
snapshotId) {
+        for (int i = 0; i < bucketNum; i++) {
+            TableBucket tableBucket = new TableBucket(tableId, i);
+            FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, 
snapshotId);
+        }
+    }
+
+    protected void checkDataInIcebergPrimayKeyTable(

Review Comment:
   Method name has a typo: 'PrimayKeyTable' should be 'PrimaryKeyTable'
   ```suggestion
       protected void checkDataInIcebergPrimaryKeyTable(
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to