This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch ci-add-column in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 639c492037d6a91d214b1e145ed8574113ced652 Author: Jark Wu <[email protected]> AuthorDate: Mon Dec 1 20:23:30 2025 +0800 WIP --- .../org/apache/fluss/server/RpcServiceBase.java | 2 +- .../fluss/server/coordinator/MetadataManager.java | 4 +- .../server/metadata/TabletServerMetadataCache.java | 3 +- .../fluss/server/replica/AdjustIsrManager.java | 2 +- .../org/apache/fluss/server/replica/Replica.java | 4 +- .../replica/fetcher/ReplicaFetcherManager.java | 2 +- .../replica/fetcher/ReplicaFetcherThread.java | 2 +- .../fluss/server/utils/RpcGatewayManager.java | 2 +- .../fluss/server/utils/ServerRpcMessageUtils.java | 2 +- .../org/apache/fluss/server/utils/timer/Timer.java | 2 +- .../fluss/server/utils/timer/TimingWheel.java | 4 +- .../apache/fluss/server/zk/ZooKeeperClient.java | 50 +++++++++++++++++----- .../server/authorizer/DefaultAuthorizerTest.java | 4 +- .../CompletedSnapshotStoreManagerTest.java | 8 ++-- .../coordinator/CoordinatorChannelManagerTest.java | 6 +-- .../fluss/server/coordinator/TableManagerTest.java | 2 - .../server/kv/prewrite/KvPreWriteBufferTest.java | 4 +- .../kv/snapshot/CompletedSnapshotStoreTest.java | 2 +- .../kv/snapshot/KvTabletSnapshotTargetTest.java | 2 +- .../ZooKeeperCompletedSnapshotStoreTest.java | 4 +- .../apache/fluss/server/log/LogManagerTest.java | 2 +- .../org/apache/fluss/server/log/LogTabletTest.java | 4 +- .../server/log/remote/RemoteLogTabletTest.java | 4 +- .../server/metadata/MetadataUpdateITCase.java | 2 +- .../fluss/server/replica/AdjustIsrITCase.java | 2 +- .../fluss/server/replica/AdjustIsrManagerTest.java | 2 +- .../replica/HighWatermarkPersistenceTest.java | 2 +- .../replica/RemoveOfflineReplicaFromIsrITCase.java | 2 +- .../apache/fluss/server/replica/ReplicaTest.java | 4 +- .../fluss/server/replica/ReplicaTestBase.java | 4 +- .../fluss/server/tablet/TabletServiceITCase.java | 4 +- .../server/testutils/FlussClusterExtension.java | 4 +- fluss-test-coverage/pom.xml | 4 +- 33 files changed, 88 insertions(+), 63 deletions(-) diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index ededf510f..82f3d5ec2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -384,7 +384,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR @Override public CompletableFuture<GetFileSystemSecurityTokenResponse> getFileSystemSecurityToken( GetFileSystemSecurityTokenRequest request) { - // TODO:add ACL for per-table in https://github.com/apache/fluss/issues/752 + // TODO: add ACL for per-table in https://github.com/apache/fluss/issues/752 try { // In order to avoid repeatedly obtaining security token, cache it for a while. long currentTimeMs = System.currentTimeMillis(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java index bf2cfc9fb..3d63c8ba1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java @@ -576,14 +576,14 @@ public class MetadataManager { zookeeperClient.getTables(tablePaths); // currently, we don't support schema evolution, so all schemas are version 1 Map<TablePath, SchemaInfo> tablePath2SchemaInfos = - zookeeperClient.getV1Schemas(tablePaths); + zookeeperClient.getLatestSchemas(tablePaths); for (TablePath tablePath : tablePaths) { if (!tablePath2TableRegistrations.containsKey(tablePath)) { throw new TableNotExistException("Table '" + tablePath + "' does not exist."); } if (!tablePath2SchemaInfos.containsKey(tablePath)) { throw new SchemaNotExistException( - "Schema for '" + tablePath + "' with schema_id=1 does not exist."); + "Schema for '" + tablePath + "' does not exist."); } TableRegistration tableReg = tablePath2TableRegistrations.get(tablePath); SchemaInfo schemaInfo = tablePath2SchemaInfos.get(tablePath); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java index 666a74aae..f4ecff6f0 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metadata/TabletServerMetadataCache.java @@ -70,7 +70,6 @@ public class TabletServerMetadataCache implements ServerMetadataCache { private final ServerSchemaCache serverSchemaCache; - // todo: replace this in test with schemaMetadataManager. public TabletServerMetadataCache(MetadataManager metadataManager) { this.serverMetadataSnapshot = ServerMetadataSnapshot.empty(); this.metadataManager = metadataManager; @@ -436,7 +435,7 @@ public class TabletServerMetadataCache implements ServerMetadataCache { } @VisibleForTesting - public ServerSchemaCache getSchemaMetadataManager() { + public ServerSchemaCache getServerSchemaCache() { return serverSchemaCache; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java index 251d670b6..d3b5d63c9 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java @@ -77,7 +77,7 @@ public class AdjustIsrManager { public CompletableFuture<LeaderAndIsr> submit( TableBucket tableBucket, LeaderAndIsr leaderAndIsr) { - // TODOadd coordinatorEpoch. + // TODO add coordinatorEpoch. CompletableFuture<LeaderAndIsr> future = new CompletableFuture<>(); AdjustIsrItem adjustIsrItem = new AdjustIsrItem(tableBucket, leaderAndIsr, future); boolean enqueued = unsentAdjustIsrMap.putIfAbsent(tableBucket, adjustIsrItem) == null; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index c1933bd5c..d8c88cbd7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -1518,7 +1518,7 @@ public final class Replica { List<Integer> isrToSend = new ArrayList<>(isrState.isr()); isrToSend.add(newInSyncReplicaId); - // TODOadd server epoch to isr. + // TODO add server epoch to isr. LeaderAndIsr newLeaderAndIsr = new LeaderAndIsr( @@ -1540,7 +1540,7 @@ public final class Replica { // erroneously advance the HW if the `AdjustIsr` were to fail. Hence, the "maximal ISR" // for `PendingShrinkIsr` is the current ISR. - // TODOadd server epoch to isr. + // TODO add server epoch to isr. LeaderAndIsr newLeaderAndIsr = new LeaderAndIsr( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java index 06e94e9ae..9e4479b3e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManager.java @@ -225,7 +225,7 @@ public class ReplicaFetcherManager { fetcherThread.getLeader().leaderServerId(), initialFetchStatusMap); } catch (InterruptedException e) { - LOG.error("Interrupted whileadd buckets to fetcher threads.", e); + LOG.error("Interrupted while add buckets to fetcher threads.", e); } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java index 76195a0df..398bfb6ff 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java @@ -80,7 +80,7 @@ final class ReplicaFetcherThread extends ShutdownableThread { private final LeaderEndpoint leader; private final int fetchBackOffMs; - // manuallyadd timout logic in here, todo remove this timeout logic if + // manually add timout logic in here, todo remove this timeout logic if // we support global request timeout in #279 private final int timeoutSeconds = 30; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java index 062eec73d..c89fca6f7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/RpcGatewayManager.java @@ -65,7 +65,7 @@ public class RpcGatewayManager<T extends RpcGateway> implements AutoCloseable { } /** - * Add a server to the manager. It'll create a new gateway for the server andadd it to the + * Add a server to the manager. It'll create a new gateway for the server and add it to the * manager. If the server has already existed, it'll remove the already existing server before * adding the new one. */ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 165fd23e9..815771166 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -339,7 +339,7 @@ public class ServerRpcMessageUtils { .collect(Collectors.toList()); } - private static @Nullable TableChange.ColumnPosition toColumnPosition(int columnPositionType) { + private static TableChange.ColumnPosition toColumnPosition(int columnPositionType) { ColumnPositionType opType = ColumnPositionType.from(columnPositionType); switch (opType) { case LAST: diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java index 270d334ee..ea6aead7b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/Timer.java @@ -35,7 +35,7 @@ public interface Timer { * Add a new task to this executor. It will be executed after the task's delay (beginning from * the time of submission). * - * @param timerTask the task toadd + * @param timerTask the task to add */ void add(TimerTask timerTask); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java index 934777c93..1dab9f229 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java @@ -108,8 +108,8 @@ import java.util.concurrent.atomic.AtomicInteger; * while priority queue based timers takes O(log N) for both insert and delete where N is the number * of items in the queue. * - * <p>This class is not thread-safe. There should not be anyadd calls while advanceClock is - * executing. It is caller's responsibility to enforce it. Simultaneousadd calls are thread-safe. + * <p>This class is not thread-safe. There should not be any add calls while advanceClock is + * executing. It is caller's responsibility to enforce it. Simultaneous add calls are thread-safe. */ @NotThreadSafe final class TimingWheel { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index e27ffd9c3..e80006e60 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -536,18 +536,46 @@ public class ZooKeeperClient implements AutoCloseable { "tables registration"); } - /** Get the v1 schema for given tables in ZK. */ - public Map<TablePath, SchemaInfo> getV1Schemas(Collection<TablePath> tablePaths) + /** Get the latest schema for given tables in ZK. */ + public Map<TablePath, SchemaInfo> getLatestSchemas(Collection<TablePath> tablePaths) throws Exception { - Map<String, TablePath> path2TablePathMap = - tablePaths.stream() - .collect(toMap(p -> SchemaZNode.path(p, DEFAULT_SCHEMA_ID), path -> path)); + Map<String, TablePath> schemaChildren2TablePathMap = + tablePaths.stream().collect(toMap(SchemasZNode::path, path -> path)); + List<ZkGetChildrenResponse> childrenResponses = + getChildrenInBackground(schemaChildren2TablePathMap.keySet()); + // get the schema ids for each table + Map<TablePath, List<String>> schemaIdsForTables = + processGetChildrenResponses( + childrenResponses, + response -> schemaChildren2TablePathMap.get(response.getPath()), + "schema children for tables"); + + // get the schema info for each latest schema id + Map<TablePath, Integer> latestSchemaIdMap = new HashMap<>(); + Map<String, TablePath> path2TablePathMap = new HashMap<>(); + schemaIdsForTables.forEach( + (tp, schemaIds) -> { + int latestSchemaId = + schemaIds.stream().map(Integer::parseInt).reduce(Math::max).orElse(0); + latestSchemaIdMap.put(tp, latestSchemaId); + path2TablePathMap.put(SchemaZNode.path(tp, latestSchemaId), tp); + }); + List<ZkGetDataResponse> responses = getDataInBackground(path2TablePathMap.keySet()); - return processGetDataResponses( - responses, - resp -> path2TablePathMap.get(resp.getPath()), - b -> new SchemaInfo(SchemaZNode.decode(b), DEFAULT_SCHEMA_ID), - "schema"); + Map<TablePath, Schema> schemasForTables = + processGetDataResponses( + responses, + resp -> path2TablePathMap.get(resp.getPath()), + SchemaZNode::decode, + "schema"); + + Map<TablePath, SchemaInfo> result = new HashMap<>(); + schemasForTables.forEach( + (tp, schema) -> { + int schemaId = latestSchemaIdMap.get(tp); + result.put(tp, new SchemaInfo(schema, schemaId)); + }); + return result; } /** Update the table in ZK. */ @@ -810,7 +838,7 @@ public class ZooKeeperClient implements AutoCloseable { /** Register schema to ZK metadata and return the schema id. */ public int registerSchema(TablePath tablePath, Schema schema) throws Exception { - return registerSchema(tablePath, schema, getCurrentSchemaId(tablePath) + 1); + return registerSchema(tablePath, schema, DEFAULT_SCHEMA_ID); } public int registerSchema(TablePath tablePath, Schema schema, int schemaId) throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java index abc575fd5..af69caffe 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java @@ -487,7 +487,7 @@ public class DefaultAuthorizerTest { Set<AccessControlEntry> expectedAcls = new HashSet<>(); for (int i = 0; i < 50; i++) { - // each task represent that addColumn acl to different user on same resource. + // each task represent that add acl to different user on same resource. final AccessControlEntry accessControlEntry = createAclEntry(String.valueOf(i), "host-1", READ); int finalI = i; @@ -550,7 +550,7 @@ public class DefaultAuthorizerTest { // generate 50 concurrent acl operation tasks. List<Runnable> concurrentTasks = new ArrayList<>(); for (int i = 0; i < 50; i++) { - // each task represent that addColumn acl to same user on same resource. + // each task represent that add acl to same user on same resource. Runnable runnable = () -> { addAcls( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java index 4460cfc26..b8fec59ca 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java @@ -99,13 +99,13 @@ class CompletedSnapshotStoreManagerTest { CompletedSnapshotStoreManager completedSnapshotStoreManager = createCompletedSnapshotStoreManager(maxNumberOfSnapshotsToRetain); - // addColumn snapshots for a series of buckets + // add snapshots for a series of buckets Set<TableBucket> tableBuckets = createTableBuckets(2, 3); int snapshotNum = 3; Map<TableBucket, CompletedSnapshot> tableBucketLatestCompletedSnapshots = new HashMap<>(); for (TableBucket tableBucket : tableBuckets) { - // addColumn some snapshots + // add some snapshots for (int snapshot = 0; snapshot < snapshotNum; snapshot++) { CompletedSnapshot completedSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, snapshot); @@ -137,7 +137,7 @@ class CompletedSnapshotStoreManagerTest { assertThat(completedSnapshot) .isEqualTo(tableBucketLatestCompletedSnapshots.get(tableBucket)); - // addColumn a new snapshot + // add a new snapshot long snapshotId = completedSnapshot.getSnapshotID() + 1; completedSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, snapshotId); addCompletedSnapshot(completedSnapshotStoreManager, completedSnapshot); @@ -169,7 +169,7 @@ class CompletedSnapshotStoreManagerTest { Set<TableBucket> tableBuckets = createTableBuckets(1, 2); int snapshotNum = 3; for (TableBucket tableBucket : tableBuckets) { - // addColumn some snapshots + // add some snapshots for (int snapshot = 0; snapshot < snapshotNum; snapshot++) { CompletedSnapshot completedSnapshot = KvTestUtils.mockCompletedSnapshot(tempDir, tableBucket, snapshot); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java index 581edafcc..bd4467c0a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java @@ -69,14 +69,14 @@ class CoordinatorChannelManagerTest { // now, shouldn't send as we already remove the tablet server checkSendRequest(coordinatorChannelManager, server0.id(), false); - // test addColumn tablet server - // before addColumn, shouldn't send + // test add tablet server + // before add, shouldn't send ServerNode server1 = tabletServersNode.get(1); checkSendRequest(coordinatorChannelManager, server1.id(), false); coordinatorChannelManager.addTabletServer(server1); - // after addColumn the tablet server, should send + // after add the tablet server, should send // try to send message checkSendRequest(coordinatorChannelManager, server1.id(), true); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index b8b348bdb..3c417efe8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -211,8 +211,6 @@ class TableManagerTest { assertThat(coordinatorContext.getAllReplicasForTable(tableId)).isEmpty(); } - void testSchemaChange() throws Exception {} - @Test void testResumeDeletionAfterRestart() throws Exception { // first, create a table diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java index 74d145bda..b39ec39ac 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java @@ -163,13 +163,13 @@ class KvPreWriteBufferTest { } assertThat(getValue(buffer, "key6")).isNull(); - // addColumn delete records. + // add delete records. elementCount = 5; bufferDelete(buffer, "key4", elementCount++); bufferDelete(buffer, "key3", elementCount++); assertThat(getValue(buffer, "key3")).isNull(); - // addColumn update records + // add update records bufferPut(buffer, "key2", "value2-1", elementCount++); bufferPut(buffer, "key1", "value1-1", elementCount++); assertThat(getValue(buffer, "key1")).isEqualTo("value1-1"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java index 87fb23259..8d45628d8 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStoreTest.java @@ -161,7 +161,7 @@ class CompletedSnapshotStoreTest { final CompletedSnapshot ckp = getSnapshot(ckpId); assertThatThrownBy(() -> completedSnapshotStore.add(ckp)) - .as("We should get an exception when addColumn snapshot to failed..") + .as("We should get an exception when add snapshot to failed..") .hasMessageContaining(errMsg) .isInstanceOf(FlussException.class); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java index 2451e7ecc..9f9ea078d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/KvTabletSnapshotTargetTest.java @@ -247,7 +247,7 @@ class KvTabletSnapshotTargetTest { final String errMsg = "Add to snapshot handle failed."; AtomicBoolean shouldFail = new AtomicBoolean(true); // we use a store will fail when the variable shouldFail is true - // addColumn the snapshot to store will fail + // add the snapshot to store will fail CompletedSnapshotHandleStore completedSnapshotHandleStore = TestCompletedSnapshotHandleStore.newBuilder() .setAddFunction( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java index 5b890acd6..c6e90ff63 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/ZooKeeperCompletedSnapshotStoreTest.java @@ -94,8 +94,8 @@ class ZooKeeperCompletedSnapshotStoreTest { } /** - * Tests that the snapshot does not exist in the store when we fail to addColumn it into the - * store (i.e., there exists an exception thrown by the method). + * Tests that the snapshot does not exist in the store when we fail to add it into the store + * (i.e., there exists an exception thrown by the method). */ @Test void testAddSnapshotWithFailedRemove(@TempDir Path tmpDir) throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java index f4e536b86..2bbdd2a1c 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java @@ -79,7 +79,7 @@ final class LogManagerTest extends LogTestBase { private TableBucket tableBucket2; private LogManager logManager; - // TODOadd more tests refer to kafka's LogManagerTest. + // TODO add more tests refer to kafka's LogManagerTest. @BeforeAll static void baseBeforeAll() { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java index 7af086823..7b88a53a5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java @@ -73,7 +73,7 @@ final class LogTabletTest extends LogTestBase { private FlussScheduler scheduler; private File logDir; - // TODOadd more tests refer to kafka's UnifiedLogTest. + // TODO add more tests refer to kafka's UnifiedLogTest. @BeforeEach public void setup() throws Exception { @@ -374,7 +374,7 @@ final class LogTabletTest extends LogTestBase { log.updateHighWatermark(log.localLogEndOffset()); - // TODOadd delete log segment logic. + // TODO add delete log segment logic. } @Test diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java index 5bedd921e..0d571e1ea 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java @@ -94,7 +94,7 @@ class RemoteLogTabletTest extends RemoteLogTestBase { assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(0); assertThat(remoteLogTablet.getRemoteLogEndOffset()).isEqualTo(OptionalLong.of(30)); - // delete first remote log segment and addColumn another one remote log segments. + // delete first remote log segment and add another one remote log segments. remoteLogTablet.addAndDeleteLogSegments( Collections.singletonList(remoteLogSegmentList.get(3)), Collections.singletonList(remoteLogSegmentList.get(0))); @@ -102,7 +102,7 @@ class RemoteLogTabletTest extends RemoteLogTestBase { assertThat(remoteLogTablet.getRemoteLogStartOffset()).isEqualTo(10); assertThat(remoteLogTablet.getRemoteLogEndOffset()).isEqualTo(OptionalLong.of(40)); - // delete all exist and append one. we will first addColumn then delete. + // delete all exist and append one. we will first add then delete. remoteLogTablet.addAndDeleteLogSegments( Collections.singletonList(remoteLogSegmentList.get(4)), remoteLogSegmentList); assertThat(remoteLogTablet.getIdToRemoteLogSegmentMap()).hasSize(0); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java index 35c36d30c..b6fdd9e93 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/MetadataUpdateITCase.java @@ -157,7 +157,7 @@ class MetadataUpdateITCase { expectedTablePathById, Collections.emptyMap())); - // addColumn back tablet server2 + // add back tablet server2 FLUSS_CLUSTER_EXTENSION.startTabletServer(2); retry( Duration.ofMinutes(1), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java index bb2138881..933abd231 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java @@ -140,7 +140,7 @@ public class AdjustIsrITCase { isr.add(stopFollower); FLUSS_CLUSTER_EXTENSION.notifyLeaderAndIsr( stopFollower, DATA1_TABLE_PATH, tb, newLeaderAndIsr, isr); - // retry until the stop follower addColumn back to ISR. + // retry until the stop follower add back to ISR. retry( Duration.ofMinutes(1), () -> diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java index 75fb9e248..3f8623569 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java @@ -32,7 +32,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link AdjustIsrManager}. */ class AdjustIsrManagerTest { - // TODO addColumn more test refer to kafka AlterPartitionManagerTest, See: FLUSS-56278513 + // TODO add more test refer to kafka AlterPartitionManagerTest, See: FLUSS-56278513 @Test void testSubmitShrinkIsr() throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java index b089efd30..b093787f0 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java @@ -92,7 +92,7 @@ final class HighWatermarkPersistenceTest extends ReplicaTestBase { assertThat(highWatermark0).isEqualTo(10L); assertThat(replica0.getLogTablet().getHighWatermark()).isEqualTo(10L); - // addColumn another replica and set highWatermark. + // add another replica and set highWatermark. TableBucket tableBucket1 = new TableBucket(DATA2_TABLE_ID, 0); replicaManager.becomeLeaderOrFollower( INITIAL_COORDINATOR_EPOCH, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java index a7c65acda..9e13b1688 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/RemoveOfflineReplicaFromIsrITCase.java @@ -130,7 +130,7 @@ class RemoveOfflineReplicaFromIsrITCase { FLUSS_CLUSTER_EXTENSION.startTabletServer(follower); isr.add(follower); - // make sure the stopped follower can addColumn back to isr after restart + // make sure the stopped follower can add back to isr after restart retry( Duration.ofMinutes(1), () -> diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index a18d76442..c8e26ea7b 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -95,8 +95,8 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link Replica}. */ final class ReplicaTest extends ReplicaTestBase { - // TODO addColumn more tests refer to kafka's PartitionTest. - // TODO addColumn more tests to cover partition table + // TODO add more tests refer to kafka's PartitionTest. + // TODO add more tests to cover partition table @Test void testMakeLeader() throws Exception { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java index c438da529..80d998a4d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java @@ -342,7 +342,7 @@ public class ReplicaTestBase { } // TODO this is only for single tablet server unit test. - // TODO addColumn more test cases for partition table which make leader by this method. + // TODO add more test cases for partition table which make leader by this method. protected void makeLogTableAsLeader(int bucketId) { makeLogTableAsLeader(new TableBucket(DATA1_TABLE_ID, bucketId), false); } @@ -375,7 +375,7 @@ public class ReplicaTestBase { } // TODO this is only for single tablet server unit test. - // TODO addColumn more test cases for partition table which make leader by this method. + // TODO add more test cases for partition table which make leader by this method. protected void makeKvTableAsLeader(long tableId, TablePath tablePath, int bucketId) { makeKvTableAsLeader( new TableBucket(tableId, bucketId), tablePath, INITIAL_LEADER_EPOCH, false); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 8b0b75c35..01cf2701a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -177,7 +177,7 @@ public class TabletServiceITCase { } @Test - @Disabled("TODO: addColumn back in https://github.com/apache/fluss/issues/771") + @Disabled("TODO: add back in https://github.com/apache/fluss/issues/771") void testProduceLogResponseReturnInOrder() throws Exception { long tableId = createTable( @@ -334,7 +334,7 @@ public class TabletServiceITCase { } @Test - @Disabled("TODO: addColumn back in https://github.com/apache/fluss/issues/777") + @Disabled("TODO: add back in https://github.com/apache/fluss/issues/777") void testFetchLogWithMinFetchSizeAndTimeout() throws Exception { long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 8cd737e51..a1ee83fa6 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -286,7 +286,7 @@ public final class FlussClusterExtension } private void startTabletServers() throws Exception { - // addColumn tablet server to make generate assignment for table possible + // add tablet server to make generate assignment for table possible for (int i = 0; i < initialNumOfTabletServers; i++) { startTabletServer(i); } @@ -679,7 +679,7 @@ public final class FlussClusterExtension int leader = leaderAndIsrOpt.get().leader(); TabletServer tabletServer = getTabletServerById(leader); ServerSchemaCache serverSchemaCache = - tabletServer.getMetadataCache().getSchemaMetadataManager(); + tabletServer.getMetadataCache().getServerSchemaCache(); Map<Long, SchemaInfo> latestSchemaByTablePath = serverSchemaCache.getLatestSchemaByTableId(); assertThat(latestSchemaByTablePath).containsKey(tableId); diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index ca167c2d9..0c6f54cd5 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -164,7 +164,7 @@ <excludes> <exclude>fluss-test-coverage/**</exclude> <exclude>fluss-test-utils/**</exclude> - <!-- exclude adapter classes to avoid Jacoco error: "Can'tadd different class with same name" --> + <!-- exclude adapter classes to avoid Jacoco error: "Can't add different class with same name" --> <exclude>fluss-flink/**/target/classes/org/apache/fluss/flink/adapter/**</exclude> </excludes> </resource> @@ -206,7 +206,7 @@ <excludes> <exclude>fluss-test-coverage/**</exclude> <exclude>fluss-test-utils/**</exclude> - <!-- exclude adapter classes to avoid Jacoco error: "Can'tadd different class with same name" --> + <!-- exclude adapter classes to avoid Jacoco error: "Can't add different class with same name" --> <exclude>fluss-flink/**/target/classes/org/apache/fluss/flink/adapter/**</exclude> </excludes> </resource>
