hudi-agent commented on code in PR #18821:
URL: https://github.com/apache/hudi/pull/18821#discussion_r3291595991


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -37,18 +38,21 @@
 @Slf4j
 public class RocksDBIndexBackend implements GlobalIndexBackend {
   private static final String COLUMN_FAMILY = "index_cache";
+  private static final String BASE_PATH = "hudi-index-backend";

Review Comment:
   🤖 nit: `BASE_PATH` reads like a filesystem path, but the value 
`"hudi-index-backend"` is clearly a logical factory key. Something like 
`FACTORY_KEY` or `INDEX_BACKEND_KEY` would make its purpose immediately obvious 
to a future reader.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -37,18 +38,21 @@
 @Slf4j
 public class RocksDBIndexBackend implements GlobalIndexBackend {
   private static final String COLUMN_FAMILY = "index_cache";
+  private static final String BASE_PATH = "hudi-index-backend";
 
   private final RocksDBDAO rocksDBDAO;
+  private final String rocksDbBasePath;
   private transient FlinkRocksDBIndexMetrics rocksDBIndexMetrics;
 
   public RocksDBIndexBackend(String rocksDbBasePath, boolean isPartitionedTable) {
+    this.rocksDbBasePath = rocksDbBasePath;
     // Register custom serializer for HoodieRecordGlobalLocation to minimize storage overhead
     ConcurrentHashMap<String, CustomSerializer<?>> serializers = new ConcurrentHashMap<>();
     serializers.put(COLUMN_FAMILY, isPartitionedTable
         ? new CodedRecordGlobalLocationSerializer()
         : new RecordGlobalLocationSerializer());
 
-    this.rocksDBDAO = new RocksDBDAO("hudi-index-backend", rocksDbBasePath, serializers, true);
+    this.rocksDBDAO = RocksDBDAOFactory.getOrCreate(BASE_PATH, rocksDbBasePath, serializers, true);
     this.rocksDBDAO.addColumnFamily(COLUMN_FAMILY);

Review Comment:
   🤖 If `addColumnFamily` throws here (e.g. a `RocksDBException` wrapped as 
`HoodieException`, or the `checkArgument(!closed)` failing on a racing close), 
`getOrCreate` has already incremented the factory's refCount. Because the 
constructor then fails, the caller never receives a backend it could release, so 
the cached entry would never be evicted. Could you wrap the call in a try/catch 
that calls `RocksDBDAOFactory.release(...)` on failure?
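
A minimal sketch of the release-on-failure pattern suggested above. It does not use the real Hudi classes: `RefCountedFactory`, `GuardedBackend`, and `init` are hypothetical stand-ins for `RocksDBDAOFactory`, `RocksDBIndexBackend`, and `addColumnFamily`, and the "resource" is just the key string.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RocksDBDAOFactory: it only tracks reference counts per key.
final class RefCountedFactory {
  private static final Map<String, Integer> REF_COUNTS = new HashMap<>();

  private RefCountedFactory() {
  }

  static synchronized String getOrCreate(String key) {
    REF_COUNTS.merge(key, 1, Integer::sum);
    return key; // the shared "resource" is just the key in this sketch
  }

  static synchronized void release(String key) {
    // Decrement the count; returning null evicts the entry once it reaches zero.
    REF_COUNTS.computeIfPresent(key, (k, count) -> count > 1 ? count - 1 : null);
  }

  static synchronized int refCount(String key) {
    return REF_COUNTS.getOrDefault(key, 0);
  }
}

// Hypothetical stand-in for RocksDBIndexBackend.
final class GuardedBackend {
  private final String resource;

  GuardedBackend(String key, boolean failDuringInit) {
    this.resource = RefCountedFactory.getOrCreate(key);
    try {
      init(failDuringInit); // plays the role of addColumnFamily(...)
    } catch (RuntimeException e) {
      // Hand the reference back: no caller will ever hold this object to close it.
      RefCountedFactory.release(key);
      throw e;
    }
  }

  private static void init(boolean fail) {
    if (fail) {
      throw new IllegalStateException("simulated addColumnFamily failure");
    }
  }
}
```

With this shape, a failed construction leaves the count where it was before the call, so the last successful holder can still evict the entry.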
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/RocksDBIndexBackend.java:
##########
@@ -94,6 +98,6 @@ public double getRatioMetric(TickerType hitTicker, TickerType missTicker) {
 
   @Override
   public void close() throws IOException {
-    this.rocksDBDAO.close();
+    RocksDBDAOFactory.release(BASE_PATH, rocksDbBasePath);

Review Comment:
   🤖 `close()` is no longer idempotent — the previous `rocksDBDAO.close()` was 
guarded by `RocksDBDAO`'s internal `closed` flag. With multiple references via 
the factory, a double `close()` on this backend would decrement the refCount 
twice and could prematurely evict/close the shared DAO while another backend 
still holds a reference. Could you guard with a local `closed` flag so 
accidental double-close stays a no-op (per the `Closeable` contract)?
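
One way the guard could look, as a sketch only. `IdempotentBackend` and `SHARED_REF_COUNT` are hypothetical stand-ins for the backend and the factory's refCount; the sketch uses an `AtomicBoolean` instead of a plain field on the assumption that `close()` might be called from more than one thread.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a backend holding one reference to a shared, ref-counted resource.
final class IdempotentBackend implements Closeable {
  // Hypothetical shared counter standing in for the factory's refCount.
  static final AtomicInteger SHARED_REF_COUNT = new AtomicInteger();

  private final AtomicBoolean closed = new AtomicBoolean(false);

  IdempotentBackend() {
    SHARED_REF_COUNT.incrementAndGet(); // plays the role of getOrCreate(...)
  }

  @Override
  public void close() {
    // compareAndSet lets exactly one call through, so repeated close() calls are no-ops.
    if (closed.compareAndSet(false, true)) {
      SHARED_REF_COUNT.decrementAndGet(); // plays the role of release(...)
    }
  }
}
```

Each backend then gives back exactly the one reference it took, regardless of how many times it is closed.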
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAOFactory.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.common.serialization.CustomSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Factory that creates and caches {@link RocksDBDAO} singletons keyed by {@code (basePath, rocksDBBasePath)}.
+ *
+ * <p>Multiple callers that supply identical parameters share a single underlying RocksDB instance.
+ * The factory tracks a reference count; the instance is physically closed and evicted only when
+ * every holder has called {@link #release}.
+ *
+ * <p>Callers <em>must</em> call {@link #release} (not {@link RocksDBDAO#close}) when they are
+ * done with the instance, so that the reference count is decremented correctly.
+ *
+ * <p>Note: column-family serializers and the {@code disableWAL} flag are applied only on the
+ * first creation for a given key. Subsequent callers that pass different values for these
+ * parameters will share the instance created by the first caller.
+ */
+public class RocksDBDAOFactory {
+  private static final Object LOCK = new Object();
+  private static final Map<String, Entry> INSTANCES = new HashMap<>();
+
+  private RocksDBDAOFactory() {
+  }
+
+  /**
+   * Returns a shared {@link RocksDBDAO} for the given parameters, creating one if absent.
+   *
+   * <p>Increments the internal reference count. Callers must invoke {@link #release} with the
+   * same {@code basePath} and {@code rocksDBBasePath} when the instance is no longer needed.
+   *
+   * @param basePath                logical base path identifying the Hudi table
+   * @param rocksDBBasePath         filesystem path under which RocksDB stores its data
+   * @param columnFamilySerializers per-column-family serializers (applied only on first creation)
+   * @param disableWAL              whether to disable the write-ahead log (applied only on first creation)
+   * @return shared {@link RocksDBDAO} instance
+   */
+  public static RocksDBDAO getOrCreate(
+      String basePath,
+      String rocksDBBasePath,
+      ConcurrentHashMap<String, CustomSerializer<?>> columnFamilySerializers,

Review Comment:
   🤖 nit: could you widen this parameter type to `Map<String, 
CustomSerializer<?>>` instead of `ConcurrentHashMap`? The method only reads 
from the map, so tying the public API to a specific implementation forces 
callers to use `ConcurrentHashMap` even if they hold a plain `Map`.
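
To illustrate the point, here is a small sketch with hypothetical names (`SketchSerializer` stands in for `CustomSerializer`, and `WidenedFactory.snapshot` for a read-only consumer such as `getOrCreate`). A parameter declared as `Map` accepts both a plain `HashMap` and a `ConcurrentHashMap`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for CustomSerializer.
interface SketchSerializer<T> {
  byte[] serialize(T value);
}

// Hypothetical stand-in for a factory method that only reads the serializer map.
final class WidenedFactory {
  private WidenedFactory() {
  }

  // Declaring the parameter as Map keeps callers free to pass any implementation.
  static Map<String, SketchSerializer<?>> snapshot(Map<String, SketchSerializer<?>> serializers) {
    // A defensive copy, so later changes by the caller do not affect the shared instance.
    return new HashMap<>(serializers);
  }
}
```

Existing callers that already build a `ConcurrentHashMap` would compile unchanged against the widened signature.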
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

