Copilot commented on code in PR #10256:
URL: https://github.com/apache/seatunnel/pull/10256#discussion_r2652267002


##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/storage/HazelcastStateStore.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.storage;
+
+import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.MapListener;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collection;
+import java.util.EventListener;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+public class HazelcastStateStore<K, V> implements StateStore<K, V> {
+    private final IMap<K, V> iMap;
+
+    public HazelcastStateStore(IMap<K, V> iMap) {
+        this.iMap = iMap;
+    }
+
+    @Override
+    public V putIfAbsent(K key, V value) {
+        return iMap.putIfAbsent(key, value);
+    }
+
+    @Override
+    public V compute(K key, BiFunction<K, V, V> remappingFunction) {
+        return iMap.compute(key, remappingFunction);
+    }
+
+    @Override
+    public void remove(Object key) {
+        iMap.remove(key);
+    }
+
+    @Override
+    public V get(Object key) {
+        return iMap.get(key);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        iMap.put(key, value);
+    }
+
+    @Override
+    public void set(K key, V value) {
+        iMap.put(key, value);
+    }
+
+    @Override
+    public Set<Map.Entry<K, V>> entrySet() {
+        return iMap.entrySet();
+    }
+
+    @Override
+    public void forEach(BiConsumer<? super K, ? super V> action) {
+        iMap.forEach(action);
+    }
+
+    @Override
+    public UUID addEntryListener(@Nonnull EventListener listener, boolean includeValue) {

Review Comment:
   The addEntryListener method accepts a generic EventListener but casts it to 
MapListener, which is Hazelcast-specific. This breaks the abstraction since 
StateStore is meant to be storage-agnostic. Consider creating a 
storage-agnostic listener interface or adding type constraints to make this 
dependency explicit.
   ```suggestion
       public UUID addEntryListener(@Nonnull EventListener listener, boolean includeValue) {
           if (!(listener instanceof MapListener)) {
               throw new IllegalArgumentException(
                       "HazelcastStateStore requires a listener that implements MapListener");
           }
   ```
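
The other option named in the comment, a storage-agnostic listener interface, could look roughly like the sketch below. Every name here (`StateStoreListener`, `InMemoryListenableStore`, `ListenerSketch`) is hypothetical and not part of the PR. The in-memory store only stands in for a backend so the example runs without Hazelcast. A Hazelcast-backed store would presumably adapt such a listener to `MapListener` internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical storage-agnostic listener; callers never see a backend type.
interface StateStoreListener<K, V> {
    void onEntryUpdated(K key, V oldValue, V newValue);

    void onEntryRemoved(K key, V oldValue);
}

// Hypothetical in-memory store used only to make the sketch runnable.
class InMemoryListenableStore<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();
    private final Map<UUID, StateStoreListener<K, V>> listeners = new ConcurrentHashMap<>();

    // Registers a listener and returns an id that can be used to remove it.
    UUID addEntryListener(StateStoreListener<K, V> listener) {
        UUID id = UUID.randomUUID();
        listeners.put(id, listener);
        return id;
    }

    boolean removeEntryListener(UUID id) {
        return listeners.remove(id) != null;
    }

    void put(K key, V value) {
        V old = data.put(key, value);
        listeners.values().forEach(l -> l.onEntryUpdated(key, old, value));
    }

    void remove(K key) {
        V old = data.remove(key);
        if (old != null) {
            listeners.values().forEach(l -> l.onEntryRemoved(key, old));
        }
    }
}

class ListenerSketch {
    // Records the events produced by a few store operations.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        InMemoryListenableStore<String, Integer> store = new InMemoryListenableStore<>();
        UUID id = store.addEntryListener(
                new StateStoreListener<String, Integer>() {
                    @Override
                    public void onEntryUpdated(String key, Integer oldValue, Integer newValue) {
                        events.add("updated " + key + " " + oldValue + "->" + newValue);
                    }

                    @Override
                    public void onEntryRemoved(String key, Integer oldValue) {
                        events.add("removed " + key);
                    }
                });
        store.put("job-1", 1);
        store.put("job-1", 2);
        store.remove("job-1");
        store.removeEntryListener(id);
        store.put("job-1", 3); // listener already removed, so no event is recorded
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With an interface of this kind, the `instanceof MapListener` check in the suggestion above would not be needed, because the method signature itself would rule out unsupported listener types at compile time.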



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -810,18 +808,20 @@ public JobMetrics getJobMetrics(long jobId) {
             return jobHistoryService.getJobMetrics(jobId);
         }
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
-        JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-        return jobMetricsImap != JobMetrics.empty() ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
+        JobMetrics jobMetricsRecord = jobHistoryService.getJobMetrics(jobId);
+        return jobMetricsRecord != JobMetrics.empty()
+                ? jobMetricsRecord.merge(jobMetrics)
+                : jobMetrics;

Review Comment:
   Ambiguous variable name. The variable is renamed from "jobMetricsImap" to 
"jobMetricsRecord", which does not convey that it holds the historical metrics 
returned by jobHistoryService.getJobMetrics(jobId). Consider 
"jobMetricsFromHistory" or a similar name.



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/storage/DistributedStoreManager.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.storage;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+public class DistributedStoreManager {
+    private final StateStoreFactory stateStoreFactory;
+
+    public DistributedStoreManager(NodeEngineImpl nodeEngine) {
+        this.stateStoreFactory = new HazelcastStateStoreFactory(nodeEngine);
+    }
+
+    public static <K, V> StateStore<K, V> getMap(NodeEngineImpl nodeEngine, String mapName) {
+        return new HazelcastStateStoreFactory(nodeEngine).getMap(mapName);
+    }
+

Review Comment:
   The static getMap method creates a new HazelcastStateStoreFactory instance 
on every call, which is inefficient. This static method should be removed and 
callers should use the instance method through DistributedStoreManager instead 
to avoid unnecessary object creation.
   ```suggestion
   
   ```
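
A minimal sketch of the instance-method approach follows, assuming simplified stand-ins for the PR's types. `SimpleStateStore`, `SimpleStateStoreFactory`, `InMemoryStateStoreFactory` and `StoreManagerSketch` are all hypothetical names. The in-memory factory replaces `HazelcastStateStoreFactory` only so that the example runs on its own. The manager creates nothing per call: it holds one factory and delegates to it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical stand-in for the PR's StateStore.
interface SimpleStateStore<K, V> {
    V get(K key);

    void put(K key, V value);
}

// Simplified, hypothetical stand-in for the PR's StateStoreFactory.
interface SimpleStateStoreFactory {
    <K, V> SimpleStateStore<K, V> getMap(String mapName);
}

// In-memory factory that counts how many factory instances were created.
class InMemoryStateStoreFactory implements SimpleStateStoreFactory {
    static final AtomicInteger INSTANCES = new AtomicInteger();
    private final Map<String, Map<Object, Object>> maps = new ConcurrentHashMap<>();

    InMemoryStateStoreFactory() {
        INSTANCES.incrementAndGet();
    }

    @Override
    public <K, V> SimpleStateStore<K, V> getMap(String mapName) {
        // Stores obtained under the same name share one backing map.
        Map<Object, Object> backing =
                maps.computeIfAbsent(mapName, name -> new ConcurrentHashMap<>());
        return new SimpleStateStore<K, V>() {
            @SuppressWarnings("unchecked")
            @Override
            public V get(K key) {
                return (V) backing.get(key);
            }

            @Override
            public void put(K key, V value) {
                backing.put(key, value);
            }
        };
    }
}

// The manager keeps a single factory and exposes getMap as an instance method.
class StoreManagerSketch {
    private final SimpleStateStoreFactory factory;

    StoreManagerSketch(SimpleStateStoreFactory factory) {
        this.factory = factory;
    }

    <K, V> SimpleStateStore<K, V> getMap(String mapName) {
        return factory.getMap(mapName);
    }

    public static void main(String[] args) {
        StoreManagerSketch manager = new StoreManagerSketch(new InMemoryStateStoreFactory());
        SimpleStateStore<Long, String> first = manager.getMap("jobs");
        first.put(1L, "RUNNING");
        SimpleStateStore<Long, String> second = manager.getMap("jobs");
        System.out.println(second.get(1L));
        System.out.println(InMemoryStateStoreFactory.INSTANCES.get());
    }
}
```

Passing the factory into the constructor is a choice made for this sketch. The PR's constructor builds the factory from `NodeEngineImpl` instead, and the same instance-method shape works either way.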



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -862,9 +862,9 @@ public Map<Long, JobMetrics> getRunningJobMetrics() {
 
         longJobMetricsMap.forEach(
                 (jobId, jobMetrics) -> {
-                    JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-                    if (jobMetricsImap != JobMetrics.empty()) {
-                        longJobMetricsMap.put(jobId, jobMetricsImap.merge(jobMetrics));
+                    JobMetrics jobMetricsRecord = jobHistoryService.getJobMetrics(jobId);
+                    if (jobMetricsRecord != JobMetrics.empty()) {
+                        longJobMetricsMap.put(jobId, jobMetricsRecord.merge(jobMetrics));

Review Comment:
   Ambiguous variable name. The variable is renamed from "jobMetricsImap" to 
"jobMetricsRecord", which does not reflect its purpose. Consider 
"jobMetricsFromHistory" or "historicalJobMetrics" to make clear that it holds 
historical metrics fetched from storage.
   ```suggestion
                       JobMetrics historicalJobMetrics = jobHistoryService.getJobMetrics(jobId);
                       if (historicalJobMetrics != JobMetrics.empty()) {
                           longJobMetricsMap.put(jobId, historicalJobMetrics.merge(jobMetrics));
   ```



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:
##########
@@ -810,18 +808,20 @@ public JobMetrics getJobMetrics(long jobId) {
             return jobHistoryService.getJobMetrics(jobId);
         }
         JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics());
-        JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
-        return jobMetricsImap != JobMetrics.empty() ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
+        JobMetrics jobMetricsRecord = jobHistoryService.getJobMetrics(jobId);
+        return jobMetricsRecord != JobMetrics.empty()
+                ? jobMetricsRecord.merge(jobMetrics)
+                : jobMetrics;
     }
 
     public Map<Long, JobMetrics> getRunningJobMetrics() {
         final Set<Long> runningJobIds = runningJobMasterMap.keySet();
 
         Set<Address> addresses = new HashSet<>();
-        ownedSlotProfilesIMap.forEach(
-                (pipelineLocation, ownedSlotProfilesIMap) -> {
+        ownedSlotProfilesStore.forEach(
+                (pipelineLocation, slotProfileMap) -> {
                     if (runningJobIds.contains(pipelineLocation.getJobId())) {
-                        ownedSlotProfilesIMap
+                        slotProfileMap
                                 .values()
                                 .forEach(
                                         ownedSlotProfile -> {

Review Comment:
   Inconsistent naming pattern in lambda parameter. The parameter is renamed 
from "ownedSlotProfilesIMap" to "slotProfileMap", which is good for clarity. 
However, this creates a shadow variable issue where "slotProfileMap" is used 
both as the forEach parameter and later refers to the outer variable. Consider 
renaming to "taskSlotProfiles" or "taskGroupSlots" for better clarity.
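
For context on shadowing in lambdas: in the removed lines of the hunk, the name `ownedSlotProfilesIMap` is both the receiver of `forEach` and the lambda parameter. Assuming the receiver is a field (the hunk does not show its declaration), Java accepts this, because a lambda parameter may shadow a field. It may not redeclare a local variable of the enclosing method. The sketch below uses hypothetical names (`ShadowingSketch`, `slotProfiles`, `taskSlotProfiles`) and plain `HashMap`s to show that the shadowed and renamed forms behave the same. Only the renamed form makes it obvious which map the lambda body refers to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ShadowingSketch {
    // Field standing in for the outer store: pipeline name -> (slot id -> worker).
    private final Map<String, Map<Integer, String>> slotProfiles = new HashMap<>();

    ShadowingSketch() {
        Map<Integer, String> inner = new HashMap<>();
        inner.put(1, "worker-a");
        slotProfiles.put("pipeline-1", inner);
    }

    // The lambda parameter reuses the field's name. Inside the lambda body,
    // "slotProfiles" means the inner map, not the field.
    List<String> shadowed() {
        List<String> out = new ArrayList<>();
        slotProfiles.forEach((pipeline, slotProfiles) -> out.addAll(slotProfiles.values()));
        return out;
    }

    // A distinct parameter name removes the ambiguity for the reader.
    List<String> renamed() {
        List<String> out = new ArrayList<>();
        slotProfiles.forEach(
                (pipeline, taskSlotProfiles) -> out.addAll(taskSlotProfiles.values()));
        return out;
    }

    public static void main(String[] args) {
        ShadowingSketch sketch = new ShadowingSketch();
        System.out.println(sketch.shadowed());
        System.out.println(sketch.renamed());
    }
}
```

Whether the new name `slotProfileMap` collides with another variable cannot be determined from the hunk alone, so the sketch illustrates only the general rule.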


