This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 063794bf577 Implement and fix Windows disk metrics parsing (#17290)
063794bf577 is described below
commit 063794bf577865090246b6d6b673ae8067721efa
Author: Haonan <[email protected]>
AuthorDate: Fri Mar 20 10:15:22 2026 +0800
Implement and fix Windows disk metrics parsing (#17290)
* Implement WindowsDiskMetricsManager
* Fix Windows disk metric parsing
* previous work
* Fix PowerShell disk metric parsing
* Fix Windows disk metrics parsing
* Prune stale disk metric maps
* fix IT
* fix IT
* fix IT
---
.../iotdb/session/it/IoTDBConnectionsIT.java | 1 +
.../iotdb/session/it/pool/SessionPoolIT.java | 6 +-
.../metrics/file/SystemRelatedFileMetrics.java | 24 +-
.../metricsets/disk/WindowsDiskMetricsManager.java | 504 ++++++++++++++++++++-
.../metricsets/net/WindowsNetMetricManager.java | 199 +++++++-
5 files changed, 727 insertions(+), 7 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
index 80aa854e66c..591980548e2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBConnectionsIT.java
@@ -341,6 +341,7 @@ public class IoTDBConnectionsIT {
TimeUnit.SECONDS.sleep(1);
}
}
+ TimeUnit.SECONDS.sleep(5);
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
index d8fb9bb4c50..f42d4237ea9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/pool/SessionPoolIT.java
@@ -242,7 +242,7 @@ public class SessionPoolIT {
}
@Test
- public void tryIfTheServerIsRestart() {
+ public void tryIfTheServerIsRestart() throws InterruptedException {
ISessionPool pool = EnvFactory.getEnv().getSessionPool(3);
SessionDataSetWrapper wrapper = null;
BaseNodeWrapper node = EnvFactory.getEnv().getDataNodeWrapper(0);
@@ -268,6 +268,7 @@ public class SessionPoolIT {
.ensureNodeStatus(
Collections.singletonList(node),
Collections.singletonList(NodeStatus.Running));
pool = EnvFactory.getEnv().getSessionPool(3);
+ TimeUnit.SECONDS.sleep(5);
correctQuery(pool, DEFAULT_QUERY_TIMEOUT);
pool.close();
return;
@@ -335,7 +336,7 @@ public class SessionPoolIT {
}
@Test
- public void restart() {
+ public void restart() throws InterruptedException {
ISessionPool pool = EnvFactory.getEnv().getSessionPool(1);
write10Data(pool, true);
// stop the server.
@@ -353,6 +354,7 @@ public class SessionPoolIT {
EnvFactory.getEnv()
.ensureNodeStatus(
Collections.singletonList(node),
Collections.singletonList(NodeStatus.Running));
+ TimeUnit.SECONDS.sleep(5);
write10Data(pool, true);
pool.close();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
index 3fac0d10a16..b9b9e89e82c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java
@@ -29,6 +29,11 @@ import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.metrics.utils.MetricType;
import org.apache.iotdb.metrics.utils.SystemType;
+import com.sun.jna.Library;
+import com.sun.jna.Native;
+import com.sun.jna.platform.win32.Kernel32;
+import com.sun.jna.platform.win32.WinNT;
+import com.sun.jna.ptr.IntByReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +57,9 @@ public class SystemRelatedFileMetrics implements IMetricSet {
@Override
public void bindTo(AbstractMetricService metricService) {
- if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType()
== SystemType.MAC)
+ if ((CONFIG.getSystemType() == SystemType.LINUX
+ || CONFIG.getSystemType() == SystemType.MAC
+ || CONFIG.getSystemType() == SystemType.WINDOWS)
&& !CONFIG.getPid().isEmpty()) {
this.getOpenFileNumberCommand =
new String[] {"/bin/sh", "-c", String.format("lsof -p %s | wc -l",
CONFIG.getPid())};
@@ -88,6 +95,11 @@ public class SystemRelatedFileMetrics implements IMetricSet {
}
}
fdCount = Long.parseLong(result.toString().trim());
+ } else if (CONFIG.getSystemType() == SystemType.WINDOWS) {
+ WinNT.HANDLE hProcess = Kernel32.INSTANCE.GetCurrentProcess();
+ IntByReference handleCount = new IntByReference();
+ boolean success = Kernel32Ext.INSTANCE.GetProcessHandleCount(hProcess,
handleCount);
+ return success ? handleCount.getValue() : 0L;
}
} catch (IOException e) {
LOGGER.warn("Failed to get open file number, because ", e);
@@ -97,7 +109,9 @@ public class SystemRelatedFileMetrics implements IMetricSet {
@Override
public void unbindFrom(AbstractMetricService metricService) {
- if ((CONFIG.getSystemType() == SystemType.LINUX || CONFIG.getSystemType()
== SystemType.MAC)
+ if ((CONFIG.getSystemType() == SystemType.LINUX
+ || CONFIG.getSystemType() == SystemType.MAC
+ || CONFIG.getSystemType() == SystemType.WINDOWS)
&& !CONFIG.getPid().isEmpty()) {
metricService.remove(
MetricType.AUTO_GAUGE,
@@ -106,4 +120,10 @@ public class SystemRelatedFileMetrics implements
IMetricSet {
"open_file_handlers");
}
}
+
+ public interface Kernel32Ext extends Library {
+ Kernel32Ext INSTANCE = Native.load("kernel32", Kernel32Ext.class);
+
+ boolean GetProcessHandleCount(WinNT.HANDLE hProcess, IntByReference
pdwHandleCount);
+ }
}
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
index 975576ac904..b83e0c5aa17 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java
@@ -19,5 +19,505 @@
package org.apache.iotdb.metrics.metricsets.disk;
-/** Disk Metrics Manager for Windows system, not implemented yet. */
-public class WindowsDiskMetricsManager implements IDiskMetricsManager {}
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Disk metrics manager for Windows system.
+ *
+ * <p>Windows does not expose Linux-like cumulative counters through procfs,
so this implementation
+ * periodically samples Win32 performance counters and accumulates the
observed per-second values
+ * into totals that match the Linux manager contract as closely as possible.
+ */
+public class WindowsDiskMetricsManager implements IDiskMetricsManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WindowsDiskMetricsManager.class);
+
+ private static final double BYTES_PER_KB = 1024.0;
+ private static final long UPDATE_SMALLEST_INTERVAL = 10000L;
+ private static final String POWER_SHELL = "powershell";
+ private static final String POWER_SHELL_NO_PROFILE = "-NoProfile";
+ private static final String POWER_SHELL_COMMAND = "-Command";
+ private static final String TOTAL_DISK_INSTANCE = "_Total";
+ private static final Charset WINDOWS_SHELL_CHARSET =
getWindowsShellCharset();
+ private static final String DISK_QUERY =
+ "Get-CimInstance Win32_PerfFormattedData_PerfDisk_PhysicalDisk | "
+ + "Where-Object { $_.Name -ne '_Total' } | "
+ + "ForEach-Object { "
+ + "[string]::Concat("
+ + "$_.Name, [char]9, "
+ + "$_.DiskReadsPerSec, [char]9, "
+ + "$_.DiskWritesPerSec, [char]9, "
+ + "$_.DiskReadBytesPerSec, [char]9, "
+ + "$_.DiskWriteBytesPerSec, [char]9, "
+ + "$_.AvgDisksecPerRead, [char]9, "
+ + "$_.AvgDisksecPerWrite, [char]9, "
+ + "$_.PercentIdleTime, [char]9, "
+ + "$_.AvgDiskQueueLength) }";
+ private static final String PROCESS_QUERY_TEMPLATE =
+ "Get-CimInstance Win32_PerfFormattedData_PerfProc_Process | "
+ + "Where-Object { $_.IDProcess -eq %s } | "
+ + "ForEach-Object { "
+ + "[string]::Concat("
+ + "$_.IOReadOperationsPerSec, [char]9, "
+ + "$_.IOWriteOperationsPerSec, [char]9, "
+ + "$_.IOReadBytesPerSec, [char]9, "
+ + "$_.IOWriteBytesPerSec) }";
+
+ private final String processId;
+ private final Set<String> diskIdSet = new HashSet<>();
+
+ private long lastUpdateTime = 0L;
+ private long updateInterval = 1L;
+
+ private final Map<String, Long> lastReadOperationCountForDisk = new
HashMap<>();
+ private final Map<String, Long> lastWriteOperationCountForDisk = new
HashMap<>();
+ private final Map<String, Long> lastReadTimeCostForDisk = new HashMap<>();
+ private final Map<String, Long> lastWriteTimeCostForDisk = new HashMap<>();
+ private final Map<String, Long> lastMergedReadCountForDisk = new HashMap<>();
+ private final Map<String, Long> lastMergedWriteCountForDisk = new
HashMap<>();
+ private final Map<String, Long> lastReadSizeForDisk = new HashMap<>();
+ private final Map<String, Long> lastWriteSizeForDisk = new HashMap<>();
+ private final Map<String, Double> lastIoUtilsPercentageForDisk = new
HashMap<>();
+ private final Map<String, Double> lastQueueSizeForDisk = new HashMap<>();
+ private final Map<String, Double> lastAvgReadCostTimeOfEachOpsForDisk = new
HashMap<>();
+ private final Map<String, Double> lastAvgWriteCostTimeOfEachOpsForDisk = new
HashMap<>();
+ private final Map<String, Double> lastAvgSizeOfEachReadForDisk = new
HashMap<>();
+ private final Map<String, Double> lastAvgSizeOfEachWriteForDisk = new
HashMap<>();
+
+ private long lastReallyReadSizeForProcess = 0L;
+ private long lastReallyWriteSizeForProcess = 0L;
+ private long lastAttemptReadSizeForProcess = 0L;
+ private long lastAttemptWriteSizeForProcess = 0L;
+ private long lastReadOpsCountForProcess = 0L;
+ private long lastWriteOpsCountForProcess = 0L;
+
+ public WindowsDiskMetricsManager() {
+ processId =
String.valueOf(MetricConfigDescriptor.getInstance().getMetricConfig().getPid());
+ collectDiskId();
+ }
+
+ @Override
+ public Map<String, Double> getReadDataSizeForDisk() {
+ checkUpdate();
+ return toKbMap(lastReadSizeForDisk);
+ }
+
+ @Override
+ public Map<String, Double> getWriteDataSizeForDisk() {
+ checkUpdate();
+ return toKbMap(lastWriteSizeForDisk);
+ }
+
+ @Override
+ public Map<String, Long> getReadOperationCountForDisk() {
+ checkUpdate();
+ return lastReadOperationCountForDisk;
+ }
+
+ @Override
+ public Map<String, Long> getWriteOperationCountForDisk() {
+ checkUpdate();
+ return lastWriteOperationCountForDisk;
+ }
+
+ @Override
+ public Map<String, Long> getReadCostTimeForDisk() {
+ checkUpdate();
+ return lastReadTimeCostForDisk;
+ }
+
+ @Override
+ public Map<String, Long> getWriteCostTimeForDisk() {
+ checkUpdate();
+ return lastWriteTimeCostForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getIoUtilsPercentage() {
+ checkUpdate();
+ return lastIoUtilsPercentageForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getAvgReadCostTimeOfEachOpsForDisk() {
+ checkUpdate();
+ return lastAvgReadCostTimeOfEachOpsForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getAvgWriteCostTimeOfEachOpsForDisk() {
+ checkUpdate();
+ return lastAvgWriteCostTimeOfEachOpsForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getAvgSizeOfEachReadForDisk() {
+ checkUpdate();
+ return lastAvgSizeOfEachReadForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getAvgSizeOfEachWriteForDisk() {
+ checkUpdate();
+ return lastAvgSizeOfEachWriteForDisk;
+ }
+
+ @Override
+ public Map<String, Long> getMergedWriteOperationForDisk() {
+ checkUpdate();
+ return lastMergedWriteCountForDisk;
+ }
+
+ @Override
+ public Map<String, Long> getMergedReadOperationForDisk() {
+ checkUpdate();
+ return lastMergedReadCountForDisk;
+ }
+
+ @Override
+ public Map<String, Double> getQueueSizeForDisk() {
+ checkUpdate();
+ return lastQueueSizeForDisk;
+ }
+
+ @Override
+ public double getActualReadDataSizeForProcess() {
+ checkUpdate();
+ return lastReallyReadSizeForProcess / BYTES_PER_KB;
+ }
+
+ @Override
+ public double getActualWriteDataSizeForProcess() {
+ checkUpdate();
+ return lastReallyWriteSizeForProcess / BYTES_PER_KB;
+ }
+
+ @Override
+ public long getReadOpsCountForProcess() {
+ checkUpdate();
+ return lastReadOpsCountForProcess;
+ }
+
+ @Override
+ public long getWriteOpsCountForProcess() {
+ checkUpdate();
+ return lastWriteOpsCountForProcess;
+ }
+
+ @Override
+ public double getAttemptReadSizeForProcess() {
+ checkUpdate();
+ return lastAttemptReadSizeForProcess / BYTES_PER_KB;
+ }
+
+ @Override
+ public double getAttemptWriteSizeForProcess() {
+ checkUpdate();
+ return lastAttemptWriteSizeForProcess / BYTES_PER_KB;
+ }
+
+ @Override
+ public Set<String> getDiskIds() {
+ checkUpdate();
+ return diskIdSet;
+ }
+
+ private void collectDiskId() {
+ Map<String, String[]> diskInfoMap = queryDiskInfo();
+ if (diskInfoMap.isEmpty()) {
+ return;
+ }
+ diskIdSet.clear();
+ diskIdSet.addAll(diskInfoMap.keySet());
+ }
+
+ private Map<String, Double> toKbMap(Map<String, Long> source) {
+ Map<String, Double> result = new HashMap<>(source.size());
+ for (Map.Entry<String, Long> entry : source.entrySet()) {
+ result.put(entry.getKey(), entry.getValue() / BYTES_PER_KB);
+ }
+ return result;
+ }
+
+ private void updateInfo() {
+ long currentTime = System.currentTimeMillis();
+ updateInterval = lastUpdateTime == 0L ? 0L : currentTime - lastUpdateTime;
+ lastUpdateTime = currentTime;
+ updateDiskInfo();
+ updateProcessInfo();
+ }
+
+ private void updateDiskInfo() {
+ Map<String, String[]> diskInfoMap = queryDiskInfo();
+ if (diskInfoMap.isEmpty()) {
+ return;
+ }
+
+ diskIdSet.clear();
+ diskIdSet.addAll(diskInfoMap.keySet());
+ pruneDiskMetricMaps();
+
+ for (Map.Entry<String, String[]> entry : diskInfoMap.entrySet()) {
+ String diskId = entry.getKey();
+ String[] diskInfo = entry.getValue();
+ long readOpsPerSec = parseLong(diskInfo[0]);
+ long writeOpsPerSec = parseLong(diskInfo[1]);
+ long readBytesPerSec = parseLong(diskInfo[2]);
+ long writeBytesPerSec = parseLong(diskInfo[3]);
+ double avgDiskSecPerRead = parseDouble(diskInfo[4]);
+ double avgDiskSecPerWrite = parseDouble(diskInfo[5]);
+ double percentIdleTime = parseDouble(diskInfo[6]);
+ double avgDiskQueueLength = parseDouble(diskInfo[7]);
+
+ long intervalMillis = updateInterval;
+ lastReadOperationCountForDisk.put(
+ diskId,
+ accumulate(lastReadOperationCountForDisk.get(diskId), readOpsPerSec,
intervalMillis));
+ lastWriteOperationCountForDisk.put(
+ diskId,
+ accumulate(lastWriteOperationCountForDisk.get(diskId),
writeOpsPerSec, intervalMillis));
+ lastMergedReadCountForDisk.put(diskId, 0L);
+ lastMergedWriteCountForDisk.put(diskId, 0L);
+ lastReadSizeForDisk.put(
+ diskId, accumulate(lastReadSizeForDisk.get(diskId), readBytesPerSec,
intervalMillis));
+ lastWriteSizeForDisk.put(
+ diskId, accumulate(lastWriteSizeForDisk.get(diskId),
writeBytesPerSec, intervalMillis));
+ lastReadTimeCostForDisk.put(
+ diskId,
+ accumulateTimeCost(
+ lastReadTimeCostForDisk.get(diskId),
+ avgDiskSecPerRead,
+ readOpsPerSec,
+ intervalMillis));
+ lastWriteTimeCostForDisk.put(
+ diskId,
+ accumulateTimeCost(
+ lastWriteTimeCostForDisk.get(diskId),
+ avgDiskSecPerWrite,
+ writeOpsPerSec,
+ intervalMillis));
+ lastIoUtilsPercentageForDisk.put(diskId, clampPercentage(1.0 -
percentIdleTime / 100.0));
+ lastQueueSizeForDisk.put(diskId, avgDiskQueueLength);
+ lastAvgReadCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerRead *
1000.0);
+ lastAvgWriteCostTimeOfEachOpsForDisk.put(diskId, avgDiskSecPerWrite *
1000.0);
+ lastAvgSizeOfEachReadForDisk.put(
+ diskId, readOpsPerSec == 0 ? 0.0 : ((double) readBytesPerSec) /
readOpsPerSec);
+ lastAvgSizeOfEachWriteForDisk.put(
+ diskId, writeOpsPerSec == 0 ? 0.0 : ((double) writeBytesPerSec) /
writeOpsPerSec);
+ }
+ }
+
+ private void pruneDiskMetricMaps() {
+ pruneDiskMetricMap(lastReadOperationCountForDisk);
+ pruneDiskMetricMap(lastWriteOperationCountForDisk);
+ pruneDiskMetricMap(lastReadTimeCostForDisk);
+ pruneDiskMetricMap(lastWriteTimeCostForDisk);
+ pruneDiskMetricMap(lastMergedReadCountForDisk);
+ pruneDiskMetricMap(lastMergedWriteCountForDisk);
+ pruneDiskMetricMap(lastReadSizeForDisk);
+ pruneDiskMetricMap(lastWriteSizeForDisk);
+ pruneDiskMetricMap(lastIoUtilsPercentageForDisk);
+ pruneDiskMetricMap(lastQueueSizeForDisk);
+ pruneDiskMetricMap(lastAvgReadCostTimeOfEachOpsForDisk);
+ pruneDiskMetricMap(lastAvgWriteCostTimeOfEachOpsForDisk);
+ pruneDiskMetricMap(lastAvgSizeOfEachReadForDisk);
+ pruneDiskMetricMap(lastAvgSizeOfEachWriteForDisk);
+ }
+
+ private <T> void pruneDiskMetricMap(Map<String, T> metricMap) {
+ metricMap.keySet().retainAll(diskIdSet);
+ }
+
+ private void updateProcessInfo() {
+ String processInfo = queryProcessInfo();
+ if (processInfo == null || processInfo.isEmpty()) {
+ return;
+ }
+
+ String[] processMetricArray = processInfo.split("\t");
+ if (processMetricArray.length < 4) {
+ LOGGER.warn("Unexpected windows process io info format: {}",
processInfo);
+ return;
+ }
+
+ long readOpsPerSec = parseLong(processMetricArray[0]);
+ long writeOpsPerSec = parseLong(processMetricArray[1]);
+ long readBytesPerSec = parseLong(processMetricArray[2]);
+ long writeBytesPerSec = parseLong(processMetricArray[3]);
+
+ lastReadOpsCountForProcess =
+ accumulate(lastReadOpsCountForProcess, readOpsPerSec, updateInterval);
+ lastWriteOpsCountForProcess =
+ accumulate(lastWriteOpsCountForProcess, writeOpsPerSec,
updateInterval);
+ lastReallyReadSizeForProcess =
+ accumulate(lastReallyReadSizeForProcess, readBytesPerSec,
updateInterval);
+ lastReallyWriteSizeForProcess =
+ accumulate(lastReallyWriteSizeForProcess, writeBytesPerSec,
updateInterval);
+
+ // Windows does not expose attempted read/write sizes directly in these
counters.
+ lastAttemptReadSizeForProcess = lastReallyReadSizeForProcess;
+ lastAttemptWriteSizeForProcess = lastReallyWriteSizeForProcess;
+ }
+
+ private Map<String, String[]> queryDiskInfo() {
+ Map<String, String[]> result = new HashMap<>();
+ for (String line : executePowerShell(DISK_QUERY)) {
+ if (line == null || line.isEmpty()) {
+ continue;
+ }
+ String[] values = line.split("\t");
+ if (values.length < 9) {
+ LOGGER.warn("Unexpected windows disk io info format: {}", line);
+ continue;
+ }
+ String diskId = values[0].trim();
+ if (diskId.isEmpty() || TOTAL_DISK_INSTANCE.equals(diskId)) {
+ continue;
+ }
+ String[] metricArray = new String[8];
+ System.arraycopy(values, 1, metricArray, 0, metricArray.length);
+ result.put(diskId, metricArray);
+ }
+ return result;
+ }
+
+ private String queryProcessInfo() {
+ for (String line :
+ executePowerShell(
+ String.format(PROCESS_QUERY_TEMPLATE,
escapeSingleQuotedPowerShell(processId)))) {
+ if (line != null && !line.isEmpty()) {
+ return line;
+ }
+ }
+ return null;
+ }
+
+ private String escapeSingleQuotedPowerShell(String value) {
+ return value.replace("'", "''");
+ }
+
+ private long accumulate(Long previousValue, long valuePerSec, long
intervalMillis) {
+ if (intervalMillis <= 0L) {
+ return previousValue == null ? 0L : previousValue;
+ }
+ return (previousValue == null ? 0L : previousValue) + valuePerSec *
intervalMillis / 1000L;
+ }
+
+ private long accumulate(long previousValue, long valuePerSec, long
intervalMillis) {
+ if (intervalMillis <= 0L) {
+ return previousValue;
+ }
+ return previousValue + valuePerSec * intervalMillis / 1000L;
+ }
+
+ private long accumulateTimeCost(
+ Long previousValue, double avgTimeInSecond, long opsPerSec, long
intervalMillis) {
+ if (intervalMillis <= 0L) {
+ return previousValue == null ? 0L : previousValue;
+ }
+ long previous = previousValue == null ? 0L : previousValue;
+ double operationCount = opsPerSec * intervalMillis / 1000.0;
+ return previous + Math.round(avgTimeInSecond * operationCount * 1000.0);
+ }
+
+ private long parseLong(String value) {
+ try {
+ return Math.round(Double.parseDouble(value.trim()));
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Failed to parse long value from windows disk metrics: {}",
value, e);
+ return 0L;
+ }
+ }
+
+ private double parseDouble(String value) {
+ try {
+ return Double.parseDouble(value.trim());
+ } catch (NumberFormatException e) {
+ LOGGER.warn("Failed to parse double value from windows disk metrics:
{}", value, e);
+ return 0.0;
+ }
+ }
+
+ private double clampPercentage(double value) {
+ return Math.max(0.0, Math.min(1.0, value));
+ }
+
+ private List<String> executePowerShell(String command) {
+ List<String> result = new ArrayList<>();
+ List<String> rawOutput = new ArrayList<>();
+ Process process = null;
+ try {
+ process =
+ new ProcessBuilder(POWER_SHELL, POWER_SHELL_NO_PROFILE,
POWER_SHELL_COMMAND, command)
+ .redirectErrorStream(true)
+ .start();
+ try (BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(process.getInputStream(),
WINDOWS_SHELL_CHARSET))) {
+ String line;
+ while ((line = reader.readLine()) != null) {
+ String trimmedLine = line.trim();
+ if (!trimmedLine.isEmpty()) {
+ rawOutput.add(trimmedLine);
+ }
+ }
+ }
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ LOGGER.warn(
+ "Failed to collect windows disk metrics, powershell exit code: {},
command {}, output {}",
+ exitCode,
+ command,
+ String.join(" | ", rawOutput));
+ } else {
+ result.addAll(rawOutput);
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Failed to execute powershell for windows disk metrics", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while collecting windows disk metrics", e);
+ } finally {
+ if (process != null) {
+ process.destroy();
+ }
+ }
+ return result;
+ }
+
+ private static Charset getWindowsShellCharset() {
+ String nativeEncoding = System.getProperty("sun.jnu.encoding");
+ if (nativeEncoding != null && Charset.isSupported(nativeEncoding)) {
+ return Charset.forName(nativeEncoding);
+ }
+
+ String fileEncoding = System.getProperty("file.encoding");
+ if (fileEncoding != null && Charset.isSupported(fileEncoding)) {
+ return Charset.forName(fileEncoding);
+ }
+
+ if (Charset.isSupported("GBK")) {
+ return Charset.forName("GBK");
+ }
+ return Charset.defaultCharset();
+ }
+
+ private void checkUpdate() {
+ if (System.currentTimeMillis() - lastUpdateTime >
UPDATE_SMALLEST_INTERVAL) {
+ updateInfo();
+ }
+ }
+}
diff --git
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
index c3ecb4b8d50..7fb82ffaa0d 100644
---
a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
+++
b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java
@@ -19,4 +19,201 @@
package org.apache.iotdb.metrics.metricsets.net;
-public class WindowsNetMetricManager implements INetMetricManager {}
+import org.apache.iotdb.metrics.MetricConstant;
+import org.apache.iotdb.metrics.config.MetricConfig;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class WindowsNetMetricManager implements INetMetricManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(WindowsNetMetricManager.class);
+
+ private static final MetricConfig METRIC_CONFIG =
+ MetricConfigDescriptor.getInstance().getMetricConfig();
+
+ private long lastUpdateTime = 0L;
+
+ private Set<String> ifaceSet = new HashSet<>();
+
+ private final Map<String, Long> receivedBytesMapForIface = new HashMap<>();
+
+ private final Map<String, Long> transmittedBytesMapForIface = new
HashMap<>();
+
+ private final Map<String, Long> receivedPacketsMapForIface = new HashMap<>();
+
+ private final Map<String, Long> transmittedPacketsMapForIface = new
HashMap<>();
+
+ private int connectionNum = 0;
+
+ public WindowsNetMetricManager() {
+ checkUpdate();
+ }
+
+ @Override
+ public Set<String> getIfaceSet() {
+ checkUpdate();
+ return ifaceSet;
+ }
+
+ @Override
+ public Map<String, Long> getReceivedByte() {
+ checkUpdate();
+ return receivedBytesMapForIface;
+ }
+
+ @Override
+ public Map<String, Long> getTransmittedBytes() {
+ checkUpdate();
+ return transmittedBytesMapForIface;
+ }
+
+ @Override
+ public Map<String, Long> getReceivedPackets() {
+ checkUpdate();
+ return receivedPacketsMapForIface;
+ }
+
+ @Override
+ public Map<String, Long> getTransmittedPackets() {
+ checkUpdate();
+ return transmittedPacketsMapForIface;
+ }
+
+ @Override
+ public int getConnectionNum() {
+ checkUpdate();
+ return connectionNum;
+ }
+
+ private void checkUpdate() {
+ if (System.currentTimeMillis() - lastUpdateTime >=
MetricConstant.UPDATE_INTERVAL) {
+ updateNetStatus();
+ }
+ }
+
+ private void updateNetStatus() {
+ lastUpdateTime = System.currentTimeMillis();
+ if (ifaceSet.isEmpty()) {
+ updateInterfaces();
+ }
+ updateStatistics();
+ if (MetricLevel.higherOrEqual(MetricLevel.NORMAL,
METRIC_CONFIG.getMetricLevel())) {
+ updateConnectionNum();
+ }
+ }
+
+ private void updateInterfaces() {
+ try {
+ ifaceSet.clear();
+ Process process =
+ Runtime.getRuntime()
+ .exec(
+ "cmd.exe /c chcp 65001 > nul & powershell.exe -Command
\"Get-NetAdapter -IncludeHidden | Select Name | Format-List \"");
+ BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(process.getInputStream(),
StandardCharsets.UTF_8));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (line.startsWith("Name :")) {
+ ifaceSet.add(line.substring("Name : ".length()).trim());
+ }
+ }
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ LOGGER.error("Failed to get interfaces, exit code: {}", exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.error("Error updating interfaces", e);
+ ifaceSet.clear();
+ }
+ }
+
+ private void updateStatistics() {
+ try {
+ receivedBytesMapForIface.clear();
+ transmittedBytesMapForIface.clear();
+ receivedPacketsMapForIface.clear();
+ transmittedPacketsMapForIface.clear();
+ Process process =
+ Runtime.getRuntime()
+ .exec(
+ "cmd.exe /c chcp 65001 > nul & powershell.exe -Command
\"Get-NetAdapterStatistics -IncludeHidden | Format-List
Name,ReceivedBytes,SentBytes,ReceivedUnicastPackets,SentUnicastPackets \"");
+ BufferedReader reader =
+ new BufferedReader(
+ new InputStreamReader(process.getInputStream(),
StandardCharsets.UTF_8));
+ String line;
+ String currentName = null;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (line.startsWith("Name ")) {
+ currentName = line.substring(line.indexOf(": ") + 2).trim();
+ } else if (line.startsWith("ReceivedBytes ") && currentName != null) {
+ long value = Long.parseLong(line.substring(line.indexOf(": ") +
2).trim());
+ receivedBytesMapForIface.put(currentName, value);
+ } else if (line.startsWith("SentBytes ") && currentName != null) {
+ long value = Long.parseLong(line.substring(line.indexOf(": ") +
2).trim());
+ transmittedBytesMapForIface.put(currentName, value);
+ } else if (line.startsWith("ReceivedUnicastPackets ") && currentName
!= null) {
+ long value = Long.parseLong(line.substring(line.indexOf(": ") +
2).trim());
+ receivedPacketsMapForIface.put(currentName, value);
+ } else if (line.startsWith("SentUnicastPackets ") && currentName !=
null) {
+ long value = Long.parseLong(line.substring(line.indexOf(": ") +
2).trim());
+ transmittedPacketsMapForIface.put(currentName, value);
+ currentName = null; // Reset after processing an interface
+ }
+ }
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ LOGGER.error("Failed to get statistics, exit code: {}", exitCode);
+ }
+ } catch (IOException | InterruptedException | NumberFormatException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.error("Error updating statistics", e);
+ }
+ }
+
+ private void updateConnectionNum() {
+ try {
+ Process process = Runtime.getRuntime().exec("netstat -ano");
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(process.getInputStream()));
+ int count = 0;
+ String line;
+ while ((line = reader.readLine()) != null) {
+ line = line.trim();
+ if (!line.isEmpty() && !line.startsWith("Active") &&
!line.startsWith("Proto")) {
+ String[] parts = line.split("\\s+");
+ if (parts.length >= 5 && parts[parts.length -
1].equals(METRIC_CONFIG.getPid())) {
+ count++;
+ }
+ }
+ }
+ this.connectionNum = count;
+ int exitCode = process.waitFor();
+ if (exitCode != 0) {
+ LOGGER.error("Failed to get connection num, exit code: {}", exitCode);
+ }
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ LOGGER.error("Error updating connection num", e);
+ }
+ }
+}