platinumhamburg commented on code in PR #2809:
URL: https://github.com/apache/fluss/pull/2809#discussion_r3004800975
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java:
##########
@@ -138,6 +145,32 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
limit);
}
+ @Override
+ public BatchScanner createBatchScanner() {
Review Comment:
The Javadoc says "only supported for Primary Key Tables", but there's no
actual check. If someone calls this on a Log Table, it'll travel all the way to
the server's `getLeaderKvTablet()` before failing with `InvalidTableException`
— a wasted RPC and a confusing error message.
A quick guard at the top would save everyone some headaches:
```java
if (!tableInfo.hasPrimaryKey()) {
    throw new UnsupportedOperationException(
            "createBatchScanner() without TableBucket is only supported for Primary Key Tables. "
                    + "Table: " + tableInfo.getTablePath());
}
```
##########
fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java:
##########
@@ -242,10 +245,15 @@ public enum Errors {
REBALANCE_FAILURE_EXCEPTION(61, "The rebalance task failure.", RebalanceFailureException::new),
NO_REBALANCE_IN_PROGRESS_EXCEPTION(
62, "No rebalance task in progress.", NoRebalanceInProgressException::new),
- INVALID_PRODUCER_ID_EXCEPTION(
- 63,
- "The client has attempted to perform an operation with an invalid producer ID.",
- InvalidProducerIdException::new);
+ SCANNER_EXPIRED(
Review Comment:
The diff removes `INVALID_PRODUCER_ID_EXCEPTION(63, ...)` entirely and
replaces it with new scanner error codes starting at 64. Two problems stack up
here:
First, `InvalidProducerIdException` is still actively used in
`ProducerOffsetsManager` (11+ references). Without the error code mapping, all
those exceptions will degrade to `UNKNOWN_SERVER_ERROR` on the wire — clients
won't be able to tell what went wrong.
Second, and more critically — current main already has `CONFIG_EXCEPTION(64,
...)`. This PR adds `SCANNER_EXPIRED(64, ...)`. The `Errors` static initializer
detects duplicate codes and throws, so TabletServer won't even start.
Looks like the PR was branched before `CONFIG_EXCEPTION` landed on main. A
rebase should fix it — restore code 63, and start the new codes from 65:
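To illustrate why a duplicate code is fatal at startup, here is a minimal sketch of a registry that rejects a second registration of the same code. `ErrorCodeRegistry`, `register`, and `nameOf` are hypothetical names for illustration and are not the actual `Errors` implementation; only the constant names and the codes 63, 64, and 65 come from the discussion above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an error-code table; not the real Errors enum.
final class ErrorCodeRegistry {
    private final Map<Integer, String> namesByCode = new HashMap<>();

    /** Registers a code and fails fast if another name already owns it. */
    void register(int code, String name) {
        String previous = namesByCode.putIfAbsent(code, name);
        if (previous != null) {
            throw new IllegalStateException(
                    "Duplicate error code " + code + ": " + previous + " and " + name);
        }
    }

    /** Codes without a mapping fall back to a generic name in this model. */
    String nameOf(int code) {
        return namesByCode.getOrDefault(code, "UNKNOWN_SERVER_ERROR");
    }
}
```

In this model, registering `INVALID_PRODUCER_ID_EXCEPTION` at 63 and `CONFIG_EXCEPTION` at 64 succeeds, a later `register(64, "SCANNER_EXPIRED")` throws, and moving `SCANNER_EXPIRED` to 65 succeeds.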
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.scan;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.rocksdb.RocksDBKv;
+import org.apache.fluss.server.utils.ResourceGuard;
+
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Snapshot;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Server-side state for a single KV full-scan session.
+ *
+ * <p>A {@code ScannerContext} holds a point-in-time RocksDB {@link Snapshot}, the {@link
+ * ReadOptions} pinning it, and a cursor ({@link RocksIterator}) that persists across multiple
+ * batched-fetch RPCs from the same client. It also holds a {@link ResourceGuard.Lease} that
+ * prevents the underlying RocksDB instance from being closed while the scan is in progress.
+ *
+ * <p>Instances are created by {@link org.apache.fluss.server.kv.KvTablet#openScan} and registered
+ * by {@link ScannerManager}. They must be closed when the scan completes, the client requests an
+ * explicit close, or the session expires due to inactivity.
+ *
+ * <p><b>Thread safety:</b> The iterator cursor ({@link #advance()}, {@link #isValid()}, {@link
+ * #currentValue()}) must be driven by only one thread at a time. {@link #close()} is thread-safe.
+ */
+@NotThreadSafe
+public class ScannerContext implements Closeable {
+ private final String scannerId;
+ private final byte[] scannerIdBytes;
+ private final TableBucket tableBucket;
+ private final RocksDBKv rocksDBKv;
+ private final RocksIterator iterator;
+ private final ReadOptions readOptions;
+ private final Snapshot snapshot;
+ private final ResourceGuard.Lease resourceLease;
+ private long remainingLimit;
+ private int callSeqId = -1;
Review Comment:
The initial value `-1` works correctly with the client's first continuation
sending `0` (`-1 + 1 = 0`), but it's a subtle implicit contract. A one-line
comment would save the next person from having to trace through both sides to
convince themselves it's right.
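A minimal sketch of the contract in question, assuming the server accepts a request only when its sequence ID equals the last accepted ID plus one. `CallSequence` and `tryAccept` are hypothetical names for illustration, not code from the PR.

```java
// Hypothetical illustration of the call-sequence contract; not code from the PR.
final class CallSequence {
    // Starts at -1 so that the first continuation, which carries 0, satisfies
    // the "last accepted + 1" rule without a special case.
    private int callSeqId = -1;

    /** Accepts the request only if it is the next one in order. */
    boolean tryAccept(int requestSeqId) {
        if (requestSeqId != callSeqId + 1) {
            return false; // duplicate or out-of-order request
        }
        callSeqId = requestSeqId;
        return true;
    }
}
```

The comment on the field is the kind of one-liner being asked for: it states the invariant where the initial value is declared.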
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerManager.java:
##########
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.scan;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.TooManyScannersException;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.kv.KvTablet;
+import org.apache.fluss.utils.AutoCloseableAsync;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FutureUtils;
+import org.apache.fluss.utils.concurrent.Scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manages server-side KV full-scan sessions ({@link ScannerContext}).
+ *
+ * <p>Each KV full scan opens a persistent server-side session that holds a point-in-time RocksDB
+ * snapshot and a cursor. Sessions are keyed by a server-assigned UUID-based scanner ID and persist
+ * across multiple batched-fetch RPCs from the same client.
+ *
+ * <h3>Concurrency limits</h3>
+ *
+ * <ul>
+ * <li><b>Per-bucket:</b> at most {@code maxPerBucket} concurrent sessions on any single bucket.
+ * <li><b>Per-server:</b> at most {@code maxPerServer} concurrent sessions across all buckets.
+ * </ul>
+ *
+ * <p>Limit enforcement is two-phase: a fast pre-check guards the common case; the subsequent atomic
+ * increment with re-check and rollback prevents the TOCTOU race from permanently breaching the
+ * configured limits. Exceeding either limit causes {@link TooManyScannersException}.
+ *
+ * <h3>Empty bucket handling</h3>
+ *
+ * <p>If the target bucket contains no rows at the time the scan is opened, {@link
+ * #createScanner(KvTablet, TableBucket, Long)} returns {@code null} without consuming a limit slot.
+ * The caller should return an empty response immediately.
+ *
+ * <h3>TTL eviction</h3>
+ *
+ * <p>A background reaper task runs every {@code kv.scanner.expiration-interval} and evicts sessions
+ * idle longer than {@code kv.scanner.ttl}. Recently evicted IDs are retained for {@code 2 × ttl} so
+ * callers can distinguish "expired" from "never existed."
+ *
+ * <h3>Leadership change</h3>
+ *
+ * {@link #closeScannersForBucket(TableBucket)} must be called when a bucket loses leadership to
+ * release all RocksDB snapshot/iterator resources for that bucket promptly.
+ */
+public class ScannerManager implements AutoCloseableAsync {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScannerManager.class);
+
+ private final Map<String, ScannerContext> scanners = MapUtils.newConcurrentHashMap();
+ private final Map<String, Long> recentlyExpiredIds = MapUtils.newConcurrentHashMap();
+
+ /** Per-bucket active scanner count, used for O(1) per-bucket limit enforcement. */
+ private final Map<TableBucket, AtomicInteger> perBucketCount = MapUtils.newConcurrentHashMap();
+
+ /** Total active scanner count across all buckets on this tablet server. */
+ private final AtomicInteger totalScanners = new AtomicInteger(0);
+
+ private final Clock clock;
+ private final long scannerTtlMs;
+ private final long recentlyExpiredRetentionMs;
+ private final int maxPerBucket;
+ private final int maxPerServer;
+
+ @Nullable private ScheduledFuture<?> cleanupTask;
+
+ public ScannerManager(Configuration conf, Scheduler scheduler) {
+ this(conf, scheduler, SystemClock.getInstance());
+ }
+
+ @VisibleForTesting
+ ScannerManager(Configuration conf, Scheduler scheduler, Clock clock) {
+ this.clock = clock;
+ this.scannerTtlMs = conf.get(ConfigOptions.KV_SCANNER_TTL).toMillis();
+ this.recentlyExpiredRetentionMs = 2 * scannerTtlMs;
+ this.maxPerBucket = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_BUCKET);
+ this.maxPerServer = conf.get(ConfigOptions.KV_SCANNER_MAX_PER_SERVER);
+
+ long expirationIntervalMs =
+ conf.get(ConfigOptions.KV_SCANNER_EXPIRATION_INTERVAL).toMillis();
+ this.cleanupTask =
+ scheduler.schedule(
+ "scanner-expiration",
+ this::cleanupExpiredScanners,
+ expirationIntervalMs,
+ expirationIntervalMs);
+
+ LOG.info(
+ "Started ScannerManager: ttl={}ms, expirationInterval={}ms, "
+ + "maxPerBucket={}, maxPerServer={}",
+ scannerTtlMs,
+ expirationIntervalMs,
+ maxPerBucket,
+ maxPerServer);
+ }
+
+ /**
+ * Creates a new scan session for the given bucket, taking a point-in-time RocksDB snapshot.
+ *
+ * <p>Returns {@code null} if the bucket is empty (no rows to scan). In that case no session
+ * slot is consumed and the caller should return an empty response immediately.
+ *
+ * <p><b>Limit enforcement is two-phase:</b> a fast pre-check guards the common case; the
+ * subsequent atomic increment + re-check prevents the TOCTOU race from permanently breaching
+ * configured limits. If registration fails after the snapshot is already opened, the context is
+ * closed and the exception is re-thrown to avoid leaking resources.
+ *
+ * @param kvTablet the {@link KvTablet} for the bucket; used to open the snapshot
+ * @param tableBucket the bucket being scanned
+ * @param limit optional row-count limit ({@code null} or ≤ 0 means unlimited)
+ * @return the newly registered {@link ScannerContext}, or {@code null} if the bucket is empty
+ * @throws TooManyScannersException if the per-bucket or per-server limit is exceeded
+ * @throws IOException if the underlying {@link org.apache.fluss.server.utils.ResourceGuard} is
+ * already closed (the bucket is shutting down)
+ */
+ @Nullable
+ public ScannerContext createScanner(
+ KvTablet kvTablet, TableBucket tableBucket, @Nullable Long limit) throws IOException {
+ checkLimits(tableBucket);
+
+ String scannerId = generateScannerId();
+ ScannerContext context =
+ kvTablet.openScan(scannerId, limit != null ? limit : -1L, clock.milliseconds());
+ if (context == null) {
+ // Bucket is empty — no session slot consumed.
+ return null;
+ }
+
+ try {
+ registerContext(context, tableBucket);
+ } catch (TooManyScannersException e) {
+ // Limit was exceeded between the initial check and registration (race window).
+ // Close the already-opened context to avoid leaking the snapshot and lease.
+ closeScannerContext(context);
+ throw e;
+ }
+ return context;
+ }
+
+ /**
+ * Looks up an existing scanner session by its raw ID bytes and refreshes its last-access
+ * timestamp.
+ *
+ * @return the {@link ScannerContext}, or {@code null} if not found (may have expired or never
+ * existed)
+ */
+ @Nullable
+ public ScannerContext getScanner(byte[] scannerId) {
+ ScannerContext context = scanners.get(new String(scannerId, StandardCharsets.UTF_8));
+ if (context != null) {
+ context.updateLastAccessTime(clock.milliseconds());
+ }
+ return context;
+ }
+
+ /**
+ * Returns {@code true} if the given scanner ID belongs to a session that was recently evicted
+ * by the TTL reaper (within the last {@code 2 × ttlMs}).
+ *
+ * <p>Callers can use this to distinguish "scanner expired" from "unknown scanner ID."
+ */
+ public boolean isRecentlyExpired(byte[] scannerId) {
+ return recentlyExpiredIds.containsKey(new String(scannerId, StandardCharsets.UTF_8));
+ }
+
+ /**
+ * Removes and closes a known scanner context directly, avoiding a map lookup.
+ *
+ * <p>Uses a conditional remove ({@link java.util.concurrent.ConcurrentHashMap#remove(Object,
+ * Object)}) so that concurrent calls — e.g. from the TTL reaper and a
close-scanner RPC
+ * arriving simultaneously — result in exactly one winner closing the
context, preventing
+ * double-release of the non-idempotent {@link
+ * org.apache.fluss.server.utils.ResourceGuard.Lease}.
+ */
+ public void removeScanner(ScannerContext context) {
+ if (scanners.remove(context.getId(), context)) {
Review Comment:
`removeScanner(byte[])` uses `scanners.remove(key)` (unconditional), while
`removeScanner(ScannerContext)` uses `scanners.remove(key, value)`
(conditional). If the TTL reaper fires at the same time, `decrementCounts` can
get called twice for the same scanner, pushing the counter negative. Using
conditional removal consistently would close this gap.
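One way to make both removal paths agree is to route the by-ID variant through the conditional one, so that only the caller whose `remove(key, value)` succeeds touches the counter. The sketch below is a simplified model under that assumption; `SessionRegistry` and its methods are hypothetical names, not the PR's `ScannerManager`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical registry for illustration; not the PR's ScannerManager.
final class SessionRegistry {
    private final ConcurrentMap<String, Object> sessions = new ConcurrentHashMap<>();
    private final AtomicInteger activeCount = new AtomicInteger();

    void add(String id, Object session) {
        if (sessions.putIfAbsent(id, session) == null) {
            activeCount.incrementAndGet();
        }
    }

    /** Removal by ID: look the session up, then reuse the conditional path. */
    boolean removeById(String id) {
        Object session = sessions.get(id);
        return session != null && remove(id, session);
    }

    /** Only the caller whose conditional remove succeeds decrements the counter. */
    boolean remove(String id, Object session) {
        if (sessions.remove(id, session)) {
            activeCount.decrementAndGet();
            return true;
        }
        return false;
    }

    int activeCount() {
        return activeCount.get();
    }
}
```

If a reaper thread and an RPC handler both try to remove the same session, exactly one call returns `true` and the count drops by one.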
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -1128,6 +1162,14 @@ public class ConfigOptions {
+ "will still be returned to ensure that the fetch can make progress. As such, "
+ "this is not a absolute maximum.");
+ public static final ConfigOption<MemorySize> CLIENT_SCANNER_KV_FETCH_MAX_BYTES =
Review Comment:
`CLIENT_SCANNER_KV_FETCH_MAX_BYTES` is defined here but never read, so setting it currently has no effect.
##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/TableScan.java:
##########
@@ -138,6 +145,32 @@ public BatchScanner createBatchScanner(TableBucket tableBucket) {
limit);
}
+ @Override
+ public BatchScanner createBatchScanner() {
+ long tableId = tableInfo.getTableId();
+ int numBuckets = tableInfo.getNumBuckets();
+ List<TableBucket> buckets = new ArrayList<>();
+ if (!tableInfo.isPartitioned()) {
+ for (int b = 0; b < numBuckets; b++) {
+ buckets.add(new TableBucket(tableId, b));
+ }
+ } else {
+ try {
+ List<PartitionInfo> partitions =
+ conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get();
Review Comment:
```java
List<PartitionInfo> partitions =
conn.getAdmin().listPartitionInfos(tableInfo.getTablePath()).get();
```
Bare `.get()` with no timeout. If the coordinator is down, this hangs
indefinitely. Adding a timeout is a small change that avoids a nasty surprise:
```java
.get(30, TimeUnit.
```
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/scan/ScannerContext.java:
##########
@@ -0,0 +1,168 @@
+@NotThreadSafe
+public class ScannerContext implements Closeable {
+ private final String scannerId;
+ private final byte[] scannerIdBytes;
+ private final TableBucket tableBucket;
+ private final RocksDBKv rocksDBKv;
+ private final RocksIterator iterator;
+ private final ReadOptions readOptions;
+ private final Snapshot snapshot;
+ private final ResourceGuard.Lease resourceLease;
Review Comment:
`ScannerContext` holds a `ResourceGuard.Lease`, and `KvTablet.close()`
ultimately calls `resourceGuard.close()`, which blocks until all leases are
released. If a scanner has a long TTL and the client doesn't close it,
`KvTablet.close()` will just hang.
The good news is `makeFollowers` and `stopReplica` both call
`closeScannersForBucket` already, so the main paths are covered. But if
KvTablet gets closed through some other path (e.g., exception recovery),
there's still a risk of getting stuck. Worth adding a cleanup step in
`Replica.close()` or `KvTablet.close()` as a safety net.
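A minimal single-threaded sketch of such a safety net, assuming the tablet can enumerate its open scan sessions. `TabletModel`, `ScanSession`, and `outstandingLeases` are hypothetical names that only model the lease accounting; the real `ResourceGuard` blocks on outstanding leases, which this model deliberately does not reproduce.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a tablet whose close() depends on all leases being
// released; not Fluss code and not thread-safe beyond the synchronized methods.
final class TabletModel {
    private final AtomicInteger outstandingLeases = new AtomicInteger();
    private final List<ScanSession> sessions = new ArrayList<>();

    /** A scan session that holds one lease until it is closed. */
    final class ScanSession {
        private boolean closed;

        void close() {
            if (!closed) {
                closed = true; // makes a repeated close a no-op
                outstandingLeases.decrementAndGet();
            }
        }
    }

    synchronized ScanSession openScan() {
        outstandingLeases.incrementAndGet();
        ScanSession session = new ScanSession();
        sessions.add(session);
        return session;
    }

    /** Safety net: release every remaining session before waiting on leases. */
    synchronized void close() {
        for (ScanSession session : sessions) {
            session.close();
        }
        sessions.clear();
        // A real guard would wait for the lease count to reach zero here.
    }

    int outstandingLeases() {
        return outstandingLeases.get();
    }
}
```

Because the sessions are closed first, the lease count is already zero by the time a blocking wait would start, regardless of which path triggered the close.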