This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 34b7688a6 [client] Change SchemaNotExistException as retriable exception. (#2193)
34b7688a6 is described below

commit 34b7688a6bad68c4f51d80ec11f917f4e18d6e8a
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Dec 29 14:50:49 2025 +0800

    [client] Change SchemaNotExistException as retriable exception. (#2193)
---
 .../client/admin/ClientToServerITCaseBase.java     |  4 --
 .../fluss/client/table/FlussTableITCase.java       |  3 -
 .../batch/KvSnapshotBatchScannerITCase.java        |  1 -
 .../org/apache/fluss/client/write/SenderTest.java  | 80 +++++++++++++++++++---
 .../fluss/exception/SchemaNotExistException.java   |  2 +-
 .../fluss/flink/sink/FlinkTableSinkITCase.java     |  4 --
 .../source/lookup/FlinkLookupFunctionTest.java     |  1 -
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  1 -
 .../server/tablet/TestTabletServerGateway.java     |  4 +-
 .../server/testutils/FlussClusterExtension.java    | 29 --------
 10 files changed, 74 insertions(+), 55 deletions(-)

diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
index 8270ed788..311d5a94f 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/ClientToServerITCaseBase.java
@@ -248,10 +248,6 @@ public abstract class ClientToServerITCaseBase {
         }
     }
 
-    public static void waitAllSchemaSync(TablePath tablePath, int schemaId) {
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, schemaId);
-    }
-
     protected static void verifyRows(
             RowType rowType,
             Map<Long, List<InternalRow>> actualRows,
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
index 27b6690a9..a65529648 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java
@@ -275,7 +275,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                                         TableChange.ColumnPosition.last())),
                         false)
                 .get();
-        waitAllSchemaSync(tablePath, 2);
         Table newSchemaTable = conn.getTable(tableInfo.getTablePath());
         // schema change case1: read new data with new schema.
         verifyPutAndLookup(newSchemaTable, new Object[] {2, "b", "bb"});
@@ -363,7 +362,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                                         TableChange.ColumnPosition.last())),
                         false)
                 .get();
-        waitAllSchemaSync(tablePath, 2);
         try (Connection connection = 
ConnectionFactory.createConnection(clientConf);
                 Table newSchemaTable = 
connection.getTable(tableInfo.getTablePath())) {
             // schema change case1: read new data with new schema.
@@ -1056,7 +1054,6 @@ class FlussTableITCase extends ClientToServerITCaseBase {
                                             
TableChange.ColumnPosition.last())),
                             false)
                     .get();
-            waitAllSchemaSync(tablePath, 2);
             try (Connection connection = 
ConnectionFactory.createConnection(clientConf);
                     Table newSchemaTable = connection.getTable(tablePath)) {
                 UpsertWriter oldSchemaUpsertWriter = 
table.newUpsert().createWriter();
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
index 62b5bdc13..ea1aaf237 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java
@@ -139,7 +139,6 @@ class KvSnapshotBatchScannerITCase extends 
ClientToServerITCaseBase {
                                         TableChange.ColumnPosition.last())),
                         false)
                 .get();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
 
         Schema newSchema =
                 Schema.newBuilder()
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index f1d9481a6..b0f61c9ea 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -25,13 +25,20 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.exception.TimeoutException;
+import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.encode.CompactedKeyEncoder;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
+import org.apache.fluss.rpc.entity.PutKvResultForBucket;
 import org.apache.fluss.rpc.messages.ApiMessage;
 import org.apache.fluss.rpc.messages.ProduceLogRequest;
 import org.apache.fluss.rpc.messages.ProduceLogResponse;
+import org.apache.fluss.rpc.messages.PutKvResponse;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.server.tablet.TestTabletServerGateway;
 import org.apache.fluss.utils.clock.SystemClock;
@@ -52,11 +59,19 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
+import static org.apache.fluss.rpc.protocol.Errors.SCHEMA_NOT_EXIST;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeProduceLogResponse;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makePutKvResponse;
+import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -93,7 +108,7 @@ final class SenderTest {
         appendToAccumulator(tb1, row(1, "a"), future::complete);
         sender.runOnce();
         assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 
1));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
 
         sender.runOnce();
         assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -118,7 +133,7 @@ final class SenderTest {
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
         long offset = 0;
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 
1));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
 
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
@@ -131,13 +146,13 @@ final class SenderTest {
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
 
         // timeout error can retry send.
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(1);
 
         // Even if timeout error can retry send, but the retry number > 
maxRetries, which will
         // return error.
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
         sender1.runOnce();
         assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
         assertThat(future.get())
@@ -168,12 +183,12 @@ final class SenderTest {
         assertThat(firstRequest).isInstanceOf(ProduceLogRequest.class);
         assertThat(hasIdempotentRecords(tb1, (ProduceLogRequest) 
firstRequest)).isFalse();
         // first complete with retriable error.
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
         sender.runOnce();
         assertThat(future.isDone()).isFalse();
 
         // second retry complete.
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, 0L, 1L));
         sender.runOnce();
         assertThat(future.isDone()).isTrue();
         assertThat(future.get()).isNull();
