This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch win_metrics_new in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dd528cc304f78b4b0e26fd7661c2518d6b81b832 Author: unknown <[email protected]> AuthorDate: Thu Feb 5 14:56:38 2026 +0800 net and file handle --- .../metrics/file/SystemRelatedFileMetrics.java | 18 +- .../apache/iotdb/metrics/config/MetricConfig.java | 14 +- .../metricsets/disk/WindowsDiskMetricsManager.java | 199 +++++++++----------- .../metricsets/net/WindowsNetMetricManager.java | 204 +++++++++++++++------ 4 files changed, 252 insertions(+), 183 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java index a225dd63123..aa529fc4672 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/file/SystemRelatedFileMetrics.java @@ -29,9 +29,13 @@ import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; import org.apache.iotdb.metrics.utils.SystemType; +import com.sun.jna.Library; +import com.sun.jna.Native; +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinNT; +import com.sun.jna.ptr.IntByReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import oshi.SystemInfo; import java.io.BufferedReader; import java.io.File; @@ -92,8 +96,10 @@ public class SystemRelatedFileMetrics implements IMetricSet { } fdCount = Long.parseLong(result.toString().trim()); } else if (CONFIG.getSystemType() == SystemType.WINDOWS) { - SystemInfo systemInfo = new SystemInfo(); - return systemInfo.getOperatingSystem().getCurrentProcess().getOpenFiles(); + WinNT.HANDLE hProcess = Kernel32.INSTANCE.GetCurrentProcess(); + IntByReference handleCount = new IntByReference(); + boolean success = Kernel32Ext.INSTANCE.GetProcessHandleCount(hProcess, handleCount); + return success ? handleCount.getValue() : 0L; } } catch (IOException e) { LOGGER.warn("Failed to get open file number, because ", e); @@ -112,4 +118,10 @@ public class SystemRelatedFileMetrics implements IMetricSet { "open_file_handlers"); } } + + public interface Kernel32Ext extends Library { + Kernel32Ext INSTANCE = Native.load("kernel32", Kernel32Ext.class); + + boolean GetProcessHandleCount(WinNT.HANDLE hProcess, IntByReference pdwHandleCount); + } } diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java index 317290e3435..0d3f019ce61 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/config/MetricConfig.java @@ -40,16 +40,16 @@ public class MetricConfig { private static final Logger LOGGER = LoggerFactory.getLogger(MetricConfig.class); /** The list of reporters provide metrics for external tool. */ - private List<ReporterType> metricReporterList = Collections.emptyList(); + private List<ReporterType> metricReporterList = Collections.singletonList(ReporterType.PROMETHEUS); /** The level of metric service. */ - private MetricLevel metricLevel = MetricLevel.IMPORTANT; + private MetricLevel metricLevel = MetricLevel.ALL; /** The period of async collection of some metrics in second. */ private Integer asyncCollectPeriodInSecond = 5; /** The export port for prometheus to get metrics. */ - private Integer prometheusReporterPort = 9091; + private Integer prometheusReporterPort = 9092; private String prometheusReporterUsername = ""; @@ -68,7 +68,7 @@ public class MetricConfig { private final SystemType systemType = SystemType.getSystemType(); /** The type of monitored node. */ - private NodeType nodeType = NodeType.CONFIGNODE; + private NodeType nodeType = NodeType.DATANODE; /** The name of iotdb cluster. */ private String clusterName = "defaultCluster"; @@ -99,12 +99,6 @@ public class MetricConfig { } public void setMetricReporterList(String metricReporterList) { - this.metricReporterList = new ArrayList<>(); - for (String type : metricReporterList.split(",")) { - if (!type.trim().isEmpty()) { - this.metricReporterList.add(ReporterType.valueOf(type)); - } - } } public InternalReporterType getInternalReportType() { diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java index b8961bd9b9c..abc0194dd14 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/disk/WindowsDiskMetricsManager.java @@ -19,172 +19,137 @@ package org.apache.iotdb.metrics.metricsets.disk; -import oshi.SystemInfo; -import oshi.hardware.HWDiskStore; -import oshi.software.os.OSProcess; +import org.apache.iotdb.metrics.config.MetricConfig; +import org.apache.iotdb.metrics.config.MetricConfigDescriptor; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; import java.util.HashMap; -import java.util.List; +import java.util.HashSet; import java.util.Map; -import java.util.stream.Collectors; -/** Disk Metrics Manager for Windows system, not implemented yet. */ -@SuppressWarnings({"rawtypes", "unchecked"}) +/** Disk Metrics Manager for Windows system. */ public class WindowsDiskMetricsManager extends AbstractDiskMetricsManager { - - private final SystemInfo systemInfo = new SystemInfo(); - private final OSProcess thisProcess; - private List<HWDiskStore> diskStores; + private static final MetricConfig METRIC_CONFIG = + MetricConfigDescriptor.getInstance().getMetricConfig(); public WindowsDiskMetricsManager() { - thisProcess = systemInfo.getOperatingSystem().getCurrentProcess(); - diskStores = systemInfo.getHardware().getDiskStores(); init(); } @Override public Map<String, Double> getReadDataSizeForDisk() { - checkUpdate(); - Map<String, Double> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), (double) disk.getReadBytes() / BYTES_PER_KB); - }); - return result; + return getDiskCounter("Disk Read Bytes/sec"); } @Override public Map<String, Double> getWriteDataSizeForDisk() { - checkUpdate(); - Map<String, Double> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), (double) disk.getWriteBytes() / BYTES_PER_KB); - }); - return result; + return getDiskCounter("Disk Write Bytes/sec"); } @Override public Map<String, Long> getReadOperationCountForDisk() { - checkUpdate(); - Map<String, Long> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), disk.getReads()); - }); - return result; + return convertToLongMap(getDiskCounter("Disk Reads/sec")); } @Override public Map<String, Long> getWriteOperationCountForDisk() { - checkUpdate(); - Map<String, Long> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), disk.getWrites()); - }); - return result; - } - - private Map<String, Long> getTransferTimesForDisk() { - checkUpdate(); - Map<String, Long> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), disk.getTransferTime()); - }); - return result; + return convertToLongMap(getDiskCounter("Disk Writes/sec")); } @Override public Map<String, Double> getQueueSizeForDisk() { - checkUpdate(); - Map<String, Double> result = new HashMap<>(); - diskStores.forEach( - disk -> { - result.put(this.getDisplayName(disk), (double) disk.getCurrentQueueLength()); - }); - return result; + return getDiskCounter("Current Disk Queue Length"); } @Override public double getActualReadDataSizeForProcess() { - return thisProcess.getBytesRead() / BYTES_PER_KB; + return getProcessIoBytes("IOReadBytes") / BYTES_PER_KB; } @Override public double getActualWriteDataSizeForProcess() { - return thisProcess.getBytesWritten() / BYTES_PER_KB; + return getProcessIoBytes("IOWriteBytes") / BYTES_PER_KB; } - @Override - public Map<String, Double> getAvgSizeOfEachReadForDisk() { - checkUpdate(); - Map<String, Double> result = new HashMap<>(incrementReadSizeForDisk.size()); - for (Map.Entry<String, Long> incrementReadSize : incrementReadSizeForDisk.entrySet()) { - // use Long.max to avoid NaN - long readOpsCount = - Long.max( - incrementReadOperationCountForDisk.getOrDefault(incrementReadSize.getKey(), 1L), 1L); - result.put( - incrementReadSize.getKey(), ((double) incrementReadSize.getValue()) / readOpsCount); + private Map<String, Double> getDiskCounter(String counterName) { + Map<String, Double> result = new HashMap<>(); + String cmd = + "Get-Counter \"\\PhysicalDisk(*)\\" + + counterName + + "\" | Select-Object -ExpandProperty CounterSamples"; + + String output = executePowerShell(cmd); + String[] lines = output.split("\n"); + + for (String line : lines) { + line = line.trim(); + if (line.contains("Path") && line.contains("CookedValue")) { + // format example: \\Computer\PhysicalDisk(0 C:)\Disk Read Bytes/sec : 123456 + try { + String path = + line.substring(line.indexOf("PhysicalDisk("), line.indexOf(")")) + .replace("PhysicalDisk(", ""); + long value = Long.parseLong(line.substring(line.lastIndexOf(":") + 1).trim()); + result.put(path, (double) value); + } catch (Exception ignored) { + } + } } return result; } - @Override - public Map<String, Double> getAvgSizeOfEachWriteForDisk() { - checkUpdate(); - Map<String, Double> result = new HashMap<>(incrementWriteSizeForDisk.size()); - for (Map.Entry<String, Long> incrementReadSize : incrementWriteSizeForDisk.entrySet()) { - // use Long.max to avoid NaN - long readOpsCount = - Long.max( - incrementWriteOperationCountForDisk.getOrDefault(incrementReadSize.getKey(), 1L), 1L); - result.put( - incrementReadSize.getKey(), ((double) incrementReadSize.getValue()) / readOpsCount); - } + private Map<String, Long> convertToLongMap(Map<String, Double> map) { + Map<String, Long> result = new HashMap<>(); + map.forEach((k, v) -> result.put(k, v.longValue())); return result; } - protected void updateInfo() { - super.updateInfo(); - updateDiskInfo(); - } - - private void updateDiskInfo() { - diskStores = systemInfo.getHardware().getDiskStores(); - - Map[] currentMapArray = { - getTransferTimesForDisk(), getReadDataSizeForDisk(), getWriteDataSizeForDisk(), - }; - Map[] lastMapArray = { - lastIoBusyTimeForDisk, lastReadSizeForDisk, lastWriteSizeForDisk, - }; - Map[] incrementMapArray = { - incrementIoBusyTimeForDisk, incrementReadSizeForDisk, incrementWriteSizeForDisk, - }; - - for (int i = 0; i < currentMapArray.length; i++) { - Map map = currentMapArray[i]; - int finalI = i; - map.forEach( - (key, value) -> { - updateSingleDiskInfo( - (String) key, - ((Number) value).longValue(), - lastMapArray[finalI], - incrementMapArray[finalI]); - }); + private long getProcessIoBytes(String field) { + String cmd = + "Get-Process -Id " + + METRIC_CONFIG.getPid() + + " | Select-Object " + + field + + " | Format-List"; + String output = executePowerShell(cmd); + + for (String line : output.split("\n")) { + if (line.contains(":")) { + try { + return Long.parseLong(line.split(":", 2)[1].trim()); + } catch (Exception ignored) { + } + } } - } - - private String getDisplayName(HWDiskStore disk) { - return disk.getName() + "-" + disk.getModel(); + return 0L; } @Override protected void collectDiskId() { - diskIdSet = diskStores.stream().map(this::getDisplayName).collect(Collectors.toSet()); + diskIdSet = new HashSet<>(getDiskCounter("Disk Read Bytes/sec").keySet()); + } + + private String executePowerShell(String command) { + try { + ProcessBuilder pb = new ProcessBuilder("powershell.exe", "-Command", command); + pb.redirectErrorStream(true); + Process process = pb.start(); + + StringBuilder output = new StringBuilder(); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + output.append(line).append("\n"); + } + } + process.waitFor(); + return output.toString(); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + return ""; + } } } diff --git a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java index b8497ec0c16..23fdc039f7f 100644 --- a/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java +++ b/iotdb-core/metrics/interface/src/main/java/org/apache/iotdb/metrics/metricsets/net/WindowsNetMetricManager.java @@ -19,86 +19,184 @@ package org.apache.iotdb.metrics.metricsets.net; -import oshi.SystemInfo; -import oshi.hardware.NetworkIF; +import org.apache.iotdb.metrics.MetricConstant; +import org.apache.iotdb.metrics.config.MetricConfig; +import org.apache.iotdb.metrics.config.MetricConfigDescriptor; +import org.apache.iotdb.metrics.utils.MetricLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; public class WindowsNetMetricManager implements INetMetricManager { - private final SystemInfo systemInfo = new SystemInfo(); - private final int pid; + private static final Logger LOGGER = LoggerFactory.getLogger(WindowsNetMetricManager.class); + + private static final MetricConfig METRIC_CONFIG = + MetricConfigDescriptor.getInstance().getMetricConfig(); + + private long lastUpdateTime = 0L; + + private Set<String> ifaceSet = new HashSet<>(); + + private final Map<String, Long> receivedBytesMapForIface = new HashMap<>(); + + private final Map<String, Long> transmittedBytesMapForIface = new HashMap<>(); + + private final Map<String, Long> receivedPacketsMapForIface = new HashMap<>(); + + private final Map<String, Long> transmittedPacketsMapForIface = new HashMap<>(); + + private int connectionNum = 0; public WindowsNetMetricManager() { - this.pid = systemInfo.getOperatingSystem().getCurrentProcess().getProcessID(); + checkUpdate(); + } + + @Override + public Set<String> getIfaceSet() { + checkUpdate(); + return ifaceSet; } @Override public Map<String, Long> getReceivedByte() { - Map<String, Long> result = new HashMap<>(); - systemInfo - .getHardware() - .getNetworkIFs() - .forEach( - (networkIF) -> { - result.put(networkIF.getDisplayName(), networkIF.getBytesRecv()); - }); - return result; + checkUpdate(); + return receivedBytesMapForIface; } @Override public Map<String, Long> getTransmittedBytes() { - Map<String, Long> result = new HashMap<>(); - systemInfo - .getHardware() - .getNetworkIFs() - .forEach( - (networkIF) -> { - result.put(networkIF.getDisplayName(), networkIF.getBytesSent()); - }); - return result; + checkUpdate(); + return transmittedBytesMapForIface; } @Override public Map<String, Long> getReceivedPackets() { - Map<String, Long> result = new HashMap<>(); - systemInfo - .getHardware() - .getNetworkIFs() - .forEach( - (networkIF) -> { - result.put(networkIF.getDisplayName(), networkIF.getPacketsRecv()); - }); - return result; + checkUpdate(); + return receivedPacketsMapForIface; } @Override public Map<String, Long> getTransmittedPackets() { - Map<String, Long> result = new HashMap<>(); - systemInfo - .getHardware() - .getNetworkIFs() - .forEach( - (networkIF) -> { - result.put(networkIF.getDisplayName(), networkIF.getPacketsSent()); - }); - return result; + checkUpdate(); + return transmittedPacketsMapForIface; } @Override - public Set<String> getIfaceSet() { - return systemInfo.getHardware().getNetworkIFs().stream() - .map(NetworkIF::getDisplayName) - .collect(Collectors.toSet()); + public int getConnectionNum() { + checkUpdate(); + return connectionNum; } - @Override - public int getConnectionNum() { - return (int) - systemInfo.getOperatingSystem().getInternetProtocolStats().getConnections().stream() - .filter(conn -> conn.getowningProcessId() == pid) - .count(); + private void checkUpdate() { + if (System.currentTimeMillis() - lastUpdateTime >= MetricConstant.UPDATE_INTERVAL) { + updateNetStatus(); + } + } + + private void updateNetStatus() { + lastUpdateTime = System.currentTimeMillis(); + updateInterfaces(); + updateStatistics(); + if (MetricLevel.higherOrEqual(MetricLevel.NORMAL, METRIC_CONFIG.getMetricLevel())) { + updateConnectionNum(); + } + } + + private void updateInterfaces() { + try { + Process process = + Runtime.getRuntime() + .exec( + "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapter | Select Name | Format-List \""); + BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("Name :")) { + ifaceSet.add(line.substring("Name : ".length()).trim()); + } + } + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get interfaces, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException e) { + LOGGER.error("Error updating interfaces", e); + ifaceSet = Collections.emptySet(); + } + } + + private void updateStatistics() { + try { + Process process = + Runtime.getRuntime() + .exec( + "cmd.exe /c chcp 65001 > nul & powershell.exe -Command \"Get-NetAdapterStatistics | Format-List Name,ReceivedBytes,SentBytes,ReceivedUnicastPackets,SentUnicastPackets \""); + BufferedReader reader = + new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8)); + String line; + String currentName = null; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (line.startsWith("Name ")) { + currentName = line.substring(line.indexOf(": ") + 2).trim(); + } else if (line.startsWith("ReceivedBytes ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + receivedBytesMapForIface.put(currentName, value); + } else if (line.startsWith("SentBytes ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + transmittedBytesMapForIface.put(currentName, value); + } else if (line.startsWith("ReceivedUnicastPackets ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + receivedPacketsMapForIface.put(currentName, value); + } else if (line.startsWith("SentUnicastPackets ") && currentName != null) { + long value = Long.parseLong(line.substring(line.indexOf(": ") + 2).trim()); + transmittedPacketsMapForIface.put(currentName, value); + currentName = null; // Reset after processing an interface + } + } + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get statistics, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException | NumberFormatException e) { + LOGGER.error("Error updating statistics", e); + } + } + + private void updateConnectionNum() { + try { + Process process = Runtime.getRuntime().exec("netstat -ano"); + BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream())); + int count = 0; + String line; + while ((line = reader.readLine()) != null) { + line = line.trim(); + if (!line.isEmpty() && !line.startsWith("Active") && !line.startsWith("Proto")) { + String[] parts = line.split("\\s+"); + if (parts.length >= 5 && parts[parts.length - 1].equals(METRIC_CONFIG.getPid())) { + count++; + } + } + } + this.connectionNum = count; + int exitCode = process.waitFor(); + if (exitCode != 0) { + LOGGER.error("Failed to get connection num, exit code: {}", exitCode); + } + } catch (IOException | InterruptedException e) { + LOGGER.error("Error updating connection num", e); + } } -} +} \ No newline at end of file
