This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2ce837889 [lake] Re-initial tiering splits for tables if reader
failover happen (#1984)
2ce837889 is described below
commit 2ce837889a6de56f558f7c5ea4766783dd63304e
Author: Junbo Wang <[email protected]>
AuthorDate: Thu Jan 15 10:35:53 2026 +0800
[lake] Re-initial tiering splits for tables if reader failover happen
(#1984)
---
.../fluss/flink/adapter/RuntimeContextAdapter.java | 34 ----
.../tiering/committer/TieringCommitOperator.java | 28 +--
.../flink/tiering/event/TieringFailOverEvent.java | 26 ---
.../source/enumerator/TieringSourceEnumerator.java | 80 ++++++--
.../fluss/flink/tiering/FlinkTieringTestBase.java | 164 +++++++++++++++++
.../fluss/flink/tiering/TieringFailoverITCase.java | 177 ++++++++++++++++++
.../FlussMockSplitEnumeratorContext.java | 85 +++++++++
.../enumerator/TieringSourceEnumeratorTest.java | 118 ++++++------
.../fluss/lake/values/TestingValuesLake.java | 202 +++++++++++++++++++++
.../lake/values/TestingValuesLakeCatalog.java | 43 +++++
.../lake/values/TestingValuesLakeStorage.java | 44 +++++
.../values/TestingValuesLakeStoragePlugin.java | 42 +++++
.../values/tiering/TestingValuesLakeCommitter.java | 121 ++++++++++++
.../tiering/TestingValuesLakeTieringFactory.java | 60 ++++++
.../values/tiering/TestingValuesLakeWriter.java | 100 ++++++++++
...apache.fluss.lake.lakestorage.LakeStoragePlugin | 19 ++
.../lake/iceberg/tiering/IcebergTieringITCase.java | 8 +-
17 files changed, 1185 insertions(+), 166 deletions(-)
diff --git
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
b/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
deleted file mode 100644
index be97b7048..000000000
---
a/fluss-flink/fluss-flink-1.18/src/main/java/org/apache/fluss/flink/adapter/RuntimeContextAdapter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.adapter;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * An adapter for Flink {@link RuntimeContext} class. The {@link
RuntimeContext} class added the
- * `getJobInfo` and `getTaskInfo` methods in version 1.19 and deprecated many
methods, such as
- * `getAttemptNumber`.
- *
- * <p>TODO: remove this class when no longer support flink 1.18.
- */
-public class RuntimeContextAdapter {
-
- public static int getAttemptNumber(RuntimeContext runtimeContext) {
- return runtimeContext.getAttemptNumber();
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 79bcc5f85..49afd5c44 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -23,10 +23,8 @@ import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
-import org.apache.fluss.flink.adapter.RuntimeContextAdapter;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
import org.apache.fluss.flink.tiering.source.TieringSource;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
@@ -40,13 +38,10 @@ import org.apache.fluss.utils.ExceptionUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
-import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
import javax.annotation.Nullable;
@@ -108,29 +103,14 @@ public class TieringCommitOperator<WriteResult,
Committable>
this.collectedTableBucketWriteResults = new HashMap<>();
this.flussConfig = flussConf;
this.lakeTieringConfig = lakeTieringConfig;
- this.operatorEventGateway =
- parameters
- .getOperatorEventDispatcher()
-
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
this.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
parameters.getOutput());
- }
-
- @Override
- public void setup(
- StreamTask<?, ?> containingTask,
- StreamConfig config,
- Output<StreamRecord<CommittableMessage<Committable>>> output) {
- super.setup(containingTask, config, output);
- int attemptNumber =
RuntimeContextAdapter.getAttemptNumber(getRuntimeContext());
- if (attemptNumber > 0) {
- LOG.info("Send TieringFailoverEvent, current attempt number: {}",
attemptNumber);
- // attempt number is greater than zero, the job must failover
- operatorEventGateway.sendEventToCoordinator(
- new SourceEventWrapper(new TieringFailOverEvent()));
- }
+ this.operatorEventGateway =
+ parameters
+ .getOperatorEventDispatcher()
+
.getOperatorEventGateway(TieringSource.TIERING_SOURCE_OPERATOR_UID);
}
@Override
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
deleted file mode 100644
index 8cbe87fd1..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringFailOverEvent.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.flink.tiering.event;
-
-import org.apache.flink.api.connector.source.SourceEvent;
-
-/** SourceEvent used to represent tiering is failover. */
-public class TieringFailOverEvent implements SourceEvent {
- private static final long serialVersionUID = 1L;
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 1ef73eb39..c1c26390a 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,7 +26,6 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -41,6 +40,7 @@ import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.MapUtils;
+import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -60,6 +60,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator.HeartBeatHelper.basicHeartBeat;
@@ -105,6 +106,8 @@ public class TieringSourceEnumerator
private TieringSplitGenerator splitGenerator;
private int flussCoordinatorEpoch;
+ private volatile boolean isFailOvering = false;
+
private volatile boolean closed = false;
public TieringSourceEnumerator(
@@ -179,11 +182,53 @@ public class TieringSourceEnumerator
@Override
public void addReader(int subtaskId) {
LOG.info("Adding reader: {} to Tiering Source enumerator.", subtaskId);
- if (context.registeredReaders().containsKey(subtaskId)) {
+ Map<Integer, ReaderInfo> readerByAttempt =
+ context.registeredReadersOfAttempts().get(subtaskId);
+ if (readerByAttempt != null && !readerByAttempt.isEmpty()) {
readersAwaitingSplit.add(subtaskId);
+ int maxAttempt = max(readerByAttempt.keySet());
+ if (maxAttempt >= 1) {
+ if (isFailOvering) {
+ LOG.warn(
+ "Subtask {} (max attempt {}) registered during
ongoing failover.",
+ subtaskId,
+ maxAttempt);
+ } else {
+ LOG.warn(
+ "Detected failover: subtask {} has max attempt {}
> 0. Triggering global failover handling.",
+ subtaskId,
+ maxAttempt);
+            // an attempt number greater than zero means the job must have failed over
+ isFailOvering = true;
+ handleSourceReaderFailOver();
+ }
+
+            // if the number of registered readers equals the current parallelism, check
+            // whether all registered readers share the same max attempt
+ if (context.registeredReadersOfAttempts().size() ==
context.currentParallelism()) {
+ // Check if all readers have the same max attempt number
+ Set<Integer> maxAttempts =
+
context.registeredReadersOfAttempts().values().stream()
+ .map(_readerByAttempt ->
max(_readerByAttempt.keySet()))
+ .collect(Collectors.toSet());
+ int globalMaxAttempt = max(maxAttempts);
+ if (maxAttempts.size() == 1 && globalMaxAttempt >= 1) {
+ LOG.info(
+ "Failover completed. All {} subtasks reached
the same attempt number {}. Current registered readers are {}",
+ context.currentParallelism(),
+ globalMaxAttempt,
+ context.registeredReadersOfAttempts());
+ isFailOvering = false;
+ }
+ }
+ }
}
}
+ private int max(Set<Integer> integers) {
+ return integers.stream().max(Integer::compareTo).orElse(-1);
+ }
+
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
if (sourceEvent instanceof FinishedTieringEvent) {
@@ -218,18 +263,23 @@ public class TieringSourceEnumerator
}
}
- if (sourceEvent instanceof TieringFailOverEvent) {
- LOG.info(
- "Receiving tiering failover event, mark current tiering
table epoch {} as failed.",
- tieringTableEpochs);
- // we need to make all as failed
- failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
- tieringTableEpochs.clear();
- // also clean all pending splits since we mark all as failed
- pendingSplits.clear();
+ if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
+ // call one round of heartbeat to notify table has been finished
or failed
+ this.context.callAsync(
+ this::requestTieringTableSplitsViaHeartBeat,
this::generateAndAssignSplits);
}
+ }
- if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
+ private void handleSourceReaderFailOver() {
+ LOG.info(
+ "Handling source reader fail over, mark current tiering table
epoch {} as failed.",
+ tieringTableEpochs);
+        // we need to mark all currently tiering tables as failed
+ failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+ tieringTableEpochs.clear();
+ // also clean all pending splits since we mark all as failed
+ pendingSplits.clear();
+ if (!failedTableEpochs.isEmpty()) {
// call one round of heartbeat to notify table has been finished
or failed
this.context.callAsync(
this::requestTieringTableSplitsViaHeartBeat,
this::generateAndAssignSplits);
@@ -248,6 +298,10 @@ public class TieringSourceEnumerator
}
private void assignSplits() {
+        // we don't assign splits while a failover is in progress
+ if (isFailOvering) {
+ return;
+ }
/* This method may be called from both addSplitsBack and
handleSplitRequest, make it thread safe. */
synchronized (readersAwaitingSplit) {
if (!readersAwaitingSplit.isEmpty()) {
@@ -476,7 +530,7 @@ public class TieringSourceEnumerator
static LakeTieringHeartbeatResponse waitHeartbeatResponse(
CompletableFuture<LakeTieringHeartbeatResponse>
responseCompletableFuture) {
try {
- return responseCompletableFuture.get();
+ return responseCompletableFuture.get(3, TimeUnit.MINUTES);
} catch (Exception e) {
LOG.error("Failed to wait heartbeat response due to ", e);
throw new FlinkRuntimeException("Failed to wait heartbeat
response due to ", e);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
new file mode 100644
index 000000000..c87078d7f
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.List;
+
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for tiering to Values Lake by Flink. */
+class FlinkTieringTestBase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
+ protected StreamExecutionEnvironment execEnv;
+
+ protected static Connection conn;
+ protected static Admin admin;
+ protected static Configuration clientConf;
+
+ private static Configuration initConfig() {
+ Configuration conf = new Configuration();
+ conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+ .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
Integer.MAX_VALUE);
+
+ // Configure the tiering sink to be Lance
+ conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.LANCE);
+ return conf;
+ }
+
+ @BeforeAll
+ protected static void beforeAll() {
+ clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ conn = ConnectionFactory.createConnection(clientConf);
+ admin = conn.getAdmin();
+ }
+
+ @AfterAll
+ static void afterAll() throws Exception {
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (conn != null) {
+ conn.close();
+ conn = null;
+ }
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ execEnv.setParallelism(2);
+ }
+
+ protected long createPkTable(
+ TablePath tablePath, int bucketNum, boolean enableAutoCompaction,
Schema schema)
+ throws Exception {
+ TableDescriptor.Builder pkTableBuilder =
+ TableDescriptor.builder()
+ .schema(schema)
+ .distributedBy(bucketNum)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+
+ if (enableAutoCompaction) {
+
pkTableBuilder.property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION.key(),
"true");
+ }
+ return createTable(tablePath, pkTableBuilder.build());
+ }
+
+ protected long createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
+ throws Exception {
+ admin.createTable(tablePath, tableDescriptor, true).get();
+ return admin.getTableInfo(tablePath).get().getTableId();
+ }
+
+ protected void assertReplicaStatus(TableBucket tb, long
expectedLogEndOffset) {
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ Replica replica = getLeaderReplica(tb);
+ // datalake snapshot id should be updated
+ assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+ .isGreaterThanOrEqualTo(0);
+
assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+ });
+ }
+
+ protected Replica getLeaderReplica(TableBucket tableBucket) {
+ return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ }
+
+ protected void writeRows(TablePath tablePath, List<InternalRow> rows)
throws Exception {
+ try (Table table = conn.getTable(tablePath)) {
+ TableWriter tableWriter;
+ tableWriter = table.newUpsert().createWriter();
+ for (InternalRow row : rows) {
+ ((UpsertWriter) tableWriter).upsert(row);
+ }
+ tableWriter.flush();
+ }
+ }
+
+ protected void waitUntilSnapshot(long tableId, int bucketNum, long
snapshotId) {
+ for (int i = 0; i < bucketNum; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket,
snapshotId);
+ }
+ }
+
+ @SuppressWarnings("resource")
+ public List<InternalRow> getValuesRecords(TablePath tablePath) {
+ return TestingValuesLake.getResults(tablePath.toString());
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
new file mode 100644
index 000000000..a8ee17a6d
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringFailoverITCase.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.flink.tiering.committer.CommittableMessageTypeInfo;
+import org.apache.fluss.flink.tiering.committer.TieringCommitOperatorFactory;
+import org.apache.fluss.flink.tiering.source.TableBucketWriteResultTypeInfo;
+import org.apache.fluss.flink.tiering.source.TieringSource;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.lake.values.tiering.TestingValuesLakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static
org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
+import static
org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test tiering failover. */
+class TieringFailoverITCase extends FlinkTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+
+ private static StreamExecutionEnvironment execEnv;
+
+ private static final Schema schema =
+ Schema.newBuilder()
+ .column("f_int", DataTypes.INT())
+ .column("f_string", DataTypes.STRING())
+ .primaryKey("f_string")
+ .build();
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkTieringTestBase.beforeAll();
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+ execEnv.enableCheckpointing(1000);
+ }
+
+ @Test
+ void testTiering() throws Exception {
+ // create a pk table, write some records and wait until snapshot
finished
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+ long t1Id = createPkTable(t1, 1, false, schema);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ // write records
+ List<InternalRow> rows = Arrays.asList(row(1, "i1"), row(2, "i2"),
row(3, "i3"));
+ List<InternalRow> expectedRows = new ArrayList<>(rows);
+ writeRows(t1, rows);
+ waitUntilSnapshot(t1Id, 1, 0);
+
+ // fail the first write to the pk table
+ TestingValuesLake.failWhen(t1.toString()).failWriteOnce();
+
+ // then start tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ // check the status of replica after synced
+ assertReplicaStatus(t1Bucket, 3);
+
+ checkDataInValuesTable(t1, rows);
+
+ // then write data to the pk tables
+ // write records
+ rows = Arrays.asList(row(1, "i11"), row(2, "i22"), row(3, "i33"));
+ expectedRows.addAll(rows);
+ // write records
+ writeRows(t1, rows);
+
+ // check the status of replica of t1 after synced
+            // don't check the start offset since we won't
+ // update start log offset for primary key table
+ assertReplicaStatus(t1Bucket, expectedRows.size());
+
+ checkDataInValuesTable(t1, expectedRows);
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv)
throws Exception {
+ Configuration flussConfig = new Configuration(clientConf);
+ flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+
+ LakeTieringFactory lakeTieringFactory = new
TestingValuesLakeTieringFactory();
+
+ // build tiering source
+ TieringSource.Builder<?> tieringSourceBuilder =
+ new TieringSource.Builder<>(flussConfig, lakeTieringFactory);
+ if (flussConfig.get(POLL_TIERING_TABLE_INTERVAL) != null) {
+ tieringSourceBuilder.withPollTieringTableIntervalMs(
+ flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
+ }
+ TieringSource<?> tieringSource = tieringSourceBuilder.build();
+ DataStreamSource<?> source =
+ execEnv.fromSource(
+ tieringSource,
+ WatermarkStrategy.noWatermarks(),
+ "TieringSource",
+ TableBucketWriteResultTypeInfo.of(
+ () ->
lakeTieringFactory.getWriteResultSerializer()));
+
+ source.getTransformation().setUid(TIERING_SOURCE_TRANSFORMATION_UID);
+
+ source.transform(
+ "TieringCommitter",
+ CommittableMessageTypeInfo.of(
+ () ->
lakeTieringFactory.getCommittableSerializer()),
+ new TieringCommitOperatorFactory(
+ flussConfig,
+ Configuration.fromMap(Collections.emptyMap()),
+ lakeTieringFactory))
+ .setParallelism(1)
+ .setMaxParallelism(1)
+ .sinkTo(new DiscardingSink())
+ .name("end")
+ .setParallelism(1);
+ String jobName =
+ execEnv.getConfiguration()
+ .getOptional(PipelineOptions.NAME)
+ .orElse("Fluss Lake Tiering FailOver IT Test.");
+
+ return execEnv.executeAsync(jobName);
+ }
+
+ private void checkDataInValuesTable(TablePath tablePath, List<InternalRow>
expectedRows)
+ throws Exception {
+ Iterator<InternalRow> actualIterator =
getValuesRecords(tablePath).iterator();
+ Iterator<InternalRow> iterator = expectedRows.iterator();
+ while (iterator.hasNext() && actualIterator.hasNext()) {
+ InternalRow row = iterator.next();
+ InternalRow record = actualIterator.next();
+ assertThat(record.getInt(0)).isEqualTo(row.getInt(0));
+ assertThat(record.getString(1)).isEqualTo(row.getString(1));
+ }
+ assertThat(actualIterator.hasNext()).isFalse();
+ assertThat(iterator.hasNext()).isFalse();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java
new file mode 100644
index 000000000..2bbd11ef6
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/FlussMockSplitEnumeratorContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.source.enumerator;
+
+import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A mock extension of {@link MockSplitEnumeratorContext} for testing
purposes, support registering
+ * source readers with attempt number.
+ *
+ * @param <SplitT> The type of {@link SourceSplit} used by the source.
+ */
+class FlussMockSplitEnumeratorContext<SplitT extends SourceSplit>
+ extends MockSplitEnumeratorContext<SplitT> {
+
+ private final ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
registeredReaders;
+
+ public FlussMockSplitEnumeratorContext(int parallelism) {
+ super(parallelism);
+ this.registeredReaders = MapUtils.newConcurrentHashMap();
+ }
+
+ public void registerSourceReader(int subtaskId, int attemptNumber, String
location) {
+ final Map<Integer, ReaderInfo> attemptReaders =
+ registeredReaders.computeIfAbsent(subtaskId, k ->
MapUtils.newConcurrentHashMap());
+ checkState(
+ !attemptReaders.containsKey(attemptNumber),
+ "ReaderInfo of subtask %s (#%s) already exists.",
+ subtaskId,
+ attemptNumber);
+ attemptReaders.put(attemptNumber, new ReaderInfo(subtaskId, location));
+ }
+
+ @Override
+ public void registerReader(ReaderInfo readerInfo) {
+ this.registerSourceReader(readerInfo.getSubtaskId(), 0,
readerInfo.getLocation());
+ }
+
+ @Override
+ public Map<Integer, ReaderInfo> registeredReaders() {
+ final Map<Integer, ReaderInfo> readers = new HashMap<>();
+ for (Map.Entry<Integer, ConcurrentMap<Integer, ReaderInfo>> entry :
+ registeredReaders.entrySet()) {
+ final int subtaskIndex = entry.getKey();
+ final Map<Integer, ReaderInfo> attemptReaders = entry.getValue();
+ if (!attemptReaders.isEmpty()) {
+ int earliestAttempt = Collections.min(attemptReaders.keySet());
+ readers.put(subtaskIndex, attemptReaders.get(earliestAttempt));
+ }
+ }
+ return Collections.unmodifiableMap(readers);
+ }
+
+ @Override
+ public Map<Integer, Map<Integer, ReaderInfo>>
registeredReadersOfAttempts() {
+ return Collections.unmodifiableMap(registeredReaders);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
index ba8c5b17a..84e5bd5db 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumeratorTest.java
@@ -20,7 +20,6 @@ package org.apache.fluss.flink.tiering.source.enumerator;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
-import org.apache.fluss.flink.tiering.event.TieringFailOverEvent;
import org.apache.fluss.flink.tiering.source.TieringTestBase;
import org.apache.fluss.flink.tiering.source.split.TieringLogSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSnapshotSplit;
@@ -32,9 +31,7 @@ import
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.PbLakeTableOffsetForBucket;
import org.apache.fluss.rpc.messages.PbLakeTableSnapshotInfo;
-import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -76,8 +73,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
int numSubtasks = 4;
int expectNumberOfSplits = 3;
// test get snapshot split & log split and the assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -85,10 +82,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
// try to assign splits
context.runPeriodicCallable(0);
@@ -161,8 +155,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
int expectNumberOfSplits = 3;
// test get snapshot split assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -170,10 +164,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < 3; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
waitUntilTieringTableSplitAssignmentReady(context,
DEFAULT_BUCKET_NUM, 3000L);
Map<Integer, List<TieringSplit>> expectedSnapshotAssignment = new
HashMap<>();
@@ -252,8 +243,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
int numSubtasks = 4;
int expectNumberOfSplits = 3;
// test get log split and the assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -261,10 +252,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
// write one row to one bucket, keep the other buckets empty
Map<Integer, Long> bucketOffsetOfFirstWrite =
@@ -351,8 +339,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
int numSubtasks = 6;
int expectNumberOfSplits = 6;
// test get snapshot split assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -360,10 +348,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
// try to assign splits
context.runPeriodicCallable(0);
@@ -455,8 +440,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
int numSubtasks = 6;
int expectNumberOfSplits = 6;
// test get log split assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -464,10 +449,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
Map<Long, Map<Integer, Long>> bucketOffsetOfFirstWrite =
appendRowForPartitionedTable(
@@ -581,8 +563,8 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
Map<Integer, Long> bucketOffsetOfWrite =
appendRow(tablePath, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
// test get log split and the assignment
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(numSubtasks)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
@@ -590,10 +572,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
// register all readers
- for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
- enumerator.handleSplitRequest(subtaskId, "localhost-" +
subtaskId);
- }
+ registerReaderAndHandleSplitRequests(context, enumerator,
numSubtasks, 0);
waitUntilTieringTableSplitAssignmentReady(context,
DEFAULT_BUCKET_NUM, 200);
List<TieringSplit> expectedAssignment = new ArrayList<>();
@@ -630,7 +609,7 @@ class TieringSourceEnumeratorTest extends TieringTestBase {
}
@Test
- void testHandleFailOverEvent() throws Throwable {
+ void testHandleReaderFailOver() throws Throwable {
TablePath tablePath1 = TablePath.of(DEFAULT_DB,
"tiering-failover-test-log-table1");
createTable(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR);
appendRow(tablePath1, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
@@ -639,37 +618,41 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
createTable(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR);
appendRow(tablePath2, DEFAULT_LOG_TABLE_DESCRIPTOR, 0, 10);
- int numSubtasks = 1;
- try (MockSplitEnumeratorContext<TieringSplit> context =
- new MockSplitEnumeratorContext<>(numSubtasks)) {
+ try (FlussMockSplitEnumeratorContext<TieringSplit> context =
+ new FlussMockSplitEnumeratorContext<>(3)) {
TieringSourceEnumerator enumerator =
new TieringSourceEnumerator(flussConf, context, 500);
enumerator.start();
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
- // register one reader
- int subtaskId = 0;
- registerReader(context, enumerator, subtaskId, "localhost-" +
subtaskId);
-
- // handle split request
- enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+ // register readers and handle split requests for attempt 0
+ registerReaderAndHandleSplitRequests(context, enumerator, 3, 0);
// should get three tiering splits, all for tablePath1
- verifyTieringSplitAssignment(context, 1, tablePath1);
+ verifyTieringSplitAssignment(context, 3, tablePath1);
// clean assignment
context.getSplitsAssignmentSequence().clear();
- // enumerator handle TieringFailOverEvent, which will mark current
tiering tablePath1 as
- // fail, and all pending splits should be clear
- enumerator.handleSourceEvent(subtaskId, new
TieringFailOverEvent());
+ // readers failover: Enumerator marks tablePath1 as failed and
clears its splits
+ // register readers and handle split requests (attempt 1)
+ registerReaderAndHandleSplitRequests(context, enumerator, 3, 1);
- // handle split request
- enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
// now, should get tiering splits for
tablePath2 since all
// pending splits for tablePath1 are cleared
- verifyTieringSplitAssignment(context, 1, tablePath2);
+ verifyTieringSplitAssignment(context, 3, tablePath2);
+
+ // clean assignment
+ context.getSplitsAssignmentSequence().clear();
+
+ // readers failover again: Enumerator marks tablePath2 as failed
and clears its splits
+ // register readers and process split requests for attempt 2
+ registerReaderAndHandleSplitRequests(context, enumerator, 3, 2);
+
+ // now, should get no tiering split, since all pending splits for
tablePath1 and
+ // tablePath2 are cleared
+ verifyTieringSplitAssignment(context, 0, tablePath2);
}
}
@@ -697,17 +680,22 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
}
// --------------------- Test Utils ---------------------
- private void registerReader(
- MockSplitEnumeratorContext<TieringSplit> context,
+ private void registerReaderAndHandleSplitRequests(
+ FlussMockSplitEnumeratorContext<TieringSplit> context,
TieringSourceEnumerator enumerator,
- int readerId,
- String hostname) {
- context.registerReader(new ReaderInfo(readerId, hostname));
- enumerator.addReader(readerId);
+ int numSubtasks,
+ int attemptNumber) {
+ for (int subtaskId = 0; subtaskId < numSubtasks; subtaskId++) {
+ context.registerSourceReader(subtaskId, attemptNumber,
"localhost-" + subtaskId);
+ enumerator.addReader(subtaskId);
+ enumerator.handleSplitRequest(subtaskId, "localhost-" + subtaskId);
+ }
}
private void waitUntilTieringTableSplitAssignmentReady(
- MockSplitEnumeratorContext<TieringSplit> context, int
expectedSplitsNum, long sleepMs)
+ FlussMockSplitEnumeratorContext<TieringSplit> context,
+ int expectedSplitsNum,
+ long sleepMs)
throws Throwable {
while (context.getSplitsAssignmentSequence().size() <
expectedSplitsNum) {
if (!context.getPeriodicCallables().isEmpty()) {
@@ -726,11 +714,11 @@ class TieringSourceEnumeratorTest extends TieringTestBase
{
}
private void verifyTieringSplitAssignment(
- MockSplitEnumeratorContext<TieringSplit> context,
+ FlussMockSplitEnumeratorContext<TieringSplit> context,
int expectedSplitSize,
TablePath expectedTablePath)
throws Throwable {
- waitUntilTieringTableSplitAssignmentReady(context, 1, 200);
+ waitUntilTieringTableSplitAssignmentReady(context, expectedSplitSize,
200);
List<SplitsAssignment<TieringSplit>> actualAssignment =
context.getSplitsAssignmentSequence();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
new file mode 100644
index 000000000..218d7a6f8
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLake.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.MapUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
+
+/**
+ * An in-memory lake storage implementation for testing, supporting only log
tables.
+ *
+ * <p>Provides utilities for managing tables, writing records, committing
stages, and retrieving
+ * results in a test environment.
+ */
+public class TestingValuesLake {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestingValuesLake.class);
+
+ private static final Map<String, TestingValuesTable> globalTables =
+ MapUtils.newConcurrentHashMap();
+ private static final Map<String, TableFailureController>
FAILURE_CONTROLLERS =
+ MapUtils.newConcurrentHashMap();
+
+ public static TableFailureController failWhen(String tableId) {
+ return FAILURE_CONTROLLERS.computeIfAbsent(tableId, k -> new
TableFailureController());
+ }
+
+ public static List<InternalRow> getResults(String tableId) {
+ TestingValuesTable table = globalTables.get(tableId);
+ checkNotNull(table, tableId + " does not exist");
+ return table.getResult();
+ }
+
+ public static void writeRecord(String tableId, String stageId, LogRecord
record)
+ throws IOException {
+ TestingValuesTable table = globalTables.get(tableId);
+ checkNotNull(table, tableId + " does not exist");
+ TableFailureController controller = FAILURE_CONTROLLERS.get(tableId);
+ if (controller != null) {
+ controller.checkWriteShouldFail(tableId);
+ }
+ table.writeRecord(stageId, record);
+ LOG.info("Write record to stage {}: {}", stageId, record);
+ }
+
+ public static long commit(
+ String tableId, List<String> stageIds, Map<String, String>
snapshotProperties)
+ throws IOException {
+ TestingValuesTable table = globalTables.get(tableId);
+ checkNotNull(table, "commit stage %s failed, table %s does not exist",
stageIds, tableId);
+ table.commit(stageIds, snapshotProperties);
+ LOG.info("Commit table {} stage {}", tableId, stageIds);
+ return table.getSnapshotId();
+ }
+
+ public static void abort(String tableId, List<String> stageIds) {
+ TestingValuesTable table = globalTables.get(tableId);
+ checkNotNull(
+ table, "abort stage record %s failed, table %s does not
exist", stageIds, tableId);
+ table.abort(stageIds);
+ LOG.info("Abort table {} stage {}", tableId, stageIds);
+ }
+
+ public static TestingValuesTable getTable(String tableId) {
+ return globalTables.get(tableId);
+ }
+
+ public static void createTable(String tableId) {
+ if (!globalTables.containsKey(tableId)) {
+ globalTables.put(tableId, new TestingValuesTable());
+ TestingValuesTable table = globalTables.get(tableId);
+ checkNotNull(table, "create table %s failed", tableId);
+ }
+ }
+
+ /** Maintains the committed records and staged writes of a specific table
in memory. */
+ public static class TestingValuesTable {
+
+ private final Lock lock = new ReentrantLock();
+
+ private final List<InternalRow> records;
+ private final Map<String, List<LogRecord>> stageRecords;
+
+ private long snapshotId = -1L;
+
+ public TestingValuesTable() {
+ this.records = new ArrayList<>();
+ this.stageRecords = new HashMap<>();
+ }
+
+ public List<InternalRow> getResult() {
+ return inLock(lock, () -> new ArrayList<>(records));
+ }
+
+ public void writeRecord(String stageId, LogRecord record) {
+ inLock(
+ lock,
+ () -> {
+ this.stageRecords
+ .computeIfAbsent(stageId, k -> new
ArrayList<>())
+ .add(record);
+ });
+ }
+
+ public void commit(List<String> stageIds, Map<String, String>
snapshotProperties) {
+ inLock(
+ lock,
+ () -> {
+ for (String stageId : stageIds) {
+ List<LogRecord> stageRecords =
this.stageRecords.get(stageId);
+ stageRecords.forEach(record ->
records.add(record.getRow()));
+ this.stageRecords.remove(stageId);
+ }
+ this.snapshotId++;
+ });
+ }
+
+ public void abort(List<String> stageIds) {
+ inLock(
+ lock,
+ () -> {
+ for (String stageId : stageIds) {
+ this.stageRecords.remove(stageId);
+ }
+ });
+ }
+
+ public long getSnapshotId() {
+ return snapshotId;
+ }
+ }
+
+ /** Controller to control the failure of table write and commit. */
+ public static class TableFailureController {
+ private volatile boolean writeFailEnabled = false;
+ private final AtomicInteger writeFailTimes = new AtomicInteger(0);
+ private final AtomicInteger writeFailCounter = new AtomicInteger(0);
+
+ public TableFailureController failWriteOnce() {
+ return failWriteNext(1);
+ }
+
+ /** Force the next N write calls to throw IOException (thread-safe). */
+ public TableFailureController failWriteNext(int times) {
+ this.writeFailEnabled = true;
+ this.writeFailTimes.set(times);
+ this.writeFailCounter.set(0);
+ return this;
+ }
+
+ private void checkWriteShouldFail(String tableId) throws IOException {
+ if (!writeFailEnabled) {
+ return;
+ }
+
+ int count = writeFailCounter.incrementAndGet();
+ if (count <= writeFailTimes.get()) {
+ LOG.warn(
+ "ValuesLake FAIL_INJECTED: write() intentionally
failed [table={} attempt={}/{}]",
+ tableId,
+ count,
+ writeFailTimes.get());
+ throw new IOException(
+ String.format(
+ "ValuesLake write failure injected for test
(attempt %d of %d)",
+ count, writeFailTimes.get()));
+ } else {
+ writeFailEnabled = false;
+ }
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
new file mode 100644
index 000000000..3c5692695
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeCatalog.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.exception.TableAlreadyExistException;
+import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.TableChange;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+
+import java.util.List;
+
+/** Implementation of {@link LakeCatalog} for values lake. */
+public class TestingValuesLakeCatalog implements LakeCatalog {
+ @Override
+ public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor, Context context)
+ throws TableAlreadyExistException {
+ TestingValuesLake.createTable(tablePath.toString());
+ }
+
+ @Override
+ public void alterTable(TablePath tablePath, List<TableChange>
tableChanges, Context context)
+ throws TableNotExistException {
+ throw new RuntimeException("Not impl.");
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
new file mode 100644
index 000000000..96adaebe3
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStorage.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.values.tiering.TestingValuesLakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.metadata.TablePath;
+
+/** Implementation of {@link LakeStorage} for values lake. */
+public class TestingValuesLakeStorage implements LakeStorage {
+ @Override
+ public LakeTieringFactory<?, ?> createLakeTieringFactory() {
+ return new TestingValuesLakeTieringFactory();
+ }
+
+ @Override
+ public LakeCatalog createLakeCatalog() {
+ return new TestingValuesLakeCatalog();
+ }
+
+ @Override
+ public LakeSource<?> createLakeSource(TablePath tablePath) {
+ throw new UnsupportedOperationException("Not impl.");
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
new file mode 100644
index 000000000..678080d8c
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/TestingValuesLakeStoragePlugin.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.metadata.DataLakeFormat;
+
+/** Implementation of {@link LakeStoragePlugin} for values lake. */
+public class TestingValuesLakeStoragePlugin implements LakeStoragePlugin {
+
+ // Testing/mock implementation for values lake storage that reuses the
Lance data lake format
+ // identifier for compatibility with existing Fluss lake storage
infrastructure.
+ private static final String IDENTIFIER = DataLakeFormat.LANCE.toString();
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public LakeStorage createLakeStorage(Configuration configuration) {
+ return new TestingValuesLakeStorage();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
new file mode 100644
index 000000000..03e1866ab
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeCommitter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
+import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Implementation of {@link LakeCommitter} for values lake. */
+public class TestingValuesLakeCommitter
+ implements LakeCommitter<
+ TestingValuesLakeWriter.TestingValuesWriteResult,
+ TestingValuesLakeCommitter.TestingValuesCommittable> {
+ private final String tableId;
+
+ public TestingValuesLakeCommitter(String tableId) {
+ this.tableId = tableId;
+ }
+
+ @Override
+ public TestingValuesCommittable toCommittable(
+ List<TestingValuesLakeWriter.TestingValuesWriteResult>
valuesWriteResults)
+ throws IOException {
+ return new TestingValuesCommittable(
+ valuesWriteResults.stream()
+
.map(TestingValuesLakeWriter.TestingValuesWriteResult::getStageId)
+ .collect(Collectors.toList()));
+ }
+
+ @Override
+ public long commit(TestingValuesCommittable committable, Map<String,
String> snapshotProperties)
+ throws IOException {
+ return TestingValuesLake.commit(tableId, committable.getStageIds(),
snapshotProperties);
+ }
+
+ @Override
+ public void abort(TestingValuesCommittable committable) throws IOException
{
+ TestingValuesLake.abort(tableId, committable.getStageIds());
+ }
+
+ @Override
+ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long
latestLakeSnapshotIdOfFluss)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ /** Committable of {@link TestingValuesLake}. */
+ public static class TestingValuesCommittable implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<String> stageIdList = new ArrayList<>();
+
+ public TestingValuesCommittable(List<String> stageIds) {
+ this.stageIdList.addAll(stageIds);
+ }
+
+ public List<String> getStageIds() {
+ return stageIdList;
+ }
+ }
+
+ /** A serializer for {@link TestingValuesCommittable}. */
+ public static class ValuesCommittableSerializer
+ implements SimpleVersionedSerializer<TestingValuesCommittable> {
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(TestingValuesCommittable committable) throws
IOException {
+ return InstantiationUtils.serializeObject(committable);
+ }
+
+ @Override
+ public TestingValuesCommittable deserialize(int version, byte[]
serialized)
+ throws IOException {
+ TestingValuesCommittable valuesCommittable;
+ try {
+ valuesCommittable =
+ InstantiationUtils.deserializeObject(
+ serialized, getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return valuesCommittable;
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
new file mode 100644
index 000000000..3a9c78578
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeTieringFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.committer.CommitterInitContext;
+import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.writer.LakeTieringFactory;
+import org.apache.fluss.lake.writer.LakeWriter;
+import org.apache.fluss.lake.writer.WriterInitContext;
+
+import java.io.IOException;
+
+/** Implementation of {@link LakeTieringFactory} for values lake. */
+public class TestingValuesLakeTieringFactory
+ implements LakeTieringFactory<
+ TestingValuesLakeWriter.TestingValuesWriteResult,
+ TestingValuesLakeCommitter.TestingValuesCommittable> {
+ @Override
+ public LakeWriter<TestingValuesLakeWriter.TestingValuesWriteResult>
createLakeWriter(
+ WriterInitContext writerInitContext) throws IOException {
+ return new
TestingValuesLakeWriter(writerInitContext.tablePath().toString());
+ }
+
+ @Override
+ public
SimpleVersionedSerializer<TestingValuesLakeWriter.TestingValuesWriteResult>
+ getWriteResultSerializer() {
+ return new
TestingValuesLakeWriter.TestingValuesWriteResultSerializer();
+ }
+
+ @Override
+ public LakeCommitter<
+ TestingValuesLakeWriter.TestingValuesWriteResult,
+ TestingValuesLakeCommitter.TestingValuesCommittable>
+ createLakeCommitter(CommitterInitContext committerInitContext)
throws IOException {
+ return new
TestingValuesLakeCommitter(committerInitContext.tablePath().toString());
+ }
+
+ @Override
+ public
SimpleVersionedSerializer<TestingValuesLakeCommitter.TestingValuesCommittable>
+ getCommittableSerializer() {
+ return new TestingValuesLakeCommitter.ValuesCommittableSerializer();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
new file mode 100644
index 000000000..480ebe307
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/lake/values/tiering/TestingValuesLakeWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.values.tiering;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.values.TestingValuesLake;
+import org.apache.fluss.lake.writer.LakeWriter;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+
+/** Implementation of {@link LakeWriter} for values lake. */
+public class TestingValuesLakeWriter
+ implements
LakeWriter<TestingValuesLakeWriter.TestingValuesWriteResult> {
+ private final String tableId;
+ private final String writerId;
+
+ public TestingValuesLakeWriter(String tableId) {
+ this.tableId = tableId;
+ this.writerId = UUID.randomUUID().toString();
+ }
+
+ @Override
+ public void write(LogRecord record) throws IOException {
+ TestingValuesLake.writeRecord(tableId, writerId, record);
+ }
+
+ @Override
+ public TestingValuesWriteResult complete() throws IOException {
+ return new TestingValuesWriteResult(writerId);
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ /** Write result of {@link TestingValuesLake}. */
+ public static class TestingValuesWriteResult implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final String stageId;
+
+ public TestingValuesWriteResult(String stageId) {
+ this.stageId = stageId;
+ }
+
+ public String getStageId() {
+ return stageId;
+ }
+ }
+
+ /** A serializer for {@link TestingValuesWriteResult}. */
+ public static class TestingValuesWriteResultSerializer
+ implements SimpleVersionedSerializer<TestingValuesWriteResult> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(TestingValuesWriteResult valuesWriteResult)
throws IOException {
+ return InstantiationUtils.serializeObject(valuesWriteResult);
+ }
+
+ @Override
+ public TestingValuesWriteResult deserialize(int version, byte[]
serialized)
+ throws IOException {
+ TestingValuesWriteResult valuesWriteResult;
+ try {
+ valuesWriteResult =
+ InstantiationUtils.deserializeObject(
+ serialized, getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return valuesWriteResult;
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
new file mode 100644
index 000000000..3fafafad0
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/test/resources/META-INF/services/org.apache.fluss.lake.lakestorage.LakeStoragePlugin
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.fluss.lake.values.TestingValuesLakeStoragePlugin
\ No newline at end of file
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
index 49983b603..11de6baed 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -270,11 +270,11 @@ class IcebergTieringITCase extends
FlinkIcebergTieringTestBase {
private void checkDataInIcebergPrimaryKeyTable(
TablePath tablePath, List<InternalRow> expectedRows) throws
Exception {
- Iterator<Record> acturalIterator =
getIcebergRecords(tablePath).iterator();
+ Iterator<Record> actualIterator =
getIcebergRecords(tablePath).iterator();
Iterator<InternalRow> iterator = expectedRows.iterator();
- while (iterator.hasNext() && acturalIterator.hasNext()) {
+ while (iterator.hasNext() && actualIterator.hasNext()) {
InternalRow row = iterator.next();
- Record record = acturalIterator.next();
+ Record record = actualIterator.next();
assertThat(record.get(0)).isEqualTo(row.getBoolean(0));
assertThat(record.get(1)).isEqualTo((int) row.getByte(1));
assertThat(record.get(2)).isEqualTo((int) row.getShort(2));
@@ -307,7 +307,7 @@ class IcebergTieringITCase extends
FlinkIcebergTieringTestBase {
assertThat(record.get(17)).isEqualTo(row.getChar(17,
3).toString());
assertThat(record.get(18)).isEqualTo(ByteBuffer.wrap(row.getBytes(18)));
}
- assertThat(acturalIterator.hasNext()).isFalse();
+ assertThat(actualIterator.hasNext()).isFalse();
assertThat(iterator.hasNext()).isFalse();
}