@@ -690,7 +705,7 @@ final class SenderTest {
         sender.runOnce();
         assertThat(sender.numOfInFlightBatches(tb1)).isEqualTo(1);
 
-        finishProduceLogRequest(tb1, 0, createProduceLogResponse(tb1, offset, 
1));
+        finishRequest(tb1, 0, createProduceLogResponse(tb1, offset, 1));
 
         // send again, should send nothing since no batch in queue
         sender.runOnce();
@@ -698,9 +713,44 @@ final class SenderTest {
         assertThat(future.get()).isNull();
     }
 
+    @Test
+    void testRetryPutKeyWithSchemaNotExistException() throws Exception {
+        TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 0);
+
+        BinaryRow row = compactedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+        int[] pkIndex = DATA1_SCHEMA_PK.getPrimaryKeyIndexes();
+        byte[] key = new CompactedKeyEncoder(DATA1_ROW_TYPE, 
pkIndex).encodeKey(row);
+        CompletableFuture<Exception> future = new CompletableFuture<>();
+        accumulator.append(
+                WriteRecord.forUpsert(
+                        DATA1_TABLE_INFO_PK,
+                        PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                        row,
+                        key,
+                        key,
+                        WriteFormat.COMPACTED_KV,
+                        null),
+                future::complete,
+                metadataUpdater.getCluster(),
+                0,
+                false);
+        sender.runOnce();
+        finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 
SCHEMA_NOT_EXIST));
+        assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
+
+        // retry to put kv request again
+        sender.runOnce();
+        assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(1);
+        finishRequest(tableBucket, 0, createPutKvResponse(tableBucket, 1));
+        assertThat(sender.numOfInFlightBatches(tableBucket)).isEqualTo(0);
+        assertThat(future.get()).isNull();
+    }
+
     private TestingMetadataUpdater initializeMetadataUpdater() {
-        return new TestingMetadataUpdater(
-                Collections.singletonMap(DATA1_TABLE_PATH, DATA1_TABLE_INFO));
+        Map<TablePath, TableInfo> tableInfos = new HashMap<>();
+        tableInfos.put(DATA1_TABLE_PATH, DATA1_TABLE_INFO);
+        tableInfos.put(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK);
+        return new TestingMetadataUpdater(tableInfos);
     }
 
     private void appendToAccumulator(TableBucket tb, GenericRow row, 
WriteCallback writeCallback)
@@ -721,7 +771,7 @@ final class SenderTest {
         return gateway.getRequest(index);
     }
 
