This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 063794bf577 Implement and fix Windows disk metrics parsing (#17290)
063794bf577 is described below

commit 063794bf577865090246b6d6b673ae8067721efa
Author: Haonan <[email protected]>
AuthorDate: Fri Mar 20 10:15:22 2026 +0800

    Implement and fix Windows disk metrics parsing (#17290)
    
    * Implement WindowsDiskMetricsManager
    
    * Fix Windows disk metric parsing
    
    * previous work
    
    * Fix PowerShell disk metric parsing
    
    * Fix Windows disk metrics parsing
    
    * Prune stale disk metric maps
    
    * fix IT
    
    * fix IT
    
    * fix IT
---
 .../iotdb/session/it/IoTDBConnectionsIT.java       |   1 +
 .../iotdb/session/it/pool/SessionPoolIT.java       |   6 +-
 .../metrics/file/SystemRelatedFileMetrics.java     |  24 +-
 .../metricsets/disk/WindowsDiskMetricsManager.java | 504 ++++++++++++++++++++-
 .../metricsets/net/WindowsNetMetricManager.java    | 199 +++++++-
 5 files changed, 727 insertions(+), 7 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
index 80aa854e66c..591980548e2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
@@ -341,6 +341,7 @@ public class IoTDBConnectionsIT {
         TimeUnit.SECONDS.sleep(1);
       }
     }
+    TimeUnit.SECONDS.sleep(5);
   }
 
   @Test
diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
index d8fb9bb4c50..f42d4237ea9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
@@ -242,7 +242,7 @@ public class SessionPoolIT {
   }
 
   @Test
