This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 34b7688a6 [client] Make SchemaNotExistException a retriable
exception. (#2193)
34b7688a6 is described below
commit 34b7688a6bad68c4f51d80ec11f917f4e18d6e8a
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Dec 29 14:50:49 2025 +0800
[client] Make SchemaNotExistException a retriable exception. (#2193)
---
.../client/admin/ClientToServerITCaseBase.java | 4 --
.../fluss/client/table/FlussTableITCase.java | 3 -
.../batch/KvSnapshotBatchScannerITCase.java | 1 -
.../org/apache/fluss/client/write/SenderTest.java | 80 +++++++++++++++++++---
.../fluss/exception/SchemaNotExistException.java | 2 +-
.../fluss/flink/sink/FlinkTableSinkITCase.java | 4 --
.../source/lookup/FlinkLookupFunctionTest.java | 1 -
.../java/org/apache/fluss/server/kv/KvTablet.java | 1 -
.../server/tablet/TestTabletServerGateway.java | 4 +-
.../server/testutils/FlussClusterExtension.java | 29 --------
10 files changed, 74 insertions(+), 55 deletions(-)
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 8270ed788..311d5a94f 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -248,10 +248,6 @@ public abstract class ClientToServerITCaseBase {
}
}
- public static void waitAllSchemaSync(TablePath tablePath, int schemaId) {
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, schemaId);
- }
-
protected static void verifyRows(
RowType rowType,
Map<Long, List<InternalRow>> actualRows,
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 27b6690a9..a65529648 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -275,7 +275,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
TableChange.ColumnPosition.last())),
false)
.get();
- waitAllSchemaSync(tablePath, 2);
Table newSchemaTable = conn.getTable(tableInfo.getTablePath());
// schema change case1: read new data with new schema.
verifyPutAndLookup(newSchemaTable, new Object[] {2, "b", "bb"});
@@ -363,7 +362,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
TableChange.ColumnPosition.last())),
false)
.get();
- waitAllSchemaSync(tablePath, 2);
try (Connection connection =
ConnectionFactory.createConnection(clientConf);
Table newSchemaTable =
connection.getTable(tableInfo.getTablePath())) {
// schema change case1: read new data with new schema.
@@ -1056,7 +1054,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
TableChange.ColumnPosition.last())),
false)
.get();
- waitAllSchemaSync(tablePath, 2);
try (Connection connection =
ConnectionFactory.createConnection(clientConf);
Table newSchemaTable = connection.getTable(tablePath)) {
UpsertWriter oldSchemaUpsertWriter =
table.newUpsert().createWriter();
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index 62b5bdc13..ea1aaf237 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -139,7 +139,6 @@ class KvSnapshotBatchScannerITCase extends
ClientToServerITCaseBase {
TableChange.ColumnPosition.last())),
false)
.get();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
Schema newSchema =
Schema.newBuilder()
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index f1d9481a6..b0f61c9ea 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -25,13 +25,20 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.exception.TimeoutException;
+import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
+import org.apache.fluss.rpc.entity.PutKvResultForBucket;
import org.apache.fluss.rpc.messages.ApiMessage;
import org.apache.fluss.rpc.messages.ProduceLogRequest;
import org.apache.fluss.rpc.messages.ProduceLogResponse;
+import org.apache.fluss.rpc.messages.PutKvResponse;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.tablet.TestTabletServerGateway;
import org.apache.fluss.utils.clock.SystemClock;
@@ -52,11 +59,19 @@ import java.util.concurrent.CompletableFuture;
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +108,7 @@ final class SenderTest {
appendToAccumulator(tb1, row(1, "a"), future::complete);
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset,
1));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -118,7 +133,7 @@ final class SenderTest {
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
long offset = 0;
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset,
1));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -131,13 +146,13 @@ final class SenderTest {
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
// timeout error can retry send.
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
// Even if timeout error can retry send, but the retry number >
maxRetries, which will
// return error.
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
sender1.runOnce();
assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
assertThat(future.get())
@@ -168,12 +183,12 @@ final class SenderTest {
assertThat(firstRequest).isInstanceOf(ProduceLogRequest.class);
assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest)
firstRequest)).isFalse();
// first complete with retriable error.
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
sender.runOnce();
assertThat(future.isDone()).isFalse();
// second retry complete.
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
sender.runOnce();
assertThat(future.isDone()).isTrue();
assertThat(future.get()).isNull();
@@ -690,7 +705,7 @@ final class SenderTest {
sender.runOnce();
assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
- finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset,
1));
+ finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
// send again, should send nothing since no batch in queue
sender.runOnce();
@@ -698,9 +713,44 @@ final class SenderTest {
assertThat(future.get()).isNull();
}
+ @Test
+ void testRetryPutKeyWithSchemaNotExistException() throws Exception {
+ TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
+
+ BinaryRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+ int[] pkIndex = DATA1_SCHEMA_PK.getPrimaryKeyIndexes();
+ byte[] key = new CompactedKeyEncoder(DATA1_ROW_TYPE,
pkIndex).encodeKey(row);
+ CompletableFuture<Exception> future = new CompletableFuture<>();
+ accumulator.append(
+ WriteRecord.forUpsert(
+ DATA1_TABLE_INFO_PK,
+ PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+ row,
+ key,
+ key,
+ WriteFormat.COMPACTED_KV,
+ null),
+ future::complete,
+ metadataUpdater.getCluster(),
+ 0,
+ false);
+ sender.runOnce();
+ finishRequest(tableBucket, 0, createPutKvResponse(tableBucket,
SCHEMA_NOT_EXIST));
+ assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
+
+ // retry to put kv request again
+ sender.runOnce();
+ assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(1);
+ finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 1));
+ assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
+ assertThat(future.get()).isNull();
+ }
+
private TestingMetadataUpdater initializeMetadataUpdater() {
- return new TestingMetadataUpdater(
- Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO));
+ Map<TablePath, TableInfo> tableInfos = new HashMap<>();
+ tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
+ tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
+ return new TestingMetadataUpdater(tableInfos);
}
private void appendToAccumulator(TableBucket tb, GenericRow row,
WriteCallback writeCallback)
@@ -721,7 +771,7 @@ final class SenderTest {
return gateway.getRequest(index);
}
- private void finishProduceLogRequest(TableBucket tb, int index,
ProduceLogResponse response) {
+ private void finishRequest(TableBucket tb, int index, ApiMessage response)
{
TestTabletServerGateway gateway =
(TestTabletServerGateway)
metadataUpdater.newTabletServerClientForNode(
@@ -762,6 +812,16 @@ final class SenderTest {
Collections.singletonList(new ProduceLogResultForBucket(tb,
error.toApiError())));
}
+ private PutKvResponse createPutKvResponse(TableBucket tb, long endOffset) {
+ return makePutKvResponse(
+ Collections.singletonList(new PutKvResultForBucket(tb,
endOffset)));
+ }
+
+ private PutKvResponse createPutKvResponse(TableBucket tb, Errors error) {
+ return makePutKvResponse(
+ Collections.singletonList(new PutKvResultForBucket(tb,
error.toApiError())));
+ }
+
private Sender setupWithIdempotenceState() {
return setupWithIdempotenceState(createIdempotenceManager(false));
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
index 9a73d98f5..5e02e987a 100644
---
a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
@@ -25,7 +25,7 @@ import org.apache.fluss.annotation.PublicEvolving;
* @since 0.1
*/
@PublicEvolving
-public class SchemaNotExistException extends ApiException {
+public class SchemaNotExistException extends RetriableException {
public SchemaNotExistException(String message, Throwable cause) {
super(message, cause);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 68fa741d4..15e59295c 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -188,7 +188,6 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from
sink_test").collect();
// add new column
tEnv.executeSql("alter table sink_test add add_column int").await();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB,
"sink_test"), 2);
tEnv.executeSql(
"INSERT INTO sink_test "
+ "VALUES (4, 3504, 'jerry', 4), "
@@ -417,7 +416,6 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from
sink_test").collect();
// add new column
tEnv.executeSql("alter table sink_test add add_column int").await();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB,
"sink_test"), 2);
tEnv.executeSql(
"INSERT INTO sink_test "
+ "VALUES (4, 3504, 'jerry', 4), "
@@ -505,7 +503,6 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
CloseableIterator<Row> rowIter = tEnv.executeSql("select * from
sink_test").collect();
// add new column
tEnv.executeSql("alter table sink_test add add_column string").await();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB,
"sink_test"), 2);
tEnv.executeSql(
"INSERT INTO sink_test(add_column, a ) VALUES
('new_value', 1), ('new_value', 2)")
.await();
@@ -819,7 +816,6 @@ abstract class FlinkTableSinkITCase extends
AbstractTestBase {
tBatchEnv
.executeSql(String.format("alter table %s add new_added_column
int", tableName))
.await();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB,
tableName), 2);
tBatchEnv
.executeSql("UPDATE " + tableName + " SET new_added_column = 2
WHERE a = 4")
.await();
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 9903e7c45..4d136c865 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -173,7 +173,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
TableChange.ColumnPosition.last())),
false)
.get();
- FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
try (Table table = conn.getTable(tablePath)) {
UpsertWriter upsertWriter = table.newUpsert().createWriter();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0dde0a53d..d6cc08666 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -326,7 +326,6 @@ public final class KvTablet {
private void validateSchemaId(short schemaIdOfNewData, short
latestSchemaId) {
if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
- // TODO: we may need to support retriable exception here
throw new SchemaNotExistException(
"Invalid schema id: "
+ schemaIdOfNewData
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 8ebcc5748..500d197fc 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -184,7 +184,9 @@ public class TestTabletServerGateway implements
TabletServerGateway {
@Override
public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
- return null;
+ CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
+ requests.add(Tuple2.of(request, response));
+ return response;
}
@Override
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index a1ee83fa6..bab31f10b 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -25,7 +25,6 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.MemorySize;
import org.apache.fluss.fs.local.LocalFileSystem;
import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -52,7 +51,6 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
import org.apache.fluss.server.metadata.ServerInfo;
-import org.apache.fluss.server.metadata.ServerSchemaCache;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.server.replica.ReplicaManager;
@@ -66,7 +64,6 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.PartitionAssignment;
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
import org.apache.fluss.server.zk.data.TableAssignment;
-import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.SystemClock;
@@ -663,32 +660,6 @@ public final class FlussClusterExtension
});
}
- public void waitAllSchemaSync(TablePath tablePath, int schemaId) {
- ZooKeeperClient zkClient = getZooKeeperClient();
- retry(
- Duration.ofMinutes(1),
- () -> {
- TableRegistration tableRegistration =
zkClient.getTable(tablePath).get();
- int bucketCount = tableRegistration.bucketCount;
- long tableId = tableRegistration.tableId;
- for (int bucketId = 0; bucketId < bucketCount; bucketId++)
{
- TableBucket tableBucket = new TableBucket(tableId,
bucketId);
- Optional<LeaderAndIsr> leaderAndIsrOpt =
- zkClient.getLeaderAndIsr(tableBucket);
- assertThat(leaderAndIsrOpt).isPresent();
- int leader = leaderAndIsrOpt.get().leader();
- TabletServer tabletServer =
getTabletServerById(leader);
- ServerSchemaCache serverSchemaCache =
-
tabletServer.getMetadataCache().getServerSchemaCache();
- Map<Long, SchemaInfo> latestSchemaByTablePath =
- serverSchemaCache.getLatestSchemaByTableId();
-
assertThat(latestSchemaByTablePath).containsKey(tableId);
-
assertThat(latestSchemaByTablePath.get(tableId).getSchemaId())
- .isEqualTo(schemaId);
- }
- });
- }
-
/**
* Wait until some log segments copy to remote. This method can only
ensure that there are at
* least one log segment has been copied to remote, but it does not ensure
that all log segments