This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c5257fd23 [client] Get TableInfo via the Admin API instead of via 
metadata updater (#2016)
c5257fd23 is described below

commit c5257fd23281a560469f06caac422a48796092a2
Author: yunhong <[email protected]>
AuthorDate: Sat Nov 29 23:33:12 2025 +0800

    [client] Get TableInfo via the Admin API instead of via metadata updater 
(#2016)
---
 .../org/apache/fluss/client/FlussConnection.java   | 24 +++++---
 .../org/apache/fluss/client/admin/FlussAdmin.java  |  5 +-
 .../fluss/client/metadata/MetadataUpdater.java     | 21 -------
 .../client/table/writer/AppendWriterImpl.java      |  6 +-
 .../client/table/writer/UpsertWriterImpl.java      | 12 +++-
 .../fluss/client/write/ArrowLogWriteBatch.java     |  5 ++
 .../client/write/DynamicPartitionCreator.java      | 10 ++--
 .../fluss/client/write/IndexedLogWriteBatch.java   |  5 ++
 .../apache/fluss/client/write/KvWriteBatch.java    |  5 ++
 .../fluss/client/write/RecordAccumulator.java      | 66 ++++++++++------------
 .../java/org/apache/fluss/client/write/Sender.java | 26 ++++++---
 .../org/apache/fluss/client/write/WriteBatch.java  |  8 +++
 .../org/apache/fluss/client/write/WriteRecord.java | 32 ++++++++++-
 .../apache/fluss/client/write/WriterClient.java    | 13 +++--
 .../fluss/client/admin/FlussAdminITCase.java       |  2 +-
 .../fluss/client/write/ArrowLogWriteBatchTest.java |  2 +-
 .../client/write/IndexedLogWriteBatchTest.java     |  2 +-
 .../fluss/client/write/KvWriteBatchTest.java       |  7 ++-
 .../fluss/client/write/RecordAccumulatorTest.java  |  4 +-
 .../org/apache/fluss/client/write/SenderTest.java  |  2 +-
 .../java/org/apache/fluss/cluster/Cluster.java     | 54 +++++-------------
 .../java/org/apache/fluss/cluster/ClusterTest.java | 14 -----
 22 files changed, 170 insertions(+), 155 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java 
b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
index e6a31b9f3..f29d90ba9 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java
@@ -33,7 +33,6 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.fs.FileSystem;
-import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -61,6 +60,7 @@ public final class FlussConnection implements Connection {
     private volatile LookupClient lookupClient;
     private volatile RemoteFileDownloader remoteFileDownloader;
     private volatile SecurityTokenManager securityTokenManager;
+    private volatile Admin admin;
 
     FlussConnection(Configuration conf) {
         this(conf, MetricRegistry.create(conf, null));
@@ -93,19 +93,16 @@ public final class FlussConnection implements Connection {
 
     @Override
     public Admin getAdmin() {
-        return new FlussAdmin(rpcClient, metadataUpdater);
+        return getOrCreateAdmin();
     }
 
     @Override
     public Table getTable(TablePath tablePath) {
-        // force to update the table info from server to avoid stale data in 
cache
+        // force to update the table info from server to avoid stale data in 
cache.
         metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
-        TableInfo tableInfo = 
metadataUpdater.getTableInfoOrElseThrow(tablePath);
-        return new FlussTable(this, tablePath, tableInfo);
-    }
 
-    public RpcClient getRpcClient() {
-        return rpcClient;
+        Admin admin = getOrCreateAdmin();
+        return new FlussTable(this, tablePath, 
admin.getTableInfo(tablePath).join());
     }
 
     public MetadataUpdater getMetadataUpdater() {
@@ -140,6 +137,17 @@ public final class FlussConnection implements Connection {
         return lookupClient;
     }
 
+    public Admin getOrCreateAdmin() {
+        if (admin == null) {
+            synchronized (this) {
+                if (admin == null) {
+                    admin = new FlussAdmin(rpcClient, metadataUpdater);
+                }
+            }
+        }
+        return admin;
+    }
+
     public RemoteFileDownloader getOrCreateRemoteFileDownloader() {
         if (remoteFileDownloader == null) {
             synchronized (this) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 3dc4c16c9..57dc52008 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -420,7 +420,8 @@ public class FlussAdmin implements Admin {
             OffsetSpec offsetSpec) {
         Long partitionId = null;
         
metadataUpdater.updateTableOrPartitionMetadata(physicalTablePath.getTablePath(),
 null);
-        long tableId = 
metadataUpdater.getTableId(physicalTablePath.getTablePath());
+        TableInfo tableInfo = 
getTableInfo(physicalTablePath.getTablePath()).join();
+
         // if partition name is not null, we need to check and update 
partition metadata
         if (physicalTablePath.getPartitionName() != null) {
             
metadataUpdater.updatePhysicalTableMetadata(Collections.singleton(physicalTablePath));
@@ -428,7 +429,7 @@ public class FlussAdmin implements Admin {
         }
         Map<Integer, ListOffsetsRequest> requestMap =
                 prepareListOffsetsRequests(
-                        metadataUpdater, tableId, partitionId, buckets, 
offsetSpec);
+                        metadataUpdater, tableInfo.getTableId(), partitionId, 
buckets, offsetSpec);
         Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = 
MapUtils.newConcurrentHashMap();
         for (int bucket : buckets) {
             bucketToOffsetMap.put(bucket, new CompletableFuture<>());
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
index 831a82436..b47e709b8 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java
@@ -32,7 +32,6 @@ import org.apache.fluss.exception.RetriableException;
 import org.apache.fluss.exception.StaleMetadataException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePartition;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -87,10 +86,6 @@ public class MetadataUpdater {
         return cluster.getCoordinatorServer();
     }
 
-    public long getTableId(TablePath tablePath) {
-        return cluster.getTableId(tablePath);
-    }
-
     public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
         return cluster.getPartitionId(physicalTablePath);
     }
@@ -99,26 +94,10 @@ public class MetadataUpdater {
         return cluster.getPartitionIdOrElseThrow(physicalTablePath);
     }
 
-    public TableInfo getTableInfoOrElseThrow(TablePath tablePath) {
-        return cluster.getTableOrElseThrow(tablePath);
-    }
-
     public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) 
{
         return cluster.getBucketLocation(tableBucket);
     }
 
-    private Optional<TableInfo> getTableInfo(TablePath tablePath) {
-        return cluster.getTable(tablePath);
-    }
-
-    public TableInfo getTableInfoOrElseThrow(long tableId) {
-        return getTableInfo(cluster.getTablePathOrElseThrow(tableId))
-                .orElseThrow(
-                        () ->
-                                new FlussRuntimeException(
-                                        "Table not found for table id: " + 
tableId));
-    }
-
     public int leaderFor(TableBucket tableBucket) {
         Integer serverNode = cluster.leaderFor(tableBucket);
         if (serverNode == null) {
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
index e2e4ac47e..d702e9621 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java
@@ -45,6 +45,7 @@ class AppendWriterImpl extends AbstractTableWriter implements 
AppendWriter {
     private final LogFormat logFormat;
     private final IndexedRowEncoder indexedRowEncoder;
     private final FieldGetter[] fieldGetters;
+    private final TableInfo tableInfo;
 
     AppendWriterImpl(TablePath tablePath, TableInfo tableInfo, WriterClient 
writerClient) {
         super(tablePath, tableInfo, writerClient);
@@ -60,6 +61,7 @@ class AppendWriterImpl extends AbstractTableWriter implements 
AppendWriter {
         this.logFormat = tableInfo.getTableConfig().getLogFormat();
         this.indexedRowEncoder = new IndexedRowEncoder(tableInfo.getRowType());
         this.fieldGetters = 
InternalRow.createFieldGetters(tableInfo.getRowType());
+        this.tableInfo = tableInfo;
     }
 
     /**
@@ -77,10 +79,10 @@ class AppendWriterImpl extends AbstractTableWriter 
implements AppendWriter {
         final WriteRecord record;
         if (logFormat == LogFormat.INDEXED) {
             IndexedRow indexedRow = encodeIndexedRow(row);
-            record = WriteRecord.forIndexedAppend(physicalPath, indexedRow, 
bucketKey);
+            record = WriteRecord.forIndexedAppend(tableInfo, physicalPath, 
indexedRow, bucketKey);
         } else {
             // ARROW format supports general internal row
-            record = WriteRecord.forArrowAppend(physicalPath, row, bucketKey);
+            record = WriteRecord.forArrowAppend(tableInfo, physicalPath, row, 
bucketKey);
         }
         return send(record).thenApply(ignored -> APPEND_SUCCESS);
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
index bdfc48364..904b85c2a 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java
@@ -52,6 +52,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements 
UpsertWriter {
     private final KvFormat kvFormat;
     private final RowEncoder rowEncoder;
     private final FieldGetter[] fieldGetters;
+    private final TableInfo tableInfo;
 
     UpsertWriterImpl(
             TablePath tablePath,
@@ -75,6 +76,7 @@ class UpsertWriterImpl extends AbstractTableWriter implements 
UpsertWriter {
         this.kvFormat = tableInfo.getTableConfig().getKvFormat();
         this.rowEncoder = RowEncoder.create(kvFormat, rowType);
         this.fieldGetters = InternalRow.createFieldGetters(rowType);
+        this.tableInfo = tableInfo;
     }
 
     private static void sanityCheck(
@@ -129,7 +131,12 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
                 bucketKeyEncoder == primaryKeyEncoder ? key : 
bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
                 WriteRecord.forUpsert(
-                        getPhysicalPath(row), encodeRow(row), key, bucketKey, 
targetColumns);
+                        tableInfo,
+                        getPhysicalPath(row),
+                        encodeRow(row),
+                        key,
+                        bucketKey,
+                        targetColumns);
         return send(record).thenApply(ignored -> UPSERT_SUCCESS);
     }
 
@@ -146,7 +153,8 @@ class UpsertWriterImpl extends AbstractTableWriter 
implements UpsertWriter {
         byte[] bucketKey =
                 bucketKeyEncoder == primaryKeyEncoder ? key : 
bucketKeyEncoder.encodeKey(row);
         WriteRecord record =
-                WriteRecord.forDelete(getPhysicalPath(row), key, bucketKey, 
targetColumns);
+                WriteRecord.forDelete(
+                        tableInfo, getPhysicalPath(row), key, bucketKey, 
targetColumns);
         return send(record).thenApply(ignored -> DELETE_SUCCESS);
     }
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
index dec1f55d1..578ff2a25 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/ArrowLogWriteBatch.java
@@ -62,6 +62,11 @@ public class ArrowLogWriteBatch extends WriteBatch {
                 MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, 
outputView, true);
     }
 
+    @Override
+    public boolean isLogBatch() {
+        return true;
+    }
+
     @Override
     public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) 
throws Exception {
         InternalRow row = writeRecord.getRow();
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
index d5cc14ff8..d1d23d39f 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/DynamicPartitionCreator.java
@@ -23,7 +23,6 @@ import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
-import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.ExceptionUtils;
 
@@ -64,7 +63,8 @@ public class DynamicPartitionCreator {
         this.fatalErrorHandler = fatalErrorHandler;
     }
 
-    public void checkAndCreatePartitionAsync(PhysicalTablePath 
physicalTablePath) {
+    public void checkAndCreatePartitionAsync(
+            PhysicalTablePath physicalTablePath, List<String> partitionKeys) {
         String partitionName = physicalTablePath.getPartitionName();
         if (partitionName == null) {
             // no need to check and create partition
@@ -89,7 +89,7 @@ public class DynamicPartitionCreator {
                     // if the partition is not in inflightPartitionsToCreate, 
we should create it.
                     // this means that the partition is not being created by 
other threads.
                    LOG.info("Dynamically creating partition for {}", physicalTablePath);
-                    createPartition(physicalTablePath);
+                    createPartition(physicalTablePath, partitionKeys);
                 } else {
                     // if the partition is already in 
inflightPartitionsToCreate, we should skip
                     // creating it.
@@ -121,12 +121,10 @@ public class DynamicPartitionCreator {
         return idExist;
     }
 
-    private void createPartition(PhysicalTablePath physicalTablePath) {
+    private void createPartition(PhysicalTablePath physicalTablePath, 
List<String> partitionKeys) {
         String partitionName = physicalTablePath.getPartitionName();
         TablePath tablePath = physicalTablePath.getTablePath();
         checkArgument(partitionName != null, "Partition name shouldn't be 
null.");
-        TableInfo tableInfo = 
metadataUpdater.getTableInfoOrElseThrow(tablePath);
-        List<String> partitionKeys = tableInfo.getPartitionKeys();
         ResolvedPartitionSpec resolvedPartitionSpec =
                 ResolvedPartitionSpec.fromPartitionName(partitionKeys, 
partitionName);
 
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
index 5d1e27080..fb9655658 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/IndexedLogWriteBatch.java
@@ -61,6 +61,11 @@ public final class IndexedLogWriteBatch extends WriteBatch {
                 MemoryLogRecordsIndexedBuilder.builder(schemaId, writeLimit, 
outputView, true);
     }
 
+    @Override
+    public boolean isLogBatch() {
+        return true;
+    }
+
     @Override
     public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) 
throws Exception {
         checkNotNull(callback, "write callback must be not null");
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
index 63652db22..963e9866b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/KvWriteBatch.java
@@ -67,6 +67,11 @@ public class KvWriteBatch extends WriteBatch {
         this.targetColumns = targetColumns;
     }
 
+    @Override
+    public boolean isLogBatch() {
+        return false;
+    }
+
     @Override
     public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) 
throws Exception {
         // currently, we throw exception directly when the target columns of 
the write record is
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 2cfd74da3..9dd8bade6 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -171,17 +171,16 @@ public final class RecordAccumulator {
             boolean abortIfBatchFull)
             throws Exception {
         PhysicalTablePath physicalTablePath = 
writeRecord.getPhysicalTablePath();
-
-        TableInfo tableInfo = 
cluster.getTableOrElseThrow(physicalTablePath.getTablePath());
+        TableInfo tableInfo = writeRecord.getTableInfo();
+        // The metadata may return null for the partition id, but it is fine to pass null here,
+        // because we will fill the partitionId in bucketReady() before sending the batch.
         Optional<Long> partitionIdOpt = 
cluster.getPartitionId(physicalTablePath);
         BucketAndWriteBatches bucketAndWriteBatches =
                 writeBatches.computeIfAbsent(
                         physicalTablePath,
                         k ->
                                 new BucketAndWriteBatches(
-                                        tableInfo.getTableId(),
-                                        partitionIdOpt.orElse(null),
-                                        tableInfo.isPartitioned()));
+                                        partitionIdOpt.orElse(null), 
tableInfo.isPartitioned()));
 
         // We keep track of the number of appending thread to make sure we do 
not miss batches in
         // abortIncompleteBatches().
@@ -209,13 +208,7 @@ public final class RecordAccumulator {
             synchronized (dq) {
                 RecordAppendResult appendResult =
                         appendNewBatch(
-                                writeRecord,
-                                callback,
-                                bucketId,
-                                tableInfo,
-                                dq,
-                                memorySegments,
-                                cluster);
+                                writeRecord, callback, bucketId, tableInfo, 
dq, memorySegments);
                 if (appendResult.newBatchCreated) {
                     memorySegments = Collections.emptyList();
                 }
@@ -333,7 +326,6 @@ public final class RecordAccumulator {
                         physicalTablePath,
                         k ->
                                 new BucketAndWriteBatches(
-                                        tableBucket.getTableId(),
                                         tableBucket.getPartitionId(),
                                         physicalTablePath.getPartitionName() 
!= null));
         return bucketAndWriteBatches.batches.computeIfAbsent(
@@ -507,22 +499,28 @@ public final class RecordAccumulator {
             }
 
             int bucketId = entry.getKey();
-            TableBucket tableBucket = 
cluster.getTableBucket(physicalTablePath, bucketId);
-            Integer leader = cluster.leaderFor(tableBucket);
-            if (leader == null) {
-                // This is a bucket for which leader is not known, but 
messages are
-                // available to send. Note that entries are currently not 
removed from
-                // batches when deque is empty.
+            Optional<Long> tableIdOpt = 
cluster.getTableId(physicalTablePath.getTablePath());
+            if (!tableIdOpt.isPresent()) {
                 unknownLeaderTables.add(physicalTablePath);
             } else {
-                nextReadyCheckDelayMs =
-                        batchReady(
-                                exhausted,
-                                leader,
-                                waitedTimeMs,
-                                full,
-                                readyNodes,
-                                nextReadyCheckDelayMs);
+                TableBucket tableBucket =
+                        cluster.getTableBucket(tableIdOpt.get(), 
physicalTablePath, bucketId);
+                Integer leader = cluster.leaderFor(tableBucket);
+                if (leader == null) {
+                    // This is a bucket for which leader is not known, but 
messages are
+                    // available to send. Note that entries are currently not 
removed from
+                    // batches when deque is empty.
+                    unknownLeaderTables.add(physicalTablePath);
+                } else {
+                    nextReadyCheckDelayMs =
+                            batchReady(
+                                    exhausted,
+                                    leader,
+                                    waitedTimeMs,
+                                    full,
+                                    readyNodes,
+                                    nextReadyCheckDelayMs);
+                }
             }
         }
 
@@ -570,8 +568,7 @@ public final class RecordAccumulator {
             int bucketId,
             TableInfo tableInfo,
             Deque<WriteBatch> deque,
-            List<MemorySegment> segments,
-            Cluster cluster)
+            List<MemorySegment> segments)
             throws Exception {
         RecordAppendResult appendResult = tryAppend(writeRecord, callback, 
deque);
         if (appendResult != null) {
@@ -591,7 +588,7 @@ public final class RecordAccumulator {
                     new KvWriteBatch(
                             bucketId,
                             physicalTablePath,
-                            schemaId,
+                            tableInfo.getSchemaId(),
                             tableInfo.getTableConfig().getKvFormat(),
                             outputView.getPreAllocatedSize(),
                             outputView,
@@ -609,7 +606,7 @@ public final class RecordAccumulator {
                     new ArrowLogWriteBatch(
                             bucketId,
                             physicalTablePath,
-                            schemaId,
+                            tableInfo.getSchemaId(),
                             arrowWriter,
                             outputView,
                             clock.milliseconds());
@@ -618,7 +615,7 @@ public final class RecordAccumulator {
                     new IndexedLogWriteBatch(
                             bucketId,
                             physicalTablePath,
-                            schemaId,
+                            tableInfo.getSchemaId(),
                             outputView.getPreAllocatedSize(),
                             outputView,
                             clock.milliseconds());
@@ -928,15 +925,12 @@ public final class RecordAccumulator {
 
     /** Per table bucket and write batches. */
     private static class BucketAndWriteBatches {
-        public final long tableId;
         public final boolean isPartitionedTable;
         public volatile @Nullable Long partitionId;
         // Write batches for each bucket in queue.
         public final Map<Integer, Deque<WriteBatch>> batches = new 
CopyOnWriteMap<>();
 
-        public BucketAndWriteBatches(
-                long tableId, @Nullable Long partitionId, boolean 
isPartitionedTable) {
-            this.tableId = tableId;
+        public BucketAndWriteBatches(@Nullable Long partitionId, boolean 
isPartitionedTable) {
             this.partitionId = partitionId;
             this.isPartitionedTable = isPartitionedTable;
         }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index a81c69ee7..e9c285306 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -30,7 +30,6 @@ import org.apache.fluss.exception.RetriableException;
 import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.PbProduceLogRespForBucket;
 import org.apache.fluss.rpc.messages.PbPutKvRespForBucket;
@@ -56,6 +55,7 @@ import java.util.Set;
 
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeProduceLogRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makePutKvRequest;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code of Apache Kafka Project 
(https://kafka.apache.org/), licensed by the Apache
@@ -377,18 +377,17 @@ public class Sender implements Runnable {
         } else {
             writeBatchByTable.forEach(
                     (tableId, writeBatches) -> {
-                        TableInfo tableInfo = 
metadataUpdater.getTableInfoOrElseThrow(tableId);
-                        if (tableInfo.hasPrimaryKey()) {
-                            sendPutKvRequestAndHandleResponse(
+                        if (isLogBatches(writeBatches)) {
+                            sendProduceLogRequestAndHandleResponse(
                                     gateway,
-                                    makePutKvRequest(
+                                    makeProduceLogRequest(
                                             tableId, acks, 
maxRequestTimeoutMs, writeBatches),
                                     tableId,
                                     recordsByBucket);
                         } else {
-                            sendProduceLogRequestAndHandleResponse(
+                            sendPutKvRequestAndHandleResponse(
                                     gateway,
-                                    makeProduceLogRequest(
+                                    makePutKvRequest(
                                             tableId, acks, 
maxRequestTimeoutMs, writeBatches),
                                     tableId,
                                     recordsByBucket);
@@ -397,6 +396,19 @@ public class Sender implements Runnable {
         }
     }
 
+    /**
+     * Check whether the given batches are log batches. We assume all the 
batches are of the same
+     * type.
+     *
+     * @param batches the batches to check, must not be empty.
+     * @return true if the given batches are log batches, false if they are kv 
batches.
+     */
+    private boolean isLogBatches(List<ReadyWriteBatch> batches) {
+        checkArgument(!batches.isEmpty(), "batches must not be empty");
+        ReadyWriteBatch batch = batches.get(0);
+        return batch.writeBatch().isLogBatch();
+    }
+
     private void sendProduceLogRequestAndHandleResponse(
             TabletServerGateway gateway,
             ProduceLogRequest request,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
index 49766612c..a35a9f58b 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java
@@ -62,6 +62,14 @@ public abstract class WriteBatch {
         this.recordCount = 0;
     }
 
+    /**
+     * Check if the batch is a log batch, e.g., ArrowLogWriteBatch or IndexedLogWriteBatch, and
+     * should use a ProduceLog request. Otherwise, it is a kv batch, and should use a PutKv
+     * request.
+     *
+     * @return true if this is a log batch, false if it is a kv batch
+     */
+    public abstract boolean isLogBatch();
+
     /**
      * try to append one write record to the record batch.
      *
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
index 0c1427e6f..e1ab2ec24 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java
@@ -19,6 +19,7 @@ package org.apache.fluss.client.write;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.record.DefaultKvRecord;
 import org.apache.fluss.record.DefaultKvRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
@@ -41,6 +42,7 @@ public final class WriteRecord {
 
     /** Create a write record for upsert operation and partial-upsert 
operation. */
     public static WriteRecord forUpsert(
+            TableInfo tableInfo,
             PhysicalTablePath tablePath,
             BinaryRow row,
             byte[] key,
@@ -52,6 +54,7 @@ public final class WriteRecord {
         int estimatedSizeInBytes =
                 DefaultKvRecord.sizeOf(key, row) + 
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
+                tableInfo,
                 tablePath,
                 key,
                 bucketKey,
@@ -63,6 +66,7 @@ public final class WriteRecord {
 
     /** Create a write record for delete operation and partial-delete update. 
*/
     public static WriteRecord forDelete(
+            TableInfo tableInfo,
             PhysicalTablePath tablePath,
             byte[] key,
             byte[] bucketKey,
@@ -72,6 +76,7 @@ public final class WriteRecord {
         int estimatedSizeInBytes =
                 DefaultKvRecord.sizeOf(key, null) + 
DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
         return new WriteRecord(
+                tableInfo,
                 tablePath,
                 key,
                 bucketKey,
@@ -83,11 +88,15 @@ public final class WriteRecord {
 
     /** Create a write record for append operation for indexed format. */
     public static WriteRecord forIndexedAppend(
-            PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] 
bucketKey) {
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            IndexedRow row,
+            @Nullable byte[] bucketKey) {
         checkNotNull(row);
         int estimatedSizeInBytes =
                 IndexedLogRecord.sizeOf(row) + 
recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         return new WriteRecord(
+                tableInfo,
                 tablePath,
                 null,
                 bucketKey,
@@ -99,13 +108,23 @@ public final class WriteRecord {
 
     /** Creates a write record for append operation for Arrow format. */
     public static WriteRecord forArrowAppend(
-            PhysicalTablePath tablePath, InternalRow row, @Nullable byte[] 
bucketKey) {
+            TableInfo tableInfo,
+            PhysicalTablePath tablePath,
+            InternalRow row,
+            @Nullable byte[] bucketKey) {
         checkNotNull(row);
         // the write row maybe GenericRow, can't estimate the size.
         // it is not necessary to estimate size for Arrow format.
         int estimatedSizeInBytes = -1;
         return new WriteRecord(
-                tablePath, null, bucketKey, row, WriteFormat.ARROW_LOG, null, 
estimatedSizeInBytes);
+                tableInfo,
+                tablePath,
+                null,
+                bucketKey,
+                row,
+                WriteFormat.ARROW_LOG,
+                null,
+                estimatedSizeInBytes);
     }
 
     // 
------------------------------------------------------------------------------------------
@@ -120,8 +139,10 @@ public final class WriteRecord {
     // will be null if it's not for partial update
     private final @Nullable int[] targetColumns;
     private final int estimatedSizeInBytes;
+    private final TableInfo tableInfo;
 
     private WriteRecord(
+            TableInfo tableInfo,
             PhysicalTablePath physicalTablePath,
             @Nullable byte[] key,
             @Nullable byte[] bucketKey,
@@ -129,6 +150,7 @@ public final class WriteRecord {
             WriteFormat writeFormat,
             @Nullable int[] targetColumns,
             int estimatedSizeInBytes) {
+        this.tableInfo = tableInfo;
         this.physicalTablePath = physicalTablePath;
         this.key = key;
         this.bucketKey = bucketKey;
@@ -142,6 +164,10 @@ public final class WriteRecord {
         return physicalTablePath;
     }
 
+    public TableInfo getTableInfo() {
+        return tableInfo;
+    }
+
     public @Nullable byte[] getKey() {
         return key;
     }
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
index e91c04843..962369ef7 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java
@@ -126,8 +126,7 @@ public class WriterClient {
                             "Failed to construct writer. Max request size: %d 
bytes, Idempotence enabled: %b",
                             maxRequestSizeLocal,
                             idempotenceManagerLocal != null
-                                    ? 
idempotenceManagerLocal.idempotenceEnabled()
-                                    : false),
+                                    && 
idempotenceManagerLocal.idempotenceEnabled()),
                     t);
         }
     }
@@ -174,15 +173,17 @@ public class WriterClient {
         try {
             throwIfWriterClosed();
 
+            TableInfo tableInfo = record.getTableInfo();
             PhysicalTablePath physicalTablePath = 
record.getPhysicalTablePath();
-            
dynamicPartitionCreator.checkAndCreatePartitionAsync(physicalTablePath);
+            dynamicPartitionCreator.checkAndCreatePartitionAsync(
+                    physicalTablePath, tableInfo.getPartitionKeys());
 
             // maybe create bucket assigner.
             Cluster cluster = metadataUpdater.getCluster();
             BucketAssigner bucketAssigner =
                     bucketAssignerMap.computeIfAbsent(
                             physicalTablePath,
-                            k -> createBucketAssigner(physicalTablePath, conf, 
cluster));
+                            k -> createBucketAssigner(tableInfo, 
physicalTablePath, conf));
 
             // Append the record to the accumulator.
             int bucketId = bucketAssigner.assignBucket(record.getBucketKey(), 
cluster);
@@ -332,6 +333,7 @@ public class WriterClient {
         if (sender != null) {
             sender.forceClose();
         }
+
         LOG.info("Writer closed.");
     }
 
@@ -340,8 +342,7 @@ public class WriterClient {
     }
 
     private BucketAssigner createBucketAssigner(
-            PhysicalTablePath physicalTablePath, Configuration conf, Cluster 
cluster) {
-        TableInfo tableInfo = 
cluster.getTableOrElseThrow(physicalTablePath.getTablePath());
+            TableInfo tableInfo, PhysicalTablePath physicalTablePath, 
Configuration conf) {
         int bucketNumber = tableInfo.getNumBuckets();
         List<String> bucketKeys = tableInfo.getBucketKeys();
         if (!bucketKeys.isEmpty()) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index 5273b5895..10890f947 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -129,7 +129,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
     void testMultiClient() throws Exception {
         Admin admin1 = conn.getAdmin();
         Admin admin2 = conn.getAdmin();
-        assertThat(admin1).isNotSameAs(admin2);
+        assertThat(admin1).isEqualTo(admin2);
 
         TableInfo t1 = admin1.getTableInfo(DEFAULT_TABLE_PATH).get();
         TableInfo t2 = admin2.getTableInfo(DEFAULT_TABLE_PATH).get();
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
index 04ba2d8b7..6b7eb6fe7 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/ArrowLogWriteBatchTest.java
@@ -288,7 +288,7 @@ public class ArrowLogWriteBatchTest {
     }
 
     private WriteRecord createWriteRecord(GenericRow row) {
-        return WriteRecord.forArrowAppend(DATA1_PHYSICAL_TABLE_PATH, row, 
null);
+        return WriteRecord.forArrowAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
     private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int 
maxSizeInBytes) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
index 07544e8da..3081814a5 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java
@@ -199,7 +199,7 @@ public class IndexedLogWriteBatchTest {
     }
 
     private WriteRecord createWriteRecord() {
-        return WriteRecord.forIndexedAppend(DATA1_PHYSICAL_TABLE_PATH, row, 
null);
+        return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
     private IndexedLogWriteBatch createLogWriteBatch(TableBucket tb, long 
baseLogOffset)
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
index d9e7272b8..5321da0f2 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/KvWriteBatchTest.java
@@ -200,7 +200,12 @@ class KvWriteBatchTest {
 
     protected WriteRecord createWriteRecord() {
         return WriteRecord.forUpsert(
-                PhysicalTablePath.of(DATA1_TABLE_PATH_PK), row, key, key, 
null);
+                DATA1_TABLE_INFO_PK,
+                PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
+                row,
+                key,
+                key,
+                null);
     }
 
     private KvWriteBatch createKvWriteBatch(TableBucket tb) throws Exception {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index e5d71a927..c721e974c 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -242,7 +242,7 @@ class RecordAccumulatorTest {
         while (true) {
             GenericRow row = row(1, RandomStringUtils.random(10));
             PhysicalTablePath tablePath = 
PhysicalTablePath.of(ZSTD_TABLE_INFO.getTablePath());
-            WriteRecord record = WriteRecord.forArrowAppend(tablePath, row, 
null);
+            WriteRecord record = WriteRecord.forArrowAppend(ZSTD_TABLE_INFO, 
tablePath, row, null);
             // append until the batch is full
             if (accum.append(record, writeCallback, cluster, bucketId, 
false).batchIsFull) {
                 break;
@@ -543,7 +543,7 @@ class RecordAccumulatorTest {
      * format , see {@link #updateCluster(List)}.
      */
     private WriteRecord createRecord(IndexedRow row) {
-        return WriteRecord.forIndexedAppend(DATA1_PHYSICAL_TABLE_PATH, row, 
null);
+        return WriteRecord.forIndexedAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null);
     }
 
     private Cluster updateCluster(List<BucketLocation> bucketLocations) {
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java 
b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 7209a315d..5618d224d 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -707,7 +707,7 @@ final class SenderTest {
     private void appendToAccumulator(TableBucket tb, GenericRow row, 
WriteCallback writeCallback)
             throws Exception {
         accumulator.append(
-                WriteRecord.forArrowAppend(DATA1_PHYSICAL_TABLE_PATH, row, 
null),
+                WriteRecord.forArrowAppend(DATA1_TABLE_INFO, 
DATA1_PHYSICAL_TABLE_PATH, row, null),
                 writeCallback,
                 metadataUpdater.getCluster(),
                 tb.getBucket(),
diff --git a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java 
b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
index 9aef86261..ccd73e22c 100644
--- a/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
+++ b/fluss-common/src/main/java/org/apache/fluss/cluster/Cluster.java
@@ -20,7 +20,6 @@ package org.apache.fluss.cluster;
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
@@ -159,17 +158,6 @@ public final class Cluster {
         return aliveTabletServers;
     }
 
-    /**
-     * Get the table id for this table.
-     *
-     * @param tablePath the table path
-     * @return the table id, if metadata cache contains the table path, return 
the table path,
-     *     otherwise return {@link TableInfo#UNKNOWN_TABLE_ID}
-     */
-    public long getTableId(TablePath tablePath) {
-        return tableIdByPath.getOrDefault(tablePath, 
TableInfo.UNKNOWN_TABLE_ID);
-    }
-
     /** Get the table path for this table id. */
     public Optional<TablePath> getTablePath(long tableId) {
         return Optional.ofNullable(pathByTableId.get(tableId));
@@ -224,41 +212,31 @@ public final class Cluster {
         return availableLocationsByPath.getOrDefault(physicalTablePath, 
Collections.emptyList());
     }
 
-    /** Get the table info for this table. */
+    /**
+     * Get the table info for this table.
+     *
+     * <p>TODO: this method needs to be removed; use Admin getTableInfo instead.
+     */
     public Optional<TableInfo> getTable(TablePath tablePath) {
         return Optional.ofNullable(tableInfoByPath.get(tablePath));
     }
 
+    public Optional<Long> getTableId(TablePath tablePath) {
+        return Optional.ofNullable(tableIdByPath.get(tablePath));
+    }
+
     /** Get the partition id for this partition. */
     public Optional<Long> getPartitionId(PhysicalTablePath physicalTablePath) {
         return Optional.ofNullable(partitionsIdByPath.get(physicalTablePath));
     }
 
-    /** Return whether the cluster contains the given physical table path or 
not. */
-    public boolean contains(PhysicalTablePath physicalTablePath) {
-        if (physicalTablePath.getPartitionName() == null) {
-            return getTable(physicalTablePath.getTablePath()).isPresent();
-        } else {
-            return getPartitionId(physicalTablePath).isPresent();
-        }
-    }
-
-    public TableInfo getTableOrElseThrow(TablePath tablePath) {
-        return getTable(tablePath)
-                .orElseThrow(
-                        () ->
-                                new IllegalArgumentException(
-                                        String.format(
-                                                "table: %s not found in 
cluster", tablePath)));
-    }
-
-    public TableBucket getTableBucket(PhysicalTablePath physicalTablePath, int 
bucketId) {
-        TableInfo tableInfo = 
getTableOrElseThrow(physicalTablePath.getTablePath());
+    public TableBucket getTableBucket(
+            long tableId, PhysicalTablePath physicalTablePath, int bucketId) {
         if (physicalTablePath.getPartitionName() != null) {
             Long partitionId = getPartitionIdOrElseThrow(physicalTablePath);
-            return new TableBucket(tableInfo.getTableId(), partitionId, 
bucketId);
+            return new TableBucket(tableId, partitionId, bucketId);
         } else {
-            return new TableBucket(tableInfo.getTableId(), bucketId);
+            return new TableBucket(tableId, bucketId);
         }
     }
 
@@ -286,12 +264,6 @@ public final class Cluster {
         return Optional.ofNullable(partitionNameById.get(partitionId));
     }
 
-    /** Get the latest schema for the given table. */
-    public Optional<SchemaInfo> getSchema(TablePath tablePath) {
-        return getTable(tablePath)
-                .map(tableInfo -> new SchemaInfo(tableInfo.getSchema(), 
tableInfo.getSchemaId()));
-    }
-
     /** Get the table path to table id map. */
     public Map<TablePath, Long> getTableIdByPath() {
         return tableIdByPath;
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java 
b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
index 943163e44..458a79eb1 100644
--- a/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/cluster/ClusterTest.java
@@ -18,7 +18,6 @@
 package org.apache.fluss.cluster;
 
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.metadata.SchemaInfo;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
@@ -34,11 +33,9 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
-import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_INFO;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_PATH;
@@ -87,17 +84,6 @@ class ClusterTest {
                 .isInstanceOf(UnsupportedOperationException.class);
     }
 
-    @Test
-    void testGetTable() {
-        Cluster cluster = createCluster(aliveTabletServersById);
-        
assertThat(cluster.getTable(DATA1_TABLE_PATH).get()).isEqualTo(DATA1_TABLE_INFO);
-        
assertThat(cluster.getTable(DATA2_TABLE_PATH).get()).isEqualTo(DATA2_TABLE_INFO);
-        assertThat(cluster.getSchema(DATA1_TABLE_PATH).get())
-                .isEqualTo(new SchemaInfo(DATA1_SCHEMA, 1));
-        assertThat(cluster.getSchema(DATA2_TABLE_PATH).get())
-                .isEqualTo(new SchemaInfo(DATA2_SCHEMA, 1));
-    }
-
     @Test
     void testInvalidMetaAndUpdate() {
         Cluster cluster = createCluster(aliveTabletServersById);


Reply via email to