platinumhamburg commented on code in PR #2651:
URL: https://github.com/apache/fluss/pull/2651#discussion_r2795891242


##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/TabletState.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.snapshot;
+
+import org.apache.fluss.server.kv.autoinc.AutoIncIDRange;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * TabletState represents the state of a kv tablet at a certain log offset. It contains the flushed
+ * log offset, the row count of the tablet at that log offset, and the auto-increment ID ranges of
+ * the tablet at that log offset. The row count and auto-increment ID ranges are optional, and may
+ * be null if the information is not available or not needed for a particular use case.
+ */
+public class TabletState {

Review Comment:
   Consider adding `toString()` for easier debugging and logging.
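
   A minimal sketch of what such a `toString()` could look like. The diff above does not show the fields of `TabletState`, so the class name `TabletStateSketch`, the field names, and the use of `String` in place of `AutoIncIDRange` are all assumptions made for illustration, based only on the Javadoc.

```java
import java.util.List;

// Hypothetical stand-in for TabletState; field names are assumed from the Javadoc.
final class TabletStateSketch {
    private final long flushedLogOffset;
    // Optional per the Javadoc, so it may be null.
    private final Long rowCount;
    // Optional per the Javadoc; String stands in for AutoIncIDRange here.
    private final List<String> autoIncIDRanges;

    TabletStateSketch(long flushedLogOffset, Long rowCount, List<String> autoIncIDRanges) {
        this.flushedLogOffset = flushedLogOffset;
        this.rowCount = rowCount;
        this.autoIncIDRanges = autoIncIDRanges;
    }

    @Override
    public String toString() {
        // String concatenation renders null fields as "null", so no null checks are needed.
        return "TabletState{"
                + "flushedLogOffset=" + flushedLogOffset
                + ", rowCount=" + rowCount
                + ", autoIncIDRanges=" + autoIncIDRanges
                + '}';
    }
}
```

   For the list to print readably, `AutoIncIDRange` would need its own `toString()` as well.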



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java:
##########
@@ -218,9 +220,11 @@ public void flush(long exclusiveUpToLogSequenceNumber) throws IOException {
             Value value = entry.getValue();
             if (value.value != null) {
                 flushedCount += 1;
+                rowCountDiff += 1;

Review Comment:
   The `KvPreWriteBuffer` cannot distinguish between an INSERT (new key) and an
UPDATE (overwriting an existing key); both are PUT operations. Incrementing
`rowCountDiff` for every PUT therefore counts each UPDATE as a new row, so the
row count will be inflated over time as more UPDATEs occur. A low-cost fix would
be to record the operation type (INSERT vs UPDATE) when writing to the buffer,
since `processUpsert()` already queries `oldValue` to determine this.
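
   A rough sketch of that idea, not the actual `KvPreWriteBuffer` API. The class `RowCountingBufferSketch`, the `OpType` enum, and the `keyExisted` parameter are hypothetical names. The sketch assumes the caller already knows whether the key existed (as `processUpsert()` does via `oldValue`) and only deletes keys that exist.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified buffer that records the operation type with each entry.
final class RowCountingBufferSketch {
    enum OpType { INSERT, UPDATE, DELETE }

    private static final class Entry {
        final String key;
        final byte[] value; // null for DELETE
        final OpType opType;

        Entry(String key, byte[] value, OpType opType) {
            this.key = key;
            this.value = value;
            this.opType = opType;
        }
    }

    private final List<Entry> pending = new ArrayList<>();

    // keyExisted is assumed to come from the caller's lookup of the old value.
    void put(String key, byte[] value, boolean keyExisted) {
        pending.add(new Entry(key, value, keyExisted ? OpType.UPDATE : OpType.INSERT));
    }

    // Assumes the key exists; a delete of a missing key should not be buffered as DELETE.
    void delete(String key) {
        pending.add(new Entry(key, null, OpType.DELETE));
    }

    // Drains the buffer and returns the net change in row count.
    long flush() {
        long rowCountDiff = 0;
        for (Entry entry : pending) {
            if (entry.opType == OpType.INSERT) {
                rowCountDiff++;
            } else if (entry.opType == OpType.DELETE) {
                rowCountDiff--;
            }
            // UPDATE overwrites an existing row and leaves the count unchanged.
        }
        pending.clear();
        return rowCountDiff;
    }
}
```

   With this shape, an insert followed by an update of the same key contributes +1 rather than +2.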



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/autoinc/AutoIncIDRange.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.server.kv.autoinc;
+
+import java.util.Objects;
+
+/**
+ * Represents a range of IDs allocated for auto-increment purposes. This class encapsulates the
+ * auto-increment column id, the start and end of the ID range. The range is [start, end]. There is
+ * possible that the start > end which means the range is empty.
+ */
+public class AutoIncIDRange {
+
+    private final int columnId;
+    private final long start;
+    private final long end;
+
+    public AutoIncIDRange(int columnId, long start, long end) {
+        this.columnId = columnId;
+        this.start = start;
+        this.end = end;
+    }
+
+    /**
+     * Returns the column ID of the auto-increment column that associated with this auto-increment
+     * ID range.
+     */
+    public int getColumnId() {
+        return columnId;
+    }
+
+    /** Returns the starting ID of the range (inclusive). */
+    public long getStart() {
+        return start;
+    }
+
+    /** Returns the ending ID of the range (inclusive). */
+    public long getEnd() {
+        return end;
+    }
+
+    /** Checks if the ID range is empty (i.e., start is greater than end). */
+    public boolean isEmpty() {
+        return start > end;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AutoIncIDRange autoIncIdRange = (AutoIncIDRange) o;
+        return start == autoIncIdRange.start && end == autoIncIdRange.end;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(start, end);

Review Comment:
   `columnId` is missing from both `equals()` and `hashCode()`, so two ranges
with the same `start` and `end` but different columns compare as equal.
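
   One possible fix, shown on a trimmed-down copy of the class (named `AutoIncIDRangeSketch` here so it stands alone; the getters are omitted). The `this == o` shortcut is an optional addition that is not in the original.

```java
import java.util.Objects;

// Trimmed-down copy of AutoIncIDRange with columnId included in equality.
final class AutoIncIDRangeSketch {
    private final int columnId;
    private final long start;
    private final long end;

    AutoIncIDRangeSketch(int columnId, long start, long end) {
        this.columnId = columnId;
        this.start = start;
        this.end = end;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        AutoIncIDRangeSketch that = (AutoIncIDRangeSketch) o;
        // Compare all three fields so ranges of different columns are never equal.
        return columnId == that.columnId && start == that.start && end == that.end;
    }

    @Override
    public int hashCode() {
        // Hash the same fields that equals() compares.
        return Objects.hash(columnId, start, end);
    }
}
```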



##########
fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java:
##########
@@ -247,20 +316,169 @@ public KeyValueAndLogOffset(byte[] key, byte[] value, long logOffset) {
     public static class KvRecoverContext {
 
         private final TablePath tablePath;
-        private final TableBucket tableBucket;
 
         private final ZooKeeperClient zkClient;
         private final int maxFetchLogSizeInRecoverKv;
 
         public KvRecoverContext(
-                TablePath tablePath,
-                TableBucket tableBucket,
-                ZooKeeperClient zkClient,
-                int maxFetchLogSizeInRecoverKv) {
+                TablePath tablePath, ZooKeeperClient zkClient, int maxFetchLogSizeInRecoverKv) {
             this.tablePath = tablePath;
-            this.tableBucket = tableBucket;
             this.zkClient = zkClient;
             this.maxFetchLogSizeInRecoverKv = maxFetchLogSizeInRecoverKv;
         }
     }
+
+    // ------------------------------------------------------------------------------------------
+    // Below are some helpers for recovering tablet state from log
+    // ------------------------------------------------------------------------------------------
+
+    /** A helper to update the latest row count during recovering from log. */
+    private interface RowCountUpdater {
+
+        /** Apply the change to the row count according to the change type. */
+        void applyChange(ChangeType changeType);
+
+        /** Get the latest row count. Returns -1 if this table doesn't support row count. */
+        long getRowCount();
+    }
+
+    /**
+     * A simple implementation of {@link RowCountUpdater} which maintains a row count by applying
+     * the change type of each log record.
+     */
+    private static class RowCountUpdaterImpl implements RowCountUpdater {
+        private long rowCount;
+
+        public RowCountUpdaterImpl(long initialRowCount) {
+            this.rowCount = initialRowCount;
+        }
+
+        @Override
+        public void applyChange(ChangeType changeType) {
+            if (changeType == ChangeType.INSERT || changeType == ChangeType.UPDATE_AFTER) {
+                rowCount++;
+            } else if (changeType == ChangeType.DELETE || changeType == ChangeType.UPDATE_BEFORE) {
+                rowCount--;
+            }
+        }
+
+        @Override
+        public long getRowCount() {
+            return rowCount;
+        }
+    }
+
+    /**
+     * A no-op implementation of {@link RowCountUpdater} for the table which doesn't support row
+     * count.
+     */
+    private static class NoOpRowCountUpdater implements RowCountUpdater {
+
+        @Override
+        public void applyChange(ChangeType changeType) {
+            // do nothing
+        }
+
+        @Override
+        public long getRowCount() {
+            return -1;
+        }
+    }
+
+    /** A helper to update the auto inc id range during recovering from log. */
+    private interface AutoIncIdRangeUpdater {

Review Comment:
   The class `AutoIncIDRange` uses uppercase "ID", but the interface
`AutoIncIdRangeUpdater` uses camelCase "Id". Consider renaming it to
`AutoIncIDRangeUpdater` for consistency.


