This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new e66acccfd fix(store): improve some potential lock & type cast issues (#2895)
e66acccfd is described below

commit e66acccfda1a47a6a5cc5bb1f3ad9762c88f7069
Author: Soyan <[email protected]>
AuthorDate: Sat Nov 1 04:08:14 2025 +0800

    fix(store): improve some potential lock & type cast issues (#2895)
    
    * update(store): fix some problems and clean up code
    
    - chore(store): clean some comments
    - chore(store): use Slf4j instead of System.out for logging
    - update(store): use more reasonable timeout settings
    - update(store): add close method for CopyOnWriteCache to avoid potential memory leak
    - update(store): delete duplicated beginTx() statement
    - update(store): extract parameter for compaction thread pool (move to configuration file in the future)
    - update(store): add default logic in AggregationFunctions
    - update(store): fix potential concurrency problem in QueryExecutor
    
    * Update hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
    
    ---------
    
    Co-authored-by: Peng Junzhi <[email protected]>
---
 .../org/apache/hugegraph/pd/client/PDConfig.java   |  2 +-
 .../org/apache/hugegraph/store/cli/cmd/Load.java   | 13 +++-------
 .../apache/hugegraph/store/cli/cmd/MultiQuery.java |  2 +-
 .../apache/hugegraph/store/cli/cmd/ScanTable.java  |  2 +-
 .../store/client/query/QueryExecutor.java          | 30 +++++++++++++++-------
 .../store/query/func/AggregationFunctions.java     |  7 +++--
 .../store/business/BusinessHandlerImpl.java        | 12 +++++++--
 .../store/cmd/request/DestroyRaftRequest.java      |  2 +-
 .../hugegraph/store/util/CopyOnWriteCache.java     | 24 ++++++++++++++++-
 .../org/apache/hugegraph/store/node/AppConfig.java |  4 +--
 .../node/grpc/query/AggregativeQueryObserver.java  |  6 +----
 11 files changed, 70 insertions(+), 34 deletions(-)

diff --git 
a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
 
b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
index a14c32425..5555bae30 100644
--- 
a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
+++ 
b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java
@@ -103,7 +103,7 @@ public final class PDConfig {
     public PDConfig setAuthority(String userName, String pwd) {
         this.userName = userName;
         String auth = userName + ':' + pwd;
-        this.authority = new 
String(Base64.getEncoder().encode(auth.getBytes(UTF_8)));
+        this.authority = 
Base64.getEncoder().encodeToString(auth.getBytes(UTF_8));
         return this;
     }
 
diff --git 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
index 0fbe10d01..a134d689c 100644
--- 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
+++ 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java
@@ -121,10 +121,9 @@ public class Load extends Command {
         for (int i = 0; i < readerSize; i++) {
             int fi = i;
             new Thread(() -> {
-                try {
-                    InputStreamReader isr = new InputStreamReader(new 
FileInputStream(split[fi]),
-                                                                  
StandardCharsets.UTF_8);
-                    BufferedReader reader = new BufferedReader(isr);
+                try(InputStreamReader isr = new InputStreamReader(new 
FileInputStream(split[fi]),
+                                                                       
StandardCharsets.UTF_8);
+                    BufferedReader reader = new BufferedReader(isr)) {
                     long count = 0;
                     String line;
                     try {
@@ -146,9 +145,6 @@ public class Load extends Command {
                         }
                     } catch (Exception e) {
                         throw new RuntimeException(e);
-                    } finally {
-                        isr.close();
-                        reader.close();
                     }
                 } catch (Exception e) {
                     log.error("send data with error:", e);
@@ -158,13 +154,12 @@ public class Load extends Command {
             }).start();
         }
         latch.await();
-        loadThread.join();
         completed.set(true);
+        loadThread.join();
     }
 
     public boolean put(String table, List<String> keys) {
         HgStoreSession session = storeClient.openSession(graph);
-        session.beginTx();
         try {
             session.beginTx();
             for (String key : keys) {
diff --git 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
index 6bcc4e3d9..2128e7fe0 100644
--- 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
+++ 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java
@@ -101,7 +101,7 @@ public class MultiQuery extends Command {
                                             current = (HgOwnerKey) 
queue[finalI].poll(1,
                                                                                
       TimeUnit.SECONDS);
                                         } catch (InterruptedException e) {
-                                            //
+                                            Thread.currentThread().interrupt();
                                         }
                                     }
                                     if (current == null) {
diff --git 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
index e46e59795..e8ebda772 100644
--- 
a/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
+++ 
b/hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java
@@ -73,7 +73,7 @@ public class ScanTable extends Command {
                             if (iterator.hasNext()) {
                                 iterator.next();
                                 position = iterator.position();
-                                System.out.println("count is " + count);
+                                log.info("count is {}", count);
                             } else {
                                 position = null;
                             }
diff --git 
a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
 
b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
index e5e667273..b4102bb37 100644
--- 
a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
+++ 
b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java
@@ -73,12 +73,15 @@ public class QueryExecutor {
 
     private final HugeGraphSupplier supplier;
 
-    private long timeout = 1800_000;
+    /**
+     * Timeout duration for StreamObserver receiving response
+     */
+    private long timeout = 60_000;
 
     /**
      * Used for testing single machine
      */
-    public static String filterStore = null;
+    private static final ThreadLocal<String> filterStore = new ThreadLocal<>();
 
     public QueryExecutor(HgStoreNodePartitioner nodePartitioner, 
HugeGraphSupplier supplier,
                          Long timeout) {
@@ -123,12 +126,20 @@ public class QueryExecutor {
                 if (o1 == null && o2 == null) {
                     return 0;
                 }
-
-                if (o1 != null) {
-                    return ((KvElement) o1).compareTo((KvElement) o2);
+                if (o1 != null && o2 != null) {
+                    if (o1 instanceof KvElement && o2 instanceof KvElement) {
+                        return ((KvElement) o1).compareTo((KvElement) o2);
+                    }
+                    if (!(o1 instanceof KvElement)) {
+                        throw new IllegalStateException(
+                                "Expected KvElement but got: " + 
o1.getClass().getName());
+                    }
+                    // !(o2 instanceof KvElement)
+                    throw new IllegalStateException(
+                            "Expected KvElement but got: " + 
o2.getClass().getName());
                 }
 
-                return 0;
+                return o1 != null ? 1 : -1;
             });
 
             iterator = new StreamFinalAggregationIterator<>(iterator, 
query.getFuncList());
@@ -277,9 +288,10 @@ public class QueryExecutor {
             }
         }
 
-        if (filterStore != null) {
-            return tasks.containsKey(filterStore) ?
-                   List.of(Tuple2.of(filterStore, tasks.get(filterStore))) : 
List.of();
+        if (filterStore.get() != null) {
+            String filterStoreStr = filterStore.get();
+            return tasks.containsKey(filterStoreStr) ?
+                   List.of(Tuple2.of(filterStoreStr, 
tasks.get(filterStoreStr))) : List.of();
         }
 
         return tasks.entrySet().stream()
diff --git 
a/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
 
b/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
index ee84f8789..23157b1e4 100644
--- 
a/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
+++ 
b/hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java
@@ -79,8 +79,11 @@ public class AggregationFunctions {
                         ((AtomicFloat) buffer).getAndAdd((Float) record);
                         break;
                     default:
-                        // throw new Exception ?
-                        break;
+                        // throw new Exception
+                        throw new IllegalStateException(
+                                "Unsupported buffer type: " + 
buffer.getClass().getName() +
+                                ". Supported types: AtomicLong, AtomicInteger, 
AtomicDouble, AtomicFloat"
+                        );
                 }
             }
         }
diff --git 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
index 307e5fc57..9287bfe26 100644
--- 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
+++ 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
@@ -130,13 +130,17 @@ public class BusinessHandlerImpl implements 
BusinessHandler {
     }};
     private static final Map<Integer, String> dbNames = new 
ConcurrentHashMap<>();
     private static HugeGraphSupplier mockGraphSupplier = null;
-    private static final int compactionThreadCount = 64;
     private static final ConcurrentMap<String, AtomicInteger> pathLock = new 
ConcurrentHashMap<>();
     private static final ConcurrentMap<Integer, AtomicInteger> compactionState 
=
             new ConcurrentHashMap<>();
+    // Default core thread count
+    private static final int compactionThreadCount = 64;
+    private static final int compactionMaxThreadCount = 256;
+    // Max size of compaction queue
+    private static final int compactionQueueSize = 1000;
     private static final ThreadPoolExecutor compactionPool =
             ExecutorUtil.createExecutor(PoolNames.COMPACT, 
compactionThreadCount,
-                                        compactionThreadCount * 4, 
Integer.MAX_VALUE);
+                                        compactionMaxThreadCount, 
compactionQueueSize);
     private static final int timeoutMillis = 6 * 3600 * 1000;
     private final BinaryElementSerializer serializer = 
BinaryElementSerializer.getInstance();
     private final DirectBinarySerializer directBinarySerializer = new 
DirectBinarySerializer();
@@ -1667,4 +1671,8 @@ public class BusinessHandlerImpl implements 
BusinessHandler {
             };
         }
     }
+
+    public static void clearCache() {
+        GRAPH_SUPPLIER_CACHE.clear();
+    }
 }
diff --git 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
index ecd7e7cf0..b9e61837d 100644
--- 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
+++ 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java
@@ -27,7 +27,7 @@ import lombok.Data;
 @Data
 public class DestroyRaftRequest extends HgCmdBase.BaseRequest {
 
-    private List<String> graphNames = new ArrayList<>();
+    private final List<String> graphNames = new ArrayList<>();
 
     public void addGraphName(String graphName) {
         graphNames.add(graphName);
diff --git 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
index f07a5a018..b20eac39f 100644
--- 
a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
+++ 
b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java
@@ -20,6 +20,7 @@ package org.apache.hugegraph.store.util;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
@@ -29,7 +30,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.jetbrains.annotations.NotNull;
 
-//FIXME Missing shutdown method
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
 public class CopyOnWriteCache<K, V> implements ConcurrentMap<K, V> {
 
     // Scheduled executor service for periodically clearing the cache.
@@ -263,4 +266,23 @@ public class CopyOnWriteCache<K, V> implements 
ConcurrentMap<K, V> {
             return null;
         }
     }
+
+    public void close(){
+        scheduledExecutor.shutdown();
+        try {
+            boolean isTerminated = scheduledExecutor.awaitTermination(30, 
TimeUnit.SECONDS);
+            if (!isTerminated) {
+                List<Runnable> runnables = scheduledExecutor.shutdownNow();
+                log.info("CopyOnWriteCache shutting down with {} tasks left", 
runnables.size());
+
+                boolean isNowTerminated = 
scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS);
+                if (!isNowTerminated) {
+                    log.warn("Failed to shutdown CopyOnWriteCache thread 
pool");
+                }
+            }
+        }catch (InterruptedException e) {
+            scheduledExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
 }
diff --git 
a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
 
b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
index a8a122327..3f1624c08 100644
--- 
a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
+++ 
b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java
@@ -278,9 +278,9 @@ public class AppConfig {
         private int fetchBatchSize;
 
         /**
-         * the timeout of request fetch
+         * the timeout of request fetch (ms)
          */
-        @Value("${query.push-down.fetch_timeout:3600000}")
+        @Value("${query.push-down.fetch_timeout:300000}")
         private long fetchTimeOut;
 
         /**
diff --git 
a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
 
b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
index 199d3ba55..0ba569cb9 100644
--- 
a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
+++ 
b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/query/AggregativeQueryObserver.java
@@ -60,10 +60,6 @@ public class AggregativeQueryObserver implements 
StreamObserver<QueryRequest> {
     private final AtomicInteger consumeCount = new AtomicInteger(0);
     private final AtomicInteger sendCount = new AtomicInteger(0);
     private final AtomicBoolean clientCanceled = new AtomicBoolean(false);
-    //    private final ThreadLocal<QueryResponse.Builder> localBuilder = 
ThreadLocal.withInitial
-    //    (QueryResponse::newBuilder);
-//    private final ThreadLocal<Kv.Builder> localKvBuilder = 
ThreadLocal.withInitial
-//    (Kv::newBuilder);
     private final BinaryElementSerializer serializer = 
BinaryElementSerializer.getInstance();
     private final StreamObserver<QueryResponse> sender;
     private volatile ScanIterator iterator = null;
@@ -328,7 +324,7 @@ public class AggregativeQueryObserver implements 
StreamObserver<QueryRequest> {
             try {
                 recordCount++;
                 executePipeline(itr.next());
-                if (System.currentTimeMillis() - current > timeout * 1000) {
+                if (System.nanoTime() - current > timeout * 1_000_000) {
                     throw new RuntimeException("execution timeout");
                 }
             } catch (EarlyStopException ignore) {

Reply via email to