This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch new_win_metric
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8ceca3be1193f53e1c44696d958e83759c98531c
Author: HTHou <[email protected]>
AuthorDate: Thu Mar 12 15:53:31 2026 +0800

    Implement WindowsDiskMetricsManager
---
 .../metricsets/disk/WindowsDiskMetricsManager.java | 476 ++++++++++++++++++++-
 1 file changed, 474 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
 
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
index 975576ac904..09a179e00a5 100644
--- 
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
+++ 
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
@@ -19,5 +19,477 @@
 
 package org.apache.iotdb.metrics.metricsets.disk;
 
-/** Disk Metrics Manager for Windows system, not implemented yet. */
-public class WindowsDiskMetricsManager implements IDiskMetricsManager {}
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Disk metrics manager for Windows system.
+ *
+ * <p>Windows does not expose Linux-like cumulative counters through procfs, 
so this implementation
+ * periodically samples Win32 performance counters and accumulates the 
observed per-second values
+ * into totals that match the Linux manager contract as closely as possible.
+ */
+public class WindowsDiskMetricsManager implements IDiskMetricsManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WindowsDiskMetricsManager.class);
+
+  private static final double BYTES_PER_KB = 1024.0;
+  private static final long UPDATE_SMALLEST_INTERVAL = 10000L;
+  private static final String POWER_SHELL = "powershell";
+  private static final String POWER_SHELL_NO_PROFILE = "-NoProfile";
+  private static final String POWER_SHELL_COMMAND = "-Command";
+  private static final String TOTAL_DISK_INSTANCE = "_Total";
+  private static final Charset WINDOWS_SHELL_CHARSET = 
getWindowsShellCharset();
+  private static final String DISK_QUERY =
+      "Get-CimInstance Win32_PerfFormattedData_PerfDisk_PhysicalDisk | "
+          + "Where-Object { $_.Name -ne '_Total' } | "
+          + "ForEach-Object { "
+          + "\"$($_.Name)$([char]9)"
+          + "$($_.DiskReadsPerSec)$([char]9)"
+          + "$($_.DiskWritesPerSec)$([char]9)"
+          + "$($_.DiskReadBytesPerSec)$([char]9)"
+          + "$($_.DiskWriteBytesPerSec)$([char]9)"
+          + "$($_.AvgDisksecPerRead)$([char]9)"
+          + "$($_.AvgDisksecPerWrite)$([char]9)"
+          + "$($_.PercentDiskTime)$([char]9)"
+          + "$($_.AvgDiskQueueLength)\" }";
+  private static final String PROCESS_QUERY_TEMPLATE =
+      "Get-CimInstance Win32_PerfFormattedData_PerfProc_Process | "
+          + "Where-Object { $_.IDProcess -eq %s } | "
+          + "ForEach-Object { "
+          + "\"$($_.IOReadOperationsPerSec)$([char]9)"
+          + "$($_.IOWriteOperationsPerSec)$([char]9)"
+          + "$($_.IOReadBytesPerSec)$([char]9)"
+          + "$($_.IOWriteBytesPerSec)\" }";
+
+  private final String processId;
+  private final Set<String> diskIdSet = new HashSet<>();
+
+  private long lastUpdateTime = 0L;
+  private long updateInterval = 1L;
+
+  private final Map<String, Long> lastReadOperationCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastWriteOperationCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastReadTimeCostForDisk = new HashMap<>();
+  private final Map<String, Long> lastWriteTimeCostForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedReadCountForDisk = new HashMap<>();
+  private final Map<String, Long> lastMergedWriteCountForDisk = new 
HashMap<>();
+  private final Map<String, Long> lastReadSizeForDisk = new HashMap<>();
+  private final Map<String, Long> lastWriteSizeForDisk = new HashMap<>();
+  private final Map<String, Double> lastIoUtilsPercentageForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastQueueSizeForDisk = new HashMap<>();
+  private final Map<String, Double> lastAvgReadCostTimeOfEachOpsForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgWriteCostTimeOfEachOpsForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgSizeOfEachReadForDisk = new 
HashMap<>();
+  private final Map<String, Double> lastAvgSizeOfEachWriteForDisk = new 
HashMap<>();
+
+  private long lastReallyReadSizeForProcess = 0L;
+  private long lastReallyWriteSizeForProcess = 0L;
+  private long lastAttemptReadSizeForProcess = 0L;
+  private long lastAttemptWriteSizeForProcess = 0L;
+  private long lastReadOpsCountForProcess = 0L;
+  private long lastWriteOpsCountForProcess = 0L;
+
+  public WindowsDiskMetricsManager() {
+    processId = 
String.valueOf(MetricConfigDescriptor.getInstance().getMetricConfig().getPid());
+    collectDiskId();
+  }
+
+  @Override
+  public Map<String, Double> getReadDataSizeForDisk() {
+    checkUpdate();
+    return toKbMap(lastReadSizeForDisk);
+  }
+
+  @Override
+  public Map<String, Double> getWriteDataSizeForDisk() {
+    checkUpdate();
+    return toKbMap(lastWriteSizeForDisk);
+  }
+
+  @Override
+  public Map<String, Long> getReadOperationCountForDisk() {
+    checkUpdate();
+    return lastReadOperationCountForDisk;
+  }
+
+  @Override
+  public Map<String, Long> getWriteOperationCountForDisk() {
+    checkUpdate();
+    return lastWriteOperationCountForDisk;
+  }
+
+  @Override
+  public Map<String, Long> getReadCostTimeForDisk() {
+    checkUpdate();
+    return lastReadTimeCostForDisk;
+  }
+
+  @Override
+  public Map<String, Long> getWriteCostTimeForDisk() {
+    checkUpdate();
+    return lastWriteTimeCostForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getIoUtilsPercentage() {
+    checkUpdate();
+    return lastIoUtilsPercentageForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getAvgReadCostTimeOfEachOpsForDisk() {
+    checkUpdate();
+    return lastAvgReadCostTimeOfEachOpsForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getAvgWriteCostTimeOfEachOpsForDisk() {
+    checkUpdate();
+    return lastAvgWriteCostTimeOfEachOpsForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getAvgSizeOfEachReadForDisk() {
+    checkUpdate();
+    return lastAvgSizeOfEachReadForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getAvgSizeOfEachWriteForDisk() {
+    checkUpdate();
+    return lastAvgSizeOfEachWriteForDisk;
+  }
+
+  @Override
+  public Map<String, Long> getMergedWriteOperationForDisk() {
+    checkUpdate();
+    return lastMergedWriteCountForDisk;
+  }
+
+  @Override
+  public Map<String, Long> getMergedReadOperationForDisk() {
+    checkUpdate();
+    return lastMergedReadCountForDisk;
+  }
+
+  @Override
+  public Map<String, Double> getQueueSizeForDisk() {
+    checkUpdate();
+    return lastQueueSizeForDisk;
+  }
+
+  @Override
+  public double getActualReadDataSizeForProcess() {
+    checkUpdate();
+    return lastReallyReadSizeForProcess / BYTES_PER_KB;
+  }
+
+  @Override
+  public double getActualWriteDataSizeForProcess() {
+    checkUpdate();
+    return lastReallyWriteSizeForProcess / BYTES_PER_KB;
+  }
+
+  @Override
+  public long getReadOpsCountForProcess() {
+    checkUpdate();
+    return lastReadOpsCountForProcess;
+  }
+
+  @Override
+  public long getWriteOpsCountForProcess() {
+    checkUpdate();
+    return lastWriteOpsCountForProcess;
+  }
+
+  @Override
+  public double getAttemptReadSizeForProcess() {
+    checkUpdate();
+    return lastAttemptReadSizeForProcess / BYTES_PER_KB;
+  }
+
+  @Override
+  public double getAttemptWriteSizeForProcess() {
+    checkUpdate();
+    return lastAttemptWriteSizeForProcess / BYTES_PER_KB;
+  }
+
+  @Override
+  public Set<String> getDiskIds() {
+    checkUpdate();
+    return diskIdSet;
+  }
+
+  private void collectDiskId() {
+    Map<String, String[]> diskInfoMap = queryDiskInfo();
+    if (diskInfoMap.isEmpty()) {
+      return;
+    }
+    diskIdSet.clear();
+    diskIdSet.addAll(diskInfoMap.keySet());
+  }
+
+  private Map<String, Double> toKbMap(Map<String, Long> source) {
+    Map<String, Double> result = new HashMap<>(source.size());
+    for (Map.Entry<String, Long> entry : source.entrySet()) {
+      result.put(entry.getKey(), entry.getValue() / BYTES_PER_KB);
+    }
+    return result;
+  }
+
+  private void updateInfo() {
+    long currentTime = System.currentTimeMillis();
+    updateInterval = lastUpdateTime == 0L ? 0L : currentTime - lastUpdateTime;
+    lastUpdateTime = currentTime;
+    updateDiskInfo();
+    updateProcessInfo();
+  }
+
+  private void updateDiskInfo() {
+    Map<String, String[]> diskInfoMap = queryDiskInfo();
+    if (diskInfoMap.isEmpty()) {
+      return;
+    }
+
+    diskIdSet.clear();
+    diskIdSet.addAll(diskInfoMap.keySet());
+
+    for (Map.Entry<String, String[]> entry : diskInfoMap.entrySet()) {
+      String diskId = entry.getKey();
+      String[] diskInfo = entry.getValue();
+      long readOpsPerSec = parseLong(diskInfo[0]);
+      long writeOpsPerSec = parseLong(diskInfo[1]);
+      long readBytesPerSec = parseLong(diskInfo[2]);
+      long writeBytesPerSec = parseLong(diskInfo[3]);
+      double avgDiskSecPerRead = parseDouble(diskInfo[4]);
+      double avgDiskSecPerWrite = parseDouble(diskInfo[5]);
+      double percentDiskTime = parseDouble(diskInfo[6]);
+      double avgDiskQueueLength = parseDouble(diskInfo[7]);
+
+      long intervalMillis = updateInterval;
+      lastReadOperationCountForDisk.put(
+          diskId,
+          accumulate(lastReadOperationCountForDisk.get(diskId), readOpsPerSec, 
intervalMillis));
+      lastWriteOperationCountForDisk.put(
+          diskId,
+          accumulate(lastWriteOperationCountForDisk.get(diskId), 
writeOpsPerSec, intervalMillis));
+      lastMergedReadCountForDisk.put(diskId, 0L);
+      lastMergedWriteCountForDisk.put(diskId, 0L);
+      lastReadSizeForDisk.put(
+          diskId, accumulate(lastReadSizeForDisk.get(diskId), readBytesPerSec, 
intervalMillis));
+      lastWriteSizeForDisk.put(
+          diskId, accumulate(lastWriteSizeForDisk.get(diskId), 
writeBytesPerSec, intervalMillis));
+      lastReadTimeCostForDisk.put(
+          diskId,
+          accumulateTimeCost(
+              lastReadTimeCostForDisk.get(diskId),
+              avgDiskSecPerRead,
+              readOpsPerSec,
+              intervalMillis));
+      lastWriteTimeCostForDisk.put(
+          diskId,
+          accumulateTimeCost(
+              lastWriteTimeCostForDisk.get(diskId),
+              avgDiskSecPerWrite,
+              writeOpsPerSec,
+              intervalMillis));
+      lastIoUtilsPercentageForDisk.put(diskId, percentDiskTime / 100.0);
+      lastQueueSizeForDisk.put(diskId, avgDiskQueueLength);
+      lastAvgReadCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerRead * 
1000.0);
+      lastAvgWriteCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerWrite * 
1000.0);
+      lastAvgSizeOfEachReadForDisk.put(
+          diskId, readOpsPerSec == 0 ? 0.0 : ((double) readBytesPerSec) / 
readOpsPerSec);
+      lastAvgSizeOfEachWriteForDisk.put(
+          diskId, writeOpsPerSec == 0 ? 0.0 : ((double) writeBytesPerSec) / 
writeOpsPerSec);
+    }
+  }
+
+  private void updateProcessInfo() {
+    String processInfo = queryProcessInfo();
+    if (processInfo == null || processInfo.isEmpty()) {
+      return;
+    }
+
+    String[] processMetricArray = processInfo.split("\t");
+    if (processMetricArray.length < 4) {
+      LOGGER.warn("Unexpected windows process io info format: {}", 
processInfo);
+      return;
+    }
+
+    long readOpsPerSec = parseLong(processMetricArray[0]);
+    long writeOpsPerSec = parseLong(processMetricArray[1]);
+    long readBytesPerSec = parseLong(processMetricArray[2]);
+    long writeBytesPerSec = parseLong(processMetricArray[3]);
+
+    lastReadOpsCountForProcess =
+        accumulate(lastReadOpsCountForProcess, readOpsPerSec, updateInterval);
+    lastWriteOpsCountForProcess =
+        accumulate(lastWriteOpsCountForProcess, writeOpsPerSec, 
updateInterval);
+    lastReallyReadSizeForProcess =
+        accumulate(lastReallyReadSizeForProcess, readBytesPerSec, 
updateInterval);
+    lastReallyWriteSizeForProcess =
+        accumulate(lastReallyWriteSizeForProcess, writeBytesPerSec, 
updateInterval);
+
+    // Windows does not expose attempted read/write sizes directly in these 
counters.
+    lastAttemptReadSizeForProcess = lastReallyReadSizeForProcess;
+    lastAttemptWriteSizeForProcess = lastReallyWriteSizeForProcess;
+  }
+
+  private Map<String, String[]> queryDiskInfo() {
+    Map<String, String[]> result = new HashMap<>();
+    for (String line : executePowerShell(DISK_QUERY)) {
+      if (line == null || line.isEmpty()) {
+        continue;
+      }
+      String[] values = line.split("\t");
+      if (values.length < 9) {
+        LOGGER.warn("Unexpected windows disk io info format: {}", line);
+        continue;
+      }
+      String diskId = values[0].trim();
+      if (diskId.isEmpty() || TOTAL_DISK_INSTANCE.equals(diskId)) {
+        continue;
+      }
+      String[] metricArray = new String[8];
+      System.arraycopy(values, 1, metricArray, 0, metricArray.length);
+      result.put(diskId, metricArray);
+    }
+    return result;
+  }
+
+  private String queryProcessInfo() {
+    for (String line :
+        executePowerShell(
+            String.format(PROCESS_QUERY_TEMPLATE, 
escapeSingleQuotedPowerShell(processId)))) {
+      if (line != null && !line.isEmpty()) {
+        return line;
+      }
+    }
+    return null;
+  }
+
+  private String escapeSingleQuotedPowerShell(String value) {
+    return value.replace("'", "''");
+  }
+
+  private long accumulate(Long previousValue, long valuePerSec, long 
intervalMillis) {
+    if (intervalMillis <= 0L) {
+      return previousValue == null ? 0L : previousValue;
+    }
+    return (previousValue == null ? 0L : previousValue) + valuePerSec * 
intervalMillis / 1000L;
+  }
+
+  private long accumulate(long previousValue, long valuePerSec, long 
intervalMillis) {
+    if (intervalMillis <= 0L) {
+      return previousValue;
+    }
+    return previousValue + valuePerSec * intervalMillis / 1000L;
+  }
+
+  private long accumulateTimeCost(
+      Long previousValue, double avgTimeInSecond, long opsPerSec, long 
intervalMillis) {
+    if (intervalMillis <= 0L) {
+      return previousValue == null ? 0L : previousValue;
+    }
+    long previous = previousValue == null ? 0L : previousValue;
+    double operationCount = opsPerSec * intervalMillis / 1000.0;
+    return previous + Math.round(avgTimeInSecond * operationCount * 1000.0);
+  }
+
+  private long parseLong(String value) {
+    try {
+      return Math.round(Double.parseDouble(value.trim()));
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Failed to parse long value from windows disk metrics: {}", 
value, e);
+      return 0L;
+    }
+  }
+
+  private double parseDouble(String value) {
+    try {
+      return Double.parseDouble(value.trim());
+    } catch (NumberFormatException e) {
+      LOGGER.warn("Failed to parse double value from windows disk metrics: 
{}", value, e);
+      return 0.0;
+    }
+  }
+
+  private List<String> executePowerShell(String command) {
+    List<String> result = new ArrayList<>();
+    List<String> rawOutput = new ArrayList<>();
+    Process process = null;
+    try {
+      process =
+          new ProcessBuilder(POWER_SHELL, POWER_SHELL_NO_PROFILE, 
POWER_SHELL_COMMAND, command)
+              .redirectErrorStream(true)
+              .start();
+      try (BufferedReader reader =
+          new BufferedReader(
+              new InputStreamReader(process.getInputStream(), 
WINDOWS_SHELL_CHARSET))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          String trimmedLine = line.trim();
+          if (!trimmedLine.isEmpty()) {
+            rawOutput.add(trimmedLine);
+          }
+        }
+      }
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        LOGGER.warn(
+            "Failed to collect windows disk metrics, powershell exit code: {}, 
command {}, output {}",
+            exitCode,
+            command,
+            String.join(" | ", rawOutput));
+      } else {
+        result.addAll(rawOutput);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Failed to execute powershell for windows disk metrics", e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted while collecting windows disk metrics", e);
+    } finally {
+      if (process != null) {
+        process.destroy();
+      }
+    }
+    return result;
+  }
+
+  private static Charset getWindowsShellCharset() {
+    String nativeEncoding = System.getProperty("sun.jnu.encoding");
+    if (nativeEncoding != null && Charset.isSupported(nativeEncoding)) {
+      return Charset.forName(nativeEncoding);
+    }
+
+    String fileEncoding = System.getProperty("file.encoding");
+    if (fileEncoding != null && Charset.isSupported(fileEncoding)) {
+      return Charset.forName(fileEncoding);
+    }
+
+    if (Charset.isSupported("GBK")) {
+      return Charset.forName("GBK");
+    }
+    return Charset.defaultCharset();
+  }
+
+  private void checkUpdate() {
+    if (System.currentTimeMillis() - lastUpdateTime > 
UPDATE_SMALLEST_INTERVAL) {
+      updateInfo();
+    }
+  }
+}

Reply via email to