-    private void finishProduceLogRequest(TableBucket tb, int index, 
ProduceLogResponse response) {
+    private void finishRequest(TableBucket tb, int index, ApiMessage response) 
{
         TestTabletServerGateway gateway =
                 (TestTabletServerGateway)
                         metadataUpdater.newTabletServerClientForNode(
@@ -762,6 +812,16 @@ final class SenderTest {
                 Collections.singletonList(new ProduceLogResultForBucket(tb, 
error.toApiError())));
     }
 
+    private PutKvResponse createPutKvResponse(TableBucket tb, long endOffset) {
+        return makePutKvResponse(
+                Collections.singletonList(new PutKvResultForBucket(tb, 
endOffset)));
+    }
+
+    private PutKvResponse createPutKvResponse(TableBucket tb, Errors error) {
+        return makePutKvResponse(
+                Collections.singletonList(new PutKvResultForBucket(tb, 
error.toApiError())));
+    }
+
     private Sender setupWithIdempotenceState() {
         return setupWithIdempotenceState(createIdempotenceManager(false));
     }
diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
index 9a73d98f5..5e02e987a 100644
--- a/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
+++ b/fluss-common/src/main/java/org/apache/fluss/exception/SchemaNotExistException.java
@@ -25,7 +25,7 @@ import org.apache.fluss.annotation.PublicEvolving;
  * @since 0.1
  */
 @PublicEvolving
-public class SchemaNotExistException extends ApiException {
+public class SchemaNotExistException extends RetriableException {
 
     public SchemaNotExistException(String message, Throwable cause) {
         super(message, cause);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
index 68fa741d4..15e59295c 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java
@@ -188,7 +188,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
sink_test").collect();
         // add new column
         tEnv.executeSql("alter table sink_test add add_column int").await();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, 
"sink_test"), 2);
         tEnv.executeSql(
                         "INSERT INTO sink_test "
                                 + "VALUES (4, 3504, 'jerry', 4), "
@@ -417,7 +416,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
sink_test").collect();
         // add new column
         tEnv.executeSql("alter table sink_test add add_column int").await();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, 
"sink_test"), 2);
         tEnv.executeSql(
                         "INSERT INTO sink_test "
                                 + "VALUES (4, 3504, 'jerry', 4), "
@@ -505,7 +503,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         CloseableIterator<Row> rowIter = tEnv.executeSql("select * from 
sink_test").collect();
         // add new column
         tEnv.executeSql("alter table sink_test add add_column string").await();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, 
"sink_test"), 2);
         tEnv.executeSql(
                         "INSERT INTO sink_test(add_column, a ) VALUES 
('new_value', 1), ('new_value', 2)")
                 .await();
@@ -819,7 +816,6 @@ abstract class FlinkTableSinkITCase extends 
AbstractTestBase {
         tBatchEnv
                 .executeSql(String.format("alter table %s add new_added_column 
int", tableName))
                 .await();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(TablePath.of(DEFAULT_DB, 
tableName), 2);
         tBatchEnv
                 .executeSql("UPDATE " + tableName + " SET new_added_column = 2 
WHERE a = 4")
                 .await();
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
index 9903e7c45..4d136c865 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java
@@ -173,7 +173,6 @@ class FlinkLookupFunctionTest extends FlinkTestBase {
                                         TableChange.ColumnPosition.last())),
                         false)
                 .get();
-        FLUSS_CLUSTER_EXTENSION.waitAllSchemaSync(tablePath, 2);
 
         try (Table table = conn.getTable(tablePath)) {
             UpsertWriter upsertWriter = table.newUpsert().createWriter();
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 0dde0a53d..d6cc08666 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -326,7 +326,6 @@ public final class KvTablet {
 
     private void validateSchemaId(short schemaIdOfNewData, short 
latestSchemaId) {
         if (schemaIdOfNewData > latestSchemaId || schemaIdOfNewData < 0) {
-            // TODO: we may need to support retriable exception here
             throw new SchemaNotExistException(
                     "Invalid schema id: "
                             + schemaIdOfNewData
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 8ebcc5748..500d197fc 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -184,7 +184,9 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
 
     @Override
     public CompletableFuture<PutKvResponse> putKv(PutKvRequest request) {
-        return null;
+        CompletableFuture<PutKvResponse> response = new CompletableFuture<>();
+        requests.add(Tuple2.of(request, response));
+        return response;
     }
 
     @Override
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index a1ee83fa6..bab31f10b 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -25,7 +25,6 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.fs.local.LocalFileSystem;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -52,7 +51,6 @@ import org.apache.fluss.server.entity.NotifyLeaderAndIsrData;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
 import org.apache.fluss.server.metadata.ServerInfo;
-import org.apache.fluss.server.metadata.ServerSchemaCache;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.server.replica.ReplicaManager;
@@ -66,7 +64,6 @@ import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.PartitionAssignment;
 import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
 import org.apache.fluss.server.zk.data.TableAssignment;
-import org.apache.fluss.server.zk.data.TableRegistration;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.SystemClock;
@@ -663,32 +660,6 @@ public final class FlussClusterExtension
                 });
     }
 
-    public void waitAllSchemaSync(TablePath tablePath, int schemaId) {
-        ZooKeeperClient zkClient = getZooKeeperClient();
-        retry(
-                Duration.ofMinutes(1),
-                () -> {
-                    TableRegistration tableRegistration = 
zkClient.getTable(tablePath).get();
-                    int bucketCount = tableRegistration.bucketCount;
-                    long tableId = tableRegistration.tableId;
-                    for (int bucketId = 0; bucketId < bucketCount; bucketId++) 
{
-                        TableBucket tableBucket = new TableBucket(tableId, 
bucketId);
-                        Optional<LeaderAndIsr> leaderAndIsrOpt =
-                                zkClient.getLeaderAndIsr(tableBucket);
-                        assertThat(leaderAndIsrOpt).isPresent();
-                        int leader = leaderAndIsrOpt.get().leader();
-                        TabletServer tabletServer = 
getTabletServerById(leader);
-                        ServerSchemaCache serverSchemaCache =
-                                
tabletServer.getMetadataCache().getServerSchemaCache();
-                        Map<Long, SchemaInfo> latestSchemaByTablePath =
-                                serverSchemaCache.getLatestSchemaByTableId();
-                        
assertThat(latestSchemaByTablePath).containsKey(tableId);
-                        
assertThat(latestSchemaByTablePath.get(tableId).getSchemaId())
-                                .isEqualTo(schemaId);
-                    }
-                });
-    }
-
     /**
      * Wait until some log segments copy to remote. This method can only 
ensure that there are at
      * least one log segment has been copied to remote, but it does not ensure 
that all log segments

Reply via email to