-  public void tryIfTheServerIsRestart() {
+  public void tryIfTheServerIsRestart() throws InterruptedException {
     ISessionPool pool = EnvFactory.getEnv().getSessionPool(3);
     SessionDataSetWrapper wrapper = null;
     BaseNodeWrapper node = EnvFactory.getEnv().getDataNodeWrapper(0);
@@ -268,6 +268,7 @@ public class SessionPoolIT {
           .ensureNodeStatus(
               Collections.singletonList(node), 
Collections.singletonList(NodeStatus.Running));
       pool = EnvFactory.getEnv().getSessionPool(3);
+      TimeUnit.SECONDS.sleep(5);
       correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
       pool.close();
       return;
@@ -335,7 +336,7 @@ public class SessionPoolIT {
   }
 
   @Test
-  public void restart() {
+  public void restart() throws InterruptedException {
     ISessionPool pool = EnvFactory.getEnv().getSessionPool(1);
     write10Data(pool, true);
     // stop the server.
@@ -353,6 +354,7 @@ public class SessionPoolIT {
     EnvFactory.getEnv()
         .ensureNodeStatus(
             Collections.singletonList(node), 
Collections.singletonList(NodeStatus.Running));
+    TimeUnit.SECONDS.sleep(5);
     write10Data(pool, true);
     pool.close();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
index 3fac0d10a16..b9b9e89e82c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
@@ -29,6 +29,11 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 import org.apache.iotdb.metrics.utils.SystemType;
 
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.platform.win32.Kernel32;
+import com.sun.jna.platform.win32.WinNT;
+import com.sun.jna.ptr.IntByReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +57,9 @@ public class SystemRelatedFileMetrics implements IMetricSet {
 
   @Override
   public void bindTo(AbstractMetricService metricService) {
-    if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() 
== SystemType.MAC)
+    if ((CONFIG.getSystemType() == SystemType.LINUX
+            || CONFIG.getSystemType() == SystemType.MAC
+            || CONFIG.getSystemType() == SystemType.WINDOWS)
         && !CONFIG.getPid().isEmpty()) {
       this.getOpenFileNumberCommand =
           new String[] {"/bin/sh", "-c", String.format("lsof -p %s | wc -l", 
CONFIG.getPid())};
@@ -88,6 +95,11 @@ public class SystemRelatedFileMetrics implements IMetricSet {
           }
         }
         fdCount = Long.parseLong(result.toString().trim());
+      } else if (CONFIG.getSystemType() == SystemType.WINDOWS) {
+        WinNT.HANDLE hProcess = Kernel32.INSTANCE.GetCurrentProcess();
+        IntByReference handleCount = new IntByReference();
+        boolean success = Kernel32Ext.INSTANCE.GetProcessHandleCount(hProcess, 
handleCount);
+        return success ? handleCount.getValue() : 0L;
       }
     } catch (IOException e) {
       LOGGER.warn("Failed to get open file number, because ", e);
@@ -97,7 +109,9 @@ public class SystemRelatedFileMetrics implements IMetricSet {
 
   @Override
   public void unbindFrom(AbstractMetricService metricService) {
-    if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType() 
== SystemType.MAC)
+    if ((CONFIG.getSystemType() == SystemType.LINUX
+            || CONFIG.getSystemType() == SystemType.MAC
+            || CONFIG.getSystemType() == SystemType.WINDOWS)
         && !CONFIG.getPid().isEmpty()) {
       metricService.remove(
           MetricType.AUTO_GAUGE,
@@ -106,4 +120,10 @@ public class SystemRelatedFileMetrics implements 
IMetricSet {
           "open_file_handlers");
     }
   }
+
+  /**
+   * JNA binding for the {@code GetProcessHandleCount} entry point, resolved from the library
+   * named {@code kernel32}.
+   */
+  public interface Kernel32Ext extends Library {
+    // Shared binding instance, created through Native.load when this interface is initialised.
+    Kernel32Ext INSTANCE = Native.load("kernel32", Kernel32Ext.class);
+
+    /**
+     * Asks the native library for the handle count of a process.
+     *
+     * @param hProcess handle of the process to inspect
+     * @param pdwHandleCount out-parameter that receives the count
+     * @return the native success flag; the caller in this file reports 0 when it is {@code false}
+     */
+    boolean GetProcessHandleCount(WinNT.HANDLE hProcess, IntByReference 
pdwHandleCount);
+  }
 }
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
index 975576ac904..b83e0c5aa17 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
@@ -19,5 +19,505 @@
 
 package org.apache.iotdb.metrics.metricsets.disk;
 
-/** Disk Metrics Manager for Windows system, not implemented yet. */
-public class WindowsDiskMetricsManager implements IDiskMetricsManager {}
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Disk metrics manager for Windows system.
+ *
+ * <p>Windows does not expose Linux-like cumulative counters through procfs, 
so this implementation
+ * periodically samples Win32 performance counters and accumulates the 
observed per-second values
+ * into totals that match the Linux manager contract as closely as possible.
+ */
+public class WindowsDiskMetricsManager implements IDiskMetricsManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WindowsDiskMetricsManager.class);
+
+  private static final double BYTES_PER_KB = 1024.0;
+  private static final long UPDATE_SMALLEST_INTERVAL = 10000L;
+  private static final String POWER_SHELL = "powershell";
+  private static final String POWER_SHELL_NO_PROFILE = "-NoProfile";
+  private static final String POWER_SHELL_COMMAND = "-Command";
+  private static final String TOTAL_DISK_INSTANCE = "_Total";
+  private static final Charset WINDOWS_SHELL_CHARSET = 
getWindowsShellCharset();
+  private static final String DISK_QUERY =
+      "Get-CimInstance Win32_PerfFormattedData_PerfDisk_PhysicalDisk | "
+          + "Where-Object { $_.Name -ne '_Total' } | "
+          + "ForEach-Object { "
+          + "[string]::Concat("
+          + "$_.Name, [char]9, "
+          + "$_.DiskReadsPerSec, [char]9, "
+          + "$_.DiskWritesPerSec, [char]9, "
+          + "$_.DiskReadBytesPerSec, [char]9, "
+          + "$_.DiskWriteBytesPerSec, [char]9, "
+          + "$_.AvgDisksecPerRead, [char]9, "
+          + "$_.AvgDisksecPerWrite, [char]9, "
+          + "$_.PercentIdleTime, [char]9, "
+          + "$_.AvgDiskQueueLength) }";
+  private static final String PROCESS_QUERY_TEMPLATE =
+      "Get-CimInstance Win32_PerfFormattedData_PerfProc_Process | "
+          + "Where-Object { $_.IDProcess -eq %s } | "
+          + "ForEach-Object { "
+          + "[string]::Concat("
+          + "$_.IOReadOperationsPerSec, [char]9, "
+          + "$_.IOWriteOperationsPerSec, [char]9, "
+          + "$_.IOReadBytesPerSec, [char]9, "
+          + "$_.IOWriteBytesPerSec) }";
+
+  private final String processId;
+  private final Set<String> diskIdSet = new HashSet<>();
+
+  private long lastUpdateTime = 0L;
+  private long updateInterval = 1L;
+
+  private final Map<String, Long> lastReadOperationCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastWriteOperationCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastReadTimeCostForDisk = new HashMap<>();
+  private final Map<String, Long> lastWriteTimeCostForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedReadCountForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedWriteCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastReadSizeForDisk = new HashMap<>();
+  private final Map<String, Long> lastWriteSizeForDisk = new HashMap<>();
+  private final Map<String, Double> lastIoUtilsPercentageForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastQueueSizeForDisk = new HashMap<>();
+  private final Map<String, Double> lastAvgReadCostTimeOfEachOpsForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgWriteCostTimeOfEachOpsForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgSizeOfEachReadForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgSizeOfEachWriteForDisk = new 
HashMap<>();
+
+  private long lastReallyReadSizeForProcess = 0L;
+  private long lastReallyWriteSizeForProcess = 0L;
+  private long lastAttemptReadSizeForProcess = 0L;
+  private long lastAttemptWriteSizeForProcess = 0L;
+  private long lastReadOpsCountForProcess = 0L;
+  private long lastWriteOpsCountForProcess = 0L;
+
+  /**
+   * Stores the configured process id as a string and runs one disk query so that the disk id set
+   * is filled in before any metric is read.
+   */
+  public WindowsDiskMetricsManager() {
+    processId = 
String.valueOf(MetricConfigDescriptor.getInstance().getMetricConfig().getPid());
+    collectDiskId();
+  }
+
+  /** Refreshes stale samples, then returns a new map of the accumulated read bytes per disk in KB. */
+  @Override
+  public Map<String, Double> getReadDataSizeForDisk() {
+    checkUpdate();
+    return toKbMap(lastReadSizeForDisk);
+  }
+
+  /** Refreshes stale samples, then returns a new map of the accumulated written bytes per disk in KB. */
+  @Override
+  public Map<String, Double> getWriteDataSizeForDisk() {
+    checkUpdate();
+    return toKbMap(lastWriteSizeForDisk);
+  }
+
+  /**
+   * Refreshes stale samples, then returns the accumulated read operation count per disk. The
+   * returned map is the manager's own map, not a copy.
+   */
+  @Override
+  public Map<String, Long> getReadOperationCountForDisk() {
+    checkUpdate();
+    return lastReadOperationCountForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the accumulated write operation count per disk. The
+   * returned map is the manager's own map, not a copy.
+   */
+  @Override
+  public Map<String, Long> getWriteOperationCountForDisk() {
+    checkUpdate();
+    return lastWriteOperationCountForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the accumulated read time per disk in milliseconds,
+   * estimated from the average seconds per read and the read rate of each sample.
+   */
+  @Override
+  public Map<String, Long> getReadCostTimeForDisk() {
+    checkUpdate();
+    return lastReadTimeCostForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the accumulated write time per disk in milliseconds,
+   * estimated from the average seconds per write and the write rate of each sample.
+   */
+  @Override
+  public Map<String, Long> getWriteCostTimeForDisk() {
+    checkUpdate();
+    return lastWriteTimeCostForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the latest busy ratio per disk, computed as
+   * {@code 1 - PercentIdleTime / 100} and clamped to the range 0.0 to 1.0.
+   */
+  @Override
+  public Map<String, Double> getIoUtilsPercentage() {
+    checkUpdate();
+    return lastIoUtilsPercentageForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the latest average time per read for each disk, i.e. the
+   * sampled seconds-per-read value multiplied by 1000.
+   */
+  @Override
+  public Map<String, Double> getAvgReadCostTimeOfEachOpsForDisk() {
+    checkUpdate();
+    return lastAvgReadCostTimeOfEachOpsForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the latest average time per write for each disk, i.e.
+   * the sampled seconds-per-write value multiplied by 1000.
+   */
+  @Override
+  public Map<String, Double> getAvgWriteCostTimeOfEachOpsForDisk() {
+    checkUpdate();
+    return lastAvgWriteCostTimeOfEachOpsForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the latest bytes-per-read ratio for each disk (read
+   * bytes per second divided by reads per second, or 0.0 when no reads were sampled).
+   */
+  @Override
+  public Map<String, Double> getAvgSizeOfEachReadForDisk() {
+    checkUpdate();
+    return lastAvgSizeOfEachReadForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the latest bytes-per-write ratio for each disk (written
+   * bytes per second divided by writes per second, or 0.0 when no writes were sampled).
+   */
+  @Override
+  public Map<String, Double> getAvgSizeOfEachWriteForDisk() {
+    checkUpdate();
+    return lastAvgSizeOfEachWriteForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the merged-write map. Every sampled disk is stored with
+   * the value 0 because this manager never records a merged-write count.
+   */
+  @Override
+  public Map<String, Long> getMergedWriteOperationForDisk() {
+    checkUpdate();
+    return lastMergedWriteCountForDisk;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the merged-read map. Every sampled disk is stored with
+   * the value 0 because this manager never records a merged-read count.
+   */
+  @Override
+  public Map<String, Long> getMergedReadOperationForDisk() {
+    checkUpdate();
+    return lastMergedReadCountForDisk;
+  }
+
+  /** Refreshes stale samples, then returns the latest sampled average queue length per disk. */
+  @Override
+  public Map<String, Double> getQueueSizeForDisk() {
+    checkUpdate();
+    return lastQueueSizeForDisk;
+  }
+
+  /** Refreshes stale samples, then returns the accumulated bytes read by the process, in KB. */
+  @Override
+  public double getActualReadDataSizeForProcess() {
+    checkUpdate();
+    return lastReallyReadSizeForProcess / BYTES_PER_KB;
+  }
+
+  /** Refreshes stale samples, then returns the accumulated bytes written by the process, in KB. */
+  @Override
+  public double getActualWriteDataSizeForProcess() {
+    checkUpdate();
+    return lastReallyWriteSizeForProcess / BYTES_PER_KB;
+  }
+
+  /** Refreshes stale samples, then returns the accumulated read operation count of the process. */
+  @Override
+  public long getReadOpsCountForProcess() {
+    checkUpdate();
+    return lastReadOpsCountForProcess;
+  }
+
+  /** Refreshes stale samples, then returns the accumulated write operation count of the process. */
+  @Override
+  public long getWriteOpsCountForProcess() {
+    checkUpdate();
+    return lastWriteOpsCountForProcess;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the attempted read size of the process in KB. This
+   * manager copies the actual read total into the attempted one, so both report the same value.
+   */
+  @Override
+  public double getAttemptReadSizeForProcess() {
+    checkUpdate();
+    return lastAttemptReadSizeForProcess / BYTES_PER_KB;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the attempted write size of the process in KB. This
+   * manager copies the actual write total into the attempted one, so both report the same value.
+   */
+  @Override
+  public double getAttemptWriteSizeForProcess() {
+    checkUpdate();
+    return lastAttemptWriteSizeForProcess / BYTES_PER_KB;
+  }
+
+  /**
+   * Refreshes stale samples, then returns the disk names seen by the most recent non-empty disk
+   * query. The returned set is the manager's own set, not a copy.
+   */
+  @Override
+  public Set<String> getDiskIds() {
+    checkUpdate();
+    return diskIdSet;
+  }
+
+  /** Replaces the known disk ids with the names from a fresh query, unless the query came back empty. */
+  private void collectDiskId() {
+    Set<String> discoveredIds = queryDiskInfo().keySet();
+    if (!discoveredIds.isEmpty()) {
+      diskIdSet.clear();
+      diskIdSet.addAll(discoveredIds);
+    }
+  }
+
+  /**
+   * Copies a byte-count map into a new map whose values are divided by 1024.
+   *
+   * @param source totals in bytes, keyed by disk id
+   * @return a new map with the same keys and the values expressed in KB
+   */
+  private Map<String, Double> toKbMap(Map<String, Long> source) {
+    Map<String, Double> kilobytes = new HashMap<>(source.size());
+    for (String diskId : source.keySet()) {
+      kilobytes.put(diskId, source.get(diskId) / BYTES_PER_KB);
+    }
+    return kilobytes;
+  }
+
+  /**
+   * Records how much time passed since the previous refresh, then resamples disk and process
+   * counters. The very first refresh uses an interval of 0, so it adds nothing to the totals.
+   */
+  private void updateInfo() {
+    final long now = System.currentTimeMillis();
+    if (lastUpdateTime == 0L) {
+      updateInterval = 0L;
+    } else {
+      updateInterval = now - lastUpdateTime;
+    }
+    lastUpdateTime = now;
+    updateDiskInfo();
+    updateProcessInfo();
+  }
+
+  /**
+   * Takes one disk sample and folds it into the per-disk maps.
+   *
+   * <p>Per-second counters are scaled by the time elapsed since the previous refresh and added to
+   * the running totals; the idle ratio, queue length and per-operation averages simply replace the
+   * previous reading. An empty query result leaves every map untouched.
+   */
+  private void updateDiskInfo() {
+    Map<String, String[]> sampledDisks = queryDiskInfo();
+    if (sampledDisks.isEmpty()) {
+      return;
+    }
+
+    diskIdSet.clear();
+    diskIdSet.addAll(sampledDisks.keySet());
+    pruneDiskMetricMaps();
+
+    final long elapsedMillis = updateInterval;
+    for (String disk : sampledDisks.keySet()) {
+      // Field order follows the column order produced by queryDiskInfo().
+      String[] fields = sampledDisks.get(disk);
+      long readOps = parseLong(fields[0]);
+      long writeOps = parseLong(fields[1]);
+      long readBytes = parseLong(fields[2]);
+      long writeBytes = parseLong(fields[3]);
+      double secondsPerRead = parseDouble(fields[4]);
+      double secondsPerWrite = parseDouble(fields[5]);
+      double idlePercent = parseDouble(fields[6]);
+      double queueLength = parseDouble(fields[7]);
+
+      // Running totals: rate multiplied by elapsed time is added to the previous total.
+      long totalReadOps =
+          accumulate(lastReadOperationCountForDisk.get(disk), readOps, elapsedMillis);
+      lastReadOperationCountForDisk.put(disk, totalReadOps);
+      long totalWriteOps =
+          accumulate(lastWriteOperationCountForDisk.get(disk), writeOps, elapsedMillis);
+      lastWriteOperationCountForDisk.put(disk, totalWriteOps);
+
+      // No merged-operation figure is sampled, so these stay at zero.
+      lastMergedReadCountForDisk.put(disk, 0L);
+      lastMergedWriteCountForDisk.put(disk, 0L);
+
+      long totalReadBytes = accumulate(lastReadSizeForDisk.get(disk), readBytes, elapsedMillis);
+      lastReadSizeForDisk.put(disk, totalReadBytes);
+      long totalWriteBytes = accumulate(lastWriteSizeForDisk.get(disk), writeBytes, elapsedMillis);
+      lastWriteSizeForDisk.put(disk, totalWriteBytes);
+
+      long totalReadMillis =
+          accumulateTimeCost(
+              lastReadTimeCostForDisk.get(disk), secondsPerRead, readOps, elapsedMillis);
+      lastReadTimeCostForDisk.put(disk, totalReadMillis);
+      long totalWriteMillis =
+          accumulateTimeCost(
+              lastWriteTimeCostForDisk.get(disk), secondsPerWrite, writeOps, elapsedMillis);
+      lastWriteTimeCostForDisk.put(disk, totalWriteMillis);
+
+      // Point-in-time readings: the newest sample overwrites the previous one.
+      lastIoUtilsPercentageForDisk.put(disk, clampPercentage(1.0 - idlePercent / 100.0));
+      lastQueueSizeForDisk.put(disk, queueLength);
+      lastAvgReadCostTimeOfEachOpsForDisk.put(disk, secondsPerRead * 1000.0);
+      lastAvgWriteCostTimeOfEachOpsForDisk.put(disk, secondsPerWrite * 1000.0);
+
+      double bytesPerRead = 0.0;
+      if (readOps != 0) {
+        bytesPerRead = ((double) readBytes) / readOps;
+      }
+      lastAvgSizeOfEachReadForDisk.put(disk, bytesPerRead);
+
+      double bytesPerWrite = 0.0;
+      if (writeOps != 0) {
+        bytesPerWrite = ((double) writeBytes) / writeOps;
+      }
+      lastAvgSizeOfEachWriteForDisk.put(disk, bytesPerWrite);
+    }
+  }
+
+  /** Drops, from every per-disk map, the entries whose disk id is no longer in the disk id set. */
+  private void pruneDiskMetricMaps() {
+    // Accumulated totals.
+    lastReadOperationCountForDisk.keySet().retainAll(diskIdSet);
+    lastWriteOperationCountForDisk.keySet().retainAll(diskIdSet);
+    lastReadTimeCostForDisk.keySet().retainAll(diskIdSet);
+    lastWriteTimeCostForDisk.keySet().retainAll(diskIdSet);
+    lastMergedReadCountForDisk.keySet().retainAll(diskIdSet);
+    lastMergedWriteCountForDisk.keySet().retainAll(diskIdSet);
+    lastReadSizeForDisk.keySet().retainAll(diskIdSet);
+    lastWriteSizeForDisk.keySet().retainAll(diskIdSet);
+    // Latest-sample readings.
+    lastIoUtilsPercentageForDisk.keySet().retainAll(diskIdSet);
+    lastQueueSizeForDisk.keySet().retainAll(diskIdSet);
+    lastAvgReadCostTimeOfEachOpsForDisk.keySet().retainAll(diskIdSet);
+    lastAvgWriteCostTimeOfEachOpsForDisk.keySet().retainAll(diskIdSet);
+    lastAvgSizeOfEachReadForDisk.keySet().retainAll(diskIdSet);
+    lastAvgSizeOfEachWriteForDisk.keySet().retainAll(diskIdSet);
+  }
+
+  /**
+   * Removes every entry of {@code metricMap} whose key is not in the current disk id set.
+   *
+   * @param metricMap per-disk map to prune in place
+   */
+  private <T> void pruneDiskMetricMap(Map<String, T> metricMap) {
+    // keySet() is a live view, so retainAll removes the matching entries from the map itself.
+    metricMap.keySet().retainAll(diskIdSet);
+  }
+
+  /**
+   * Samples the process I/O counters once and adds rate multiplied by elapsed time to the process
+   * totals. A missing, empty or malformed output line leaves the totals unchanged.
+   */
+  private void updateProcessInfo() {
+    String rawLine = queryProcessInfo();
+    boolean nothingReported = rawLine == null || rawLine.isEmpty();
+    if (nothingReported) {
+      return;
+    }
+
+    String[] columns = rawLine.split("\t");
+    if (columns.length < 4) {
+      LOGGER.warn("Unexpected windows process io info format: {}", rawLine);
+      return;
+    }
+
+    // Column order: read ops/s, write ops/s, read bytes/s, write bytes/s.
+    long readOps = parseLong(columns[0]);
+    long writeOps = parseLong(columns[1]);
+    long readBytes = parseLong(columns[2]);
+    long writeBytes = parseLong(columns[3]);
+
+    final long elapsedMillis = updateInterval;
+    lastReadOpsCountForProcess = accumulate(lastReadOpsCountForProcess, readOps, elapsedMillis);
+    lastWriteOpsCountForProcess = accumulate(lastWriteOpsCountForProcess, writeOps, elapsedMillis);
+    lastReallyReadSizeForProcess =
+        accumulate(lastReallyReadSizeForProcess, readBytes, elapsedMillis);
+    lastReallyWriteSizeForProcess =
+        accumulate(lastReallyWriteSizeForProcess, writeBytes, elapsedMillis);
+
+    // These counters carry no separate "attempted" size, so the actual totals are mirrored.
+    lastAttemptReadSizeForProcess = lastReallyReadSizeForProcess;
+    lastAttemptWriteSizeForProcess = lastReallyWriteSizeForProcess;
+  }
+
+  /**
+   * Runs the disk query and splits each tab-separated output line into a disk name and its eight
+   * counter columns.
+   *
+   * @return disk name mapped to its eight raw counter strings; empty when nothing usable came back
+   */
+  private Map<String, String[]> queryDiskInfo() {
+    Map<String, String[]> disks = new HashMap<>();
+    List<String> outputLines = executePowerShell(DISK_QUERY);
+    for (String outputLine : outputLines) {
+      if (outputLine == null || outputLine.isEmpty()) {
+        continue;
+      }
+      String[] columns = outputLine.split("\t");
+      if (columns.length < 9) {
+        LOGGER.warn("Unexpected windows disk io info format: {}", outputLine);
+        continue;
+      }
+      String diskName = columns[0].trim();
+      boolean usable = !diskName.isEmpty() && !TOTAL_DISK_INSTANCE.equals(diskName);
+      if (usable) {
+        // Columns 1..8 are the counters; column 0 is the name.
+        String[] counters = new String[8];
+        System.arraycopy(columns, 1, counters, 0, counters.length);
+        disks.put(diskName, counters);
+      }
+    }
+    return disks;
+  }
+
+  /**
+   * Runs the per-process I/O query for {@code processId} and returns its first non-empty output
+   * line.
+   *
+   * <p>The id is substituted into the PowerShell filter as a bare token ({@code -eq %s}), not
+   * inside a quoted literal, so quote escaping alone cannot make an arbitrary value safe. An empty
+   * or non-numeric id would also produce a malformed filter and a failing PowerShell run on every
+   * refresh. The query is therefore only issued when the id consists solely of decimal digits.
+   *
+   * @return the tab-separated counter line, or {@code null} when the id is not numeric or the
+   *     query printed nothing
+   */
+  private String queryProcessInfo() {
+    if (processId == null || processId.isEmpty()) {
+      return null;
+    }
+    for (int i = 0; i < processId.length(); i++) {
+      char c = processId.charAt(i);
+      if (c < '0' || c > '9') {
+        return null;
+      }
+    }
+
+    String command =
+        String.format(PROCESS_QUERY_TEMPLATE, escapeSingleQuotedPowerShell(processId));
+    for (String line : executePowerShell(command)) {
+      if (line != null && !line.isEmpty()) {
+        return line;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Doubles every single quote in {@code value}.
+   *
+   * <p>NOTE(review): doubling quotes only protects a value placed inside a single-quoted
+   * PowerShell literal, while the process query template in this class substitutes its argument
+   * unquoted ({@code -eq %s}) - confirm the intended usage.
+   *
+   * @param value text to escape
+   * @return {@code value} with each {@code '} replaced by {@code ''}
+   */
+  private String escapeSingleQuotedPowerShell(String value) {
+    return value.replace("'", "''");
+  }
+
+  /**
+   * Adds {@code valuePerSec} scaled by the elapsed interval to a possibly missing previous total.
+   *
+   * @param previousValue total recorded so far, or {@code null} when none exists (treated as 0)
+   * @param valuePerSec sampled per-second rate
+   * @param intervalMillis elapsed time in milliseconds; non-positive values add nothing
+   * @return the new total, using integer arithmetic
+   */
+  private long accumulate(Long previousValue, long valuePerSec, long intervalMillis) {
+    long total = 0L;
+    if (previousValue != null) {
+      total = previousValue;
+    }
+    if (intervalMillis > 0L) {
+      total += valuePerSec * intervalMillis / 1000L;
+    }
+    return total;
+  }
+
+  /**
+   * Adds {@code valuePerSec} scaled by the elapsed interval to a previous total.
+   *
+   * @param previousValue total recorded so far
+   * @param valuePerSec sampled per-second rate
+   * @param intervalMillis elapsed time in milliseconds; non-positive values add nothing
+   * @return the new total, using integer arithmetic
+   */
+  private long accumulate(long previousValue, long valuePerSec, long intervalMillis) {
+    long increment = intervalMillis > 0L ? valuePerSec * intervalMillis / 1000L : 0L;
+    return previousValue + increment;
+  }
+
+  /**
+   * Extends an accumulated time cost by the time the operations of one interval are estimated to
+   * have taken.
+   *
+   * @param previousValue milliseconds recorded so far, or {@code null} when none (treated as 0)
+   * @param avgTimeInSecond sampled average seconds per operation
+   * @param opsPerSec sampled operations per second
+   * @param intervalMillis elapsed time in milliseconds; non-positive values add nothing
+   * @return the new total in milliseconds
+   */
+  private long accumulateTimeCost(
+      Long previousValue, double avgTimeInSecond, long opsPerSec, long intervalMillis) {
+    long soFar = previousValue == null ? 0L : previousValue;
+    if (intervalMillis <= 0L) {
+      return soFar;
+    }
+    // Estimated number of operations that happened during the interval.
+    double operationCount = opsPerSec * intervalMillis / 1000.0;
+    long addedMillis = Math.round(avgTimeInSecond * operationCount * 1000.0);
+    return soFar + addedMillis;
+  }
+
+  /**
+   * Parses a counter as a floating-point number and rounds it to the nearest long.
+   *
+   * @param value raw counter text
+   * @return the rounded value, or 0 (after logging a warning) when the text is not a number
+   */
+  private long parseLong(String value) {
+    final double parsed;
+    try {
+      parsed = Double.parseDouble(value.trim());
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Failed to parse long value from windows disk metrics: {}", value, e);
+      return 0L;
+    }
+    return Math.round(parsed);
+  }
+
+  /**
+   * Parses a counter as a floating-point number.
+   *
+   * @param value raw counter text
+   * @return the parsed value, or 0.0 (after logging a warning) when the text is not a number
+   */
+  private double parseDouble(String value) {
+    double parsed;
+    try {
+      parsed = Double.parseDouble(value.trim());
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Failed to parse double value from windows disk metrics: {}", value, e);
+      parsed = 0.0;
+    }
+    return parsed;
+  }
+
+  /** Limits {@code value} to the closed range 0.0 to 1.0. */
+  private double clampPercentage(double value) {
+    double cappedAtOne = Math.min(1.0, value);
+    return Math.max(0.0, cappedAtOne);
+  }
+
+  /**
+   * Runs {@code command} through PowerShell and collects its trimmed, non-empty output lines.
+   *
+   * @param command PowerShell script text passed after {@code -Command}
+   * @return the output lines when PowerShell exits with code 0; an empty list when it exits with
+   *     another code, cannot be started, or the calling thread is interrupted
+   */
+  private List<String> executePowerShell(String command) {
+    List<String> result = new ArrayList<>();
+    List<String> rawOutput = new ArrayList<>();
+    Process process = null;
+    try {
+      // stderr is merged into stdout, so error text from PowerShell also lands in rawOutput.
+      process =
+          new ProcessBuilder(POWER_SHELL, POWER_SHELL_NO_PROFILE, 
POWER_SHELL_COMMAND, command)
+              .redirectErrorStream(true)
+              .start();
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(process.getInputStream(), 
WINDOWS_SHELL_CHARSET))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          String trimmedLine = line.trim();
+          if (!trimmedLine.isEmpty()) {
+            rawOutput.add(trimmedLine);
+          }
+        }
+      }
+      // The output is drained before waiting, so waitFor() is reached only after the stream ends.
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        LOGGER.warn(
+            "Failed to collect windows disk metrics, powershell exit code: {}, 
command {}, output {}",
+            exitCode,
+            command,
+            String.join(" | ", rawOutput));
+      } else {
+        // Only a successful run hands its output back to the caller.
+        result.addAll(rawOutput);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to execute powershell for windows disk metrics", e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while collecting windows disk metrics", e);
+    } finally {
+      if (process != null) {
+        process.destroy();
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Picks the charset used to decode PowerShell output.
+   *
+   * <p>The first supported name wins, tried in this order: the {@code sun.jnu.encoding} system
+   * property, the {@code file.encoding} system property, then {@code GBK}. The JVM default charset
+   * is the last resort.
+   */
+  private static Charset getWindowsShellCharset() {
+    String[] candidateProperties = {"sun.jnu.encoding", "file.encoding"};
+    for (String property : candidateProperties) {
+      String charsetName = System.getProperty(property);
+      if (charsetName != null && Charset.isSupported(charsetName)) {
+        return Charset.forName(charsetName);
+      }
+    }
+
+    return Charset.isSupported("GBK") ? Charset.forName("GBK") : Charset.defaultCharset();
+  }
+
+  /** Resamples all counters when the previous refresh is older than the minimum update interval. */
+  private void checkUpdate() {
+    long elapsedSinceRefresh = System.currentTimeMillis() - lastUpdateTime;
+    if (elapsedSinceRefresh <= UPDATE_SMALLEST_INTERVAL) {
+      return;
+    }
+    updateInfo();
+  }
+}
diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
index c3ecb4b8d50..7fb82ffaa0d 100644
--- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
+++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
@@ -19,4 +19,201 @@
 
 package org.apache.iotdb.metrics.metricsets.net;
 
-public class WindowsNetMetricManager implements INetMetricManager {}
+import org.apache.iotdb.metrics.MetricConstant;
+import org.apache.iotdb.metrics.config.MetricConfig;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class WindowsNetMetricManager implements INetMetricManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WindowsNetMetricManager.class);
+
+  private static final MetricConfig METRIC_CONFIG =
+      MetricConfigDescriptor.getInstance().getMetricConfig();
+
+  private long lastUpdateTime = 0L;
+
+  private Set<String> ifaceSet = new HashSet<>();
+
+  private final Map<String, Long> receivedBytesMapForIface = new HashMap<>();
+
+  private final Map<String, Long> transmittedBytesMapForIface = new 
HashMap<>();
+
+  private final Map<String, Long> receivedPacketsMapForIface = new HashMap<>();
+
+  private final Map<String, Long> transmittedPacketsMapForIface = new 
HashMap<>();
+
+  private int connectionNum = 0;
+
+  /** Creates the manager and immediately runs the first refresh through {@code checkUpdate()}. */
+  public WindowsNetMetricManager() {
+    checkUpdate();
+  }
+
+  /**
+   * Refreshes stale data, then returns the adapter names collected from {@code Get-NetAdapter}.
+   * The returned set is the manager's own set, not a copy.
+   */
+  @Override
+  public Set<String> getIfaceSet() {
+    checkUpdate();
+    return ifaceSet;
+  }
+
+  /**
+   * Refreshes stale data, then returns the {@code ReceivedBytes} value per adapter as last
+   * reported by {@code Get-NetAdapterStatistics}.
+   */
+  @Override
+  public Map<String, Long> getReceivedByte() {
+    checkUpdate();
+    return receivedBytesMapForIface;
+  }
+
+  /**
+   * Refreshes stale data, then returns the {@code SentBytes} value per adapter as last reported by
+   * {@code Get-NetAdapterStatistics}.
+   */
+  @Override
+  public Map<String, Long> getTransmittedBytes() {
+    checkUpdate();
+    return transmittedBytesMapForIface;
+  }
+
+  /**
+   * Refreshes stale data, then returns the {@code ReceivedUnicastPackets} value per adapter as
+   * last reported by {@code Get-NetAdapterStatistics}.
+   */
+  @Override
+  public Map<String, Long> getReceivedPackets() {
+    checkUpdate();
+    return receivedPacketsMapForIface;
+  }
+
+  /**
+   * Refreshes stale data, then returns the {@code SentUnicastPackets} value per adapter as last
+   * reported by {@code Get-NetAdapterStatistics}.
+   */
+  @Override
+  public Map<String, Long> getTransmittedPackets() {
+    checkUpdate();
+    return transmittedPacketsMapForIface;
+  }
+
+  /**
+   * Refreshes stale data, then returns the number of {@code netstat -ano} rows attributed to the
+   * configured pid. The count is only recomputed when the metric-level check in the refresh
+   * passes; otherwise the previous value (initially 0) is returned.
+   */
+  @Override
+  public int getConnectionNum() {
+    checkUpdate();
+    return connectionNum;
+  }
+
+  /** Refreshes the network data once at least {@code MetricConstant.UPDATE_INTERVAL} ms have passed. */
+  private void checkUpdate() {
+    long elapsedSinceRefresh = System.currentTimeMillis() - lastUpdateTime;
+    if (elapsedSinceRefresh < MetricConstant.UPDATE_INTERVAL) {
+      return;
+    }
+    updateNetStatus();
+  }
+
+  /**
+   * Performs one refresh: stamps the refresh time, discovers adapters if none are known yet,
+   * reloads the per-adapter statistics and, when the metric-level check passes, recounts the
+   * connections of this process.
+   */
+  private void updateNetStatus() {
+    lastUpdateTime = System.currentTimeMillis();
+
+    boolean noInterfaceKnown = ifaceSet.isEmpty();
+    if (noInterfaceKnown) {
+      updateInterfaces();
+    }
+
+    updateStatistics();
+
+    boolean connectionCountEnabled =
+        MetricLevel.higherOrEqual(MetricLevel.NORMAL, METRIC_CONFIG.getMetricLevel());
+    if (connectionCountEnabled) {
+      updateConnectionNum();
+    }
+  }
+
+  /**
+   * Rebuilds {@code ifaceSet} from the adapter names printed by {@code Get-NetAdapter}.
+   *
+   * <p>The output reader is closed by try-with-resources and the child process is destroyed in
+   * {@code finally}, so neither leaks when reading fails. A {@code Name :} line with a blank value
+   * is skipped; previously it raised an uncaught {@code StringIndexOutOfBoundsException} because
+   * the trimmed line is shorter than the {@code "Name : "} prefix.
+   */
+  private void updateInterfaces() {
+    Process process = null;
+    try {
+      ifaceSet.clear();
+      process =
+          Runtime.getRuntime()
+              .exec(
+                  "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapter -IncludeHidden | Select Name | Format-List \"");
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          line = line.trim();
+          if (line.startsWith("Name :")) {
+            // Cut right after the colon: the trimmed line may end there when the value is blank.
+            String name = line.substring("Name :".length()).trim();
+            if (!name.isEmpty()) {
+              ifaceSet.add(name);
+            }
+          }
+        }
+      }
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        LOGGER.error("Failed to get interfaces, exit code: {}", exitCode);
+      }
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt flag for callers further up the stack.
+        Thread.currentThread().interrupt();
+      }
+      LOGGER.error("Error updating interfaces", e);
+      ifaceSet.clear();
+    } finally {
+      if (process != null) {
+        process.destroy();
+      }
+    }
+  }
+
+  /**
+   * Reloads the per-adapter byte and packet counters from {@code Get-NetAdapterStatistics}.
+   *
+   * <p>Each output line is split at its first colon into a property name and a value. Blank
+   * values are skipped instead of being parsed: previously a blank value made the value offset
+   * point into the property name, and the resulting {@code NumberFormatException} aborted the
+   * whole pass and dropped every remaining adapter. A blank adapter name is ignored as well. The
+   * reader is closed by try-with-resources and the child process is destroyed in {@code finally}.
+   */
+  private void updateStatistics() {
+    Process process = null;
+    try {
+      receivedBytesMapForIface.clear();
+      transmittedBytesMapForIface.clear();
+      receivedPacketsMapForIface.clear();
+      transmittedPacketsMapForIface.clear();
+      process =
+          Runtime.getRuntime()
+              .exec(
+                  "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapterStatistics -IncludeHidden | Format-List Name,ReceivedBytes,SentBytes,ReceivedUnicastPackets,SentUnicastPackets \"");
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
+        String line;
+        String currentName = null;
+        while ((line = reader.readLine()) != null) {
+          int separator = line.indexOf(':');
+          if (separator < 0) {
+            continue;
+          }
+          String property = line.substring(0, separator).trim();
+          String value = line.substring(separator + 1).trim();
+
+          if ("Name".equals(property)) {
+            currentName = value.isEmpty() ? null : value;
+            continue;
+          }
+          if (currentName == null) {
+            continue;
+          }
+          switch (property) {
+            case "ReceivedBytes":
+              putStatistic(receivedBytesMapForIface, currentName, value);
+              break;
+            case "SentBytes":
+              putStatistic(transmittedBytesMapForIface, currentName, value);
+              break;
+            case "ReceivedUnicastPackets":
+              putStatistic(receivedPacketsMapForIface, currentName, value);
+              break;
+            case "SentUnicastPackets":
+              putStatistic(transmittedPacketsMapForIface, currentName, value);
+              currentName = null; // Last property of an adapter: reset for the next record.
+              break;
+            default:
+              break;
+          }
+        }
+      }
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        LOGGER.error("Failed to get statistics, exit code: {}", exitCode);
+      }
+    } catch (IOException | InterruptedException | NumberFormatException e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt flag for callers further up the stack.
+        Thread.currentThread().interrupt();
+      }
+      LOGGER.error("Error updating statistics", e);
+    } finally {
+      if (process != null) {
+        process.destroy();
+      }
+    }
+  }
+
+  /** Stores {@code rawValue} parsed as a long under {@code iface}; a blank value is ignored. */
+  private void putStatistic(Map<String, Long> target, String iface, String rawValue) {
+    if (!rawValue.isEmpty()) {
+      target.put(iface, Long.parseLong(rawValue));
+    }
+  }
+
+  /**
+   * Recounts the {@code netstat -ano} rows that belong to the configured pid.
+   *
+   * <p>A row is counted when it has at least five whitespace-separated columns and its last
+   * column equals the pid. The reader is closed by try-with-resources and the child process is
+   * destroyed in {@code finally}, so neither leaks when reading fails. The output is decoded with
+   * the platform default charset, as before.
+   */
+  private void updateConnectionNum() {
+    Process process = null;
+    try {
+      process = Runtime.getRuntime().exec("netstat -ano");
+      // Read the pid once instead of once per output line.
+      String pid = METRIC_CONFIG.getPid();
+      int count = 0;
+      try (BufferedReader reader =
+          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          line = line.trim();
+          if (!line.isEmpty() && !line.startsWith("Active") && !line.startsWith("Proto")) {
+            String[] parts = line.split("\\s+");
+            if (parts.length >= 5 && parts[parts.length - 1].equals(pid)) {
+              count++;
+            }
+          }
+        }
+      }
+      this.connectionNum = count;
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        LOGGER.error("Failed to get connection num, exit code: {}", exitCode);
+      }
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        // Restore the interrupt flag for callers further up the stack.
+        Thread.currentThread().interrupt();
+      }
+      LOGGER.error("Error updating connection num", e);
+    } finally {
+      if (process != null) {
+        process.destroy();
+      }
+    }
+  }
+}

Reply via email to