HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. (Chris Douglas via kasha)
(cherry picked from commit ac6048372a58b3a3b57cd5f2702b44a3d4667f3d) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fc989ebe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fc989ebe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fc989ebe Branch: refs/heads/branch-2 Commit: fc989ebe164fef01f7cbeae7149fc351d6554d72 Parents: ccf1870 Author: Karthik Kambatla <[email protected]> Authored: Thu Jul 9 09:56:40 2015 -0700 Committer: Karthik Kambatla <[email protected]> Committed: Thu Jul 9 09:59:02 2015 -0700 ---------------------------------------------------------------------- hadoop-common-project/hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/util/CpuTimeTracker.java | 115 +++++ .../java/org/apache/hadoop/util/SysInfo.java | 111 +++++ .../org/apache/hadoop/util/SysInfoLinux.java | 444 +++++++++++++++++++ .../org/apache/hadoop/util/SysInfoWindows.java | 181 ++++++++ .../apache/hadoop/util/TestSysInfoLinux.java | 323 ++++++++++++++ .../apache/hadoop/util/TestSysInfoWindows.java | 100 +++++ .../apache/hadoop/yarn/util/CpuTimeTracker.java | 100 ----- .../util/LinuxResourceCalculatorPlugin.java | 392 +--------------- .../yarn/util/ProcfsBasedProcessTree.java | 34 +- .../yarn/util/ResourceCalculatorPlugin.java | 68 +-- .../yarn/util/WindowsBasedProcessTree.java | 2 +- .../util/WindowsResourceCalculatorPlugin.java | 158 +------ .../util/TestLinuxResourceCalculatorPlugin.java | 324 -------------- .../util/TestResourceCalculatorProcessTree.java | 2 +- .../TestWindowsResourceCalculatorPlugin.java | 86 ---- 16 files changed, 1335 insertions(+), 1108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 42a3851..9e8004b 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -189,6 +189,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12201. Add tracing to FileSystem#createFileSystem and Globber#glob (cmccabe) + HADOOP-12180. Move ResourceCalculatorPlugin from YARN to Common. + (Chris Douglas via kasha) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java new file mode 100644 index 0000000..3f17c9a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/CpuTimeTracker.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.math.BigInteger; + +/** + * Utility for sampling and computing CPU usage. + */ [email protected] [email protected] +public class CpuTimeTracker { + public static final int UNAVAILABLE = -1; + private final long minimumTimeInterval; + + // CPU used time since system is on (ms) + private BigInteger cumulativeCpuTime = BigInteger.ZERO; + + // CPU used time read last time (ms) + private BigInteger lastCumulativeCpuTime = BigInteger.ZERO; + + // Unix timestamp while reading the CPU time (ms) + private long sampleTime; + private long lastSampleTime; + private float cpuUsage; + private BigInteger jiffyLengthInMillis; + + public CpuTimeTracker(long jiffyLengthInMillis) { + this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); + this.cpuUsage = UNAVAILABLE; + this.sampleTime = UNAVAILABLE; + this.lastSampleTime = UNAVAILABLE; + minimumTimeInterval = 10 * jiffyLengthInMillis; + } + + /** + * Return percentage of cpu time spent over the time since last update. + * CPU time spent is based on elapsed jiffies multiplied by amount of + * time for 1 core. Thus, if you use 2 cores completely you would have spent + * twice the actual time between updates and this will return 200%. + * + * @return Return percentage of cpu usage since last update, {@link + * CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than + * {@link CpuTimeTracker#minimumTimeInterval} apart + */ + public float getCpuTrackerUsagePercent() { + if (lastSampleTime == UNAVAILABLE || + lastSampleTime > sampleTime) { + // lastSampleTime > sampleTime may happen when the system time is changed + lastSampleTime = sampleTime; + lastCumulativeCpuTime = cumulativeCpuTime; + return cpuUsage; + } + // When lastSampleTime is sufficiently old, update cpuUsage. + // Also take a sample of the current time and cumulative CPU time for the + // use of the next calculation. + if (sampleTime > lastSampleTime + minimumTimeInterval) { + cpuUsage = + ((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue()) + * 100F / ((float) (sampleTime - lastSampleTime)); + lastSampleTime = sampleTime; + lastCumulativeCpuTime = cumulativeCpuTime; + } + return cpuUsage; + } + + /** + * Obtain the cumulative CPU time since the system is on. + * @return cumulative CPU time in milliseconds + */ + public long getCumulativeCpuTime() { + return cumulativeCpuTime.longValue(); + } + + /** + * Apply delta to accumulators. + * @param elapsedJiffies updated jiffies + * @param newTime new sample time + */ + public void updateElapsedJiffies(BigInteger elapsedJiffies, long newTime) { + cumulativeCpuTime = elapsedJiffies.multiply(jiffyLengthInMillis); + sampleTime = newTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SampleTime " + this.sampleTime); + sb.append(" CummulativeCpuTime " + this.cumulativeCpuTime); + sb.append(" LastSampleTime " + this.lastSampleTime); + sb.append(" LastCummulativeCpuTime " + this.lastCumulativeCpuTime); + sb.append(" CpuUsage " + this.cpuUsage); + sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java new file mode 100644 index 0000000..ec7fb24 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Plugin to calculate resource information on the system. + */ [email protected] [email protected] +public abstract class SysInfo { + + /** + * Return default OS instance. + * @throws UnsupportedOperationException If cannot determine OS. + * @return Default instance for the detected OS. + */ + public static SysInfo newInstance() { + if (Shell.LINUX) { + return new SysInfoLinux(); + } + if (Shell.WINDOWS) { + return new SysInfoWindows(); + } + throw new UnsupportedOperationException("Could not determine OS"); + } + + /** + * Obtain the total size of the virtual memory present in the system. + * + * @return virtual memory size in bytes. + */ + public abstract long getVirtualMemorySize(); + + /** + * Obtain the total size of the physical memory present in the system. + * + * @return physical memory size bytes. + */ + public abstract long getPhysicalMemorySize(); + + /** + * Obtain the total size of the available virtual memory present + * in the system. + * + * @return available virtual memory size in bytes. + */ + public abstract long getAvailableVirtualMemorySize(); + + /** + * Obtain the total size of the available physical memory present + * in the system. + * + * @return available physical memory size bytes. + */ + public abstract long getAvailablePhysicalMemorySize(); + + /** + * Obtain the total number of logical processors present on the system. + * + * @return number of logical processors + */ + public abstract int getNumProcessors(); + + /** + * Obtain total number of physical cores present on the system. + * + * @return number of physical cores + */ + public abstract int getNumCores(); + + /** + * Obtain the CPU frequency of on the system. + * + * @return CPU frequency in kHz + */ + public abstract long getCpuFrequency(); + + /** + * Obtain the cumulative CPU time since the system is on. + * + * @return cumulative CPU time in milliseconds + */ + public abstract long getCumulativeCpuTime(); + + /** + * Obtain the CPU usage % of the machine. Return -1 if it is unavailable + * + * @return CPU usage as a percentage of available cycles. + */ + public abstract float getCpuUsage(); + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java new file mode 100644 index 0000000..055298d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -0,0 +1,444 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStreamReader; +import java.io.IOException; +import java.math.BigInteger; +import java.nio.charset.Charset; +import java.util.HashSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; + +/** + * Plugin to calculate resource information on Linux systems. + */ [email protected] [email protected] +public class SysInfoLinux extends SysInfo { + private static final Log LOG = + LogFactory.getLog(SysInfoLinux.class); + + /** + * proc's meminfo virtual file has keys-values in the format + * "key:[ \t]*value[ \t]kB". + */ + private static final String PROCFS_MEMFILE = "/proc/meminfo"; + private static final Pattern PROCFS_MEMFILE_FORMAT = + Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB"); + + // We need the values for the following keys in meminfo + private static final String MEMTOTAL_STRING = "MemTotal"; + private static final String SWAPTOTAL_STRING = "SwapTotal"; + private static final String MEMFREE_STRING = "MemFree"; + private static final String SWAPFREE_STRING = "SwapFree"; + private static final String INACTIVE_STRING = "Inactive"; + + /** + * Patterns for parsing /proc/cpuinfo. + */ + private static final String PROCFS_CPUINFO = "/proc/cpuinfo"; + private static final Pattern PROCESSOR_FORMAT = + Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)"); + private static final Pattern FREQUENCY_FORMAT = + Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)"); + private static final Pattern PHYSICAL_ID_FORMAT = + Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)"); + private static final Pattern CORE_ID_FORMAT = + Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)"); + + /** + * Pattern for parsing /proc/stat. + */ + private static final String PROCFS_STAT = "/proc/stat"; + private static final Pattern CPU_TIME_FORMAT = + Pattern.compile("^cpu[ \t]*([0-9]*)" + + "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); + private CpuTimeTracker cpuTimeTracker; + + private String procfsMemFile; + private String procfsCpuFile; + private String procfsStatFile; + private long jiffyLengthInMillis; + + private long ramSize = 0; + private long swapSize = 0; + private long ramSizeFree = 0; // free ram space on the machine (kB) + private long swapSizeFree = 0; // free swap space on the machine (kB) + private long inactiveSize = 0; // inactive cache memory (kB) + /* number of logical processors on the system. */ + private int numProcessors = 0; + /* number of physical cores on the system. */ + private int numCores = 0; + private long cpuFrequency = 0L; // CPU frequency on the system (kHz) + + private boolean readMemInfoFile = false; + private boolean readCpuInfoFile = false; + + public static final long PAGE_SIZE = getConf("PAGESIZE"); + public static final long JIFFY_LENGTH_IN_MILLIS = + Math.max(Math.round(1000D / getConf("CLK_TCK")), -1); + + private static long getConf(String attr) { + if(Shell.LINUX) { + try { + ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor( + new String[] {"getconf", attr }); + shellExecutorClk.execute(); + return Long.parseLong(shellExecutorClk.getOutput().replace("\n", "")); + } catch (IOException|NumberFormatException e) { + return -1; + } + } + return -1; + } + + /** + * Get current time. + * @return Unix time stamp in millisecond + */ + long getCurrentTime() { + return System.currentTimeMillis(); + } + + public SysInfoLinux() { + this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT, + JIFFY_LENGTH_IN_MILLIS); + } + + /** + * Constructor which allows assigning the /proc/ directories. This will be + * used only in unit tests. + * @param procfsMemFile fake file for /proc/meminfo + * @param procfsCpuFile fake file for /proc/cpuinfo + * @param procfsStatFile fake file for /proc/stat + * @param jiffyLengthInMillis fake jiffy length value + */ + @VisibleForTesting + public SysInfoLinux(String procfsMemFile, + String procfsCpuFile, + String procfsStatFile, + long jiffyLengthInMillis) { + this.procfsMemFile = procfsMemFile; + this.procfsCpuFile = procfsCpuFile; + this.procfsStatFile = procfsStatFile; + this.jiffyLengthInMillis = jiffyLengthInMillis; + this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); + } + + /** + * Read /proc/meminfo, parse and compute memory information only once. + */ + private void readProcMemInfoFile() { + readProcMemInfoFile(false); + } + + /** + * Read /proc/meminfo, parse and compute memory information. + * @param readAgain if false, read only on the first time + */ + private void readProcMemInfoFile(boolean readAgain) { + + if (readMemInfoFile && !readAgain) { + return; + } + + // Read "/proc/memInfo" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsMemFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("Couldn't read " + procfsMemFile + + "; can't determine memory settings"); + return; + } + + Matcher mat; + + try { + String str = in.readLine(); + while (str != null) { + mat = PROCFS_MEMFILE_FORMAT.matcher(str); + if (mat.find()) { + if (mat.group(1).equals(MEMTOTAL_STRING)) { + ramSize = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(SWAPTOTAL_STRING)) { + swapSize = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(MEMFREE_STRING)) { + ramSizeFree = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(SWAPFREE_STRING)) { + swapSizeFree = Long.parseLong(mat.group(2)); + } else if (mat.group(1).equals(INACTIVE_STRING)) { + inactiveSize = Long.parseLong(mat.group(2)); + } + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + + readMemInfoFile = true; + } + + /** + * Read /proc/cpuinfo, parse and calculate CPU information. + */ + private void readProcCpuInfoFile() { + // This directory needs to be read only once + if (readCpuInfoFile) { + return; + } + HashSet<String> coreIdSet = new HashSet<>(); + // Read "/proc/cpuinfo" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsCpuFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info"); + return; + } + Matcher mat; + try { + numProcessors = 0; + numCores = 1; + String currentPhysicalId = ""; + String str = in.readLine(); + while (str != null) { + mat = PROCESSOR_FORMAT.matcher(str); + if (mat.find()) { + numProcessors++; + } + mat = FREQUENCY_FORMAT.matcher(str); + if (mat.find()) { + cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz + } + mat = PHYSICAL_ID_FORMAT.matcher(str); + if (mat.find()) { + currentPhysicalId = str; + } + mat = CORE_ID_FORMAT.matcher(str); + if (mat.find()) { + coreIdSet.add(currentPhysicalId + " " + str); + numCores = coreIdSet.size(); + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + readCpuInfoFile = true; + } + + /** + * Read /proc/stat file, parse and calculate cumulative CPU. + */ + private void readProcStatFile() { + // Read "/proc/stat" file + BufferedReader in; + InputStreamReader fReader; + try { + fReader = new InputStreamReader( + new FileInputStream(procfsStatFile), Charset.forName("UTF-8")); + in = new BufferedReader(fReader); + } catch (FileNotFoundException f) { + // shouldn't happen.... + return; + } + + Matcher mat; + try { + String str = in.readLine(); + while (str != null) { + mat = CPU_TIME_FORMAT.matcher(str); + if (mat.find()) { + long uTime = Long.parseLong(mat.group(1)); + long nTime = Long.parseLong(mat.group(2)); + long sTime = Long.parseLong(mat.group(3)); + cpuTimeTracker.updateElapsedJiffies( + BigInteger.valueOf(uTime + nTime + sTime), + getCurrentTime()); + break; + } + str = in.readLine(); + } + } catch (IOException io) { + LOG.warn("Error reading the stream " + io); + } finally { + // Close the streams + try { + fReader.close(); + try { + in.close(); + } catch (IOException i) { + LOG.warn("Error closing the stream " + in); + } + } catch (IOException i) { + LOG.warn("Error closing the stream " + fReader); + } + } + } + + /** {@inheritDoc} */ + @Override + public long getPhysicalMemorySize() { + readProcMemInfoFile(); + return ramSize * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getVirtualMemorySize() { + readProcMemInfoFile(); + return (ramSize + swapSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getAvailablePhysicalMemorySize() { + readProcMemInfoFile(true); + return (ramSizeFree + inactiveSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public long getAvailableVirtualMemorySize() { + readProcMemInfoFile(true); + return (ramSizeFree + swapSizeFree + inactiveSize) * 1024; + } + + /** {@inheritDoc} */ + @Override + public int getNumProcessors() { + readProcCpuInfoFile(); + return numProcessors; + } + + /** {@inheritDoc} */ + @Override + public int getNumCores() { + readProcCpuInfoFile(); + return numCores; + } + + /** {@inheritDoc} */ + @Override + public long getCpuFrequency() { + readProcCpuInfoFile(); + return cpuFrequency; + } + + /** {@inheritDoc} */ + @Override + public long getCumulativeCpuTime() { + readProcStatFile(); + return cpuTimeTracker.getCumulativeCpuTime(); + } + + /** {@inheritDoc} */ + @Override + public float getCpuUsage() { + readProcStatFile(); + float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent(); + if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) { + overallCpuUsage = overallCpuUsage / getNumProcessors(); + } + return overallCpuUsage; + } + + /** + * Test the {@link SysInfoLinux}. + * + * @param args - arguments to this calculator test + */ + public static void main(String[] args) { + SysInfoLinux plugin = new SysInfoLinux(); + System.out.println("Physical memory Size (bytes) : " + + plugin.getPhysicalMemorySize()); + System.out.println("Total Virtual memory Size (bytes) : " + + plugin.getVirtualMemorySize()); + System.out.println("Available Physical memory Size (bytes) : " + + plugin.getAvailablePhysicalMemorySize()); + System.out.println("Total Available Virtual memory Size (bytes) : " + + plugin.getAvailableVirtualMemorySize()); + System.out.println("Number of Processors : " + plugin.getNumProcessors()); + System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency()); + System.out.println("Cumulative CPU time (ms) : " + + plugin.getCumulativeCpuTime()); + try { + // Sleep so we can compute the CPU usage + Thread.sleep(500L); + } catch (InterruptedException e) { + // do nothing + } + System.out.println("CPU usage % : " + plugin.getCpuUsage()); + } + + @VisibleForTesting + void setReadCpuInfoFile(boolean readCpuInfoFileValue) { + this.readCpuInfoFile = readCpuInfoFileValue; + } + + public long getJiffyLengthInMillis() { + return this.jiffyLengthInMillis; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java new file mode 100644 index 0000000..da4c1c5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.Shell.ShellCommandExecutor; + +/** + * Plugin to calculate resource information on Windows systems. + */ [email protected] [email protected] +public class SysInfoWindows extends SysInfo { + + private static final Log LOG = LogFactory.getLog(SysInfoWindows.class); + + private long vmemSize; + private long memSize; + private long vmemAvailable; + private long memAvailable; + private int numProcessors; + private long cpuFrequencyKhz; + private long cumulativeCpuTimeMs; + private float cpuUsage; + + private long lastRefreshTime; + static final int REFRESH_INTERVAL_MS = 1000; + + public SysInfoWindows() { + lastRefreshTime = 0; + reset(); + } + + @VisibleForTesting + long now() { + return System.nanoTime(); + } + + void reset() { + vmemSize = -1; + memSize = -1; + vmemAvailable = -1; + memAvailable = -1; + numProcessors = -1; + cpuFrequencyKhz = -1; + cumulativeCpuTimeMs = -1; + cpuUsage = -1; + } + + String getSystemInfoInfoFromShell() { + ShellCommandExecutor shellExecutor = new ShellCommandExecutor( + new String[] {Shell.WINUTILS, "systeminfo" }); + try { + shellExecutor.execute(); + return shellExecutor.getOutput(); + } catch (IOException e) { + LOG.error(StringUtils.stringifyException(e)); + } + return null; + } + + void refreshIfNeeded() { + long now = now(); + if (now - lastRefreshTime > REFRESH_INTERVAL_MS) { + long refreshInterval = now - lastRefreshTime; + lastRefreshTime = now; + long lastCumCpuTimeMs = cumulativeCpuTimeMs; + reset(); + String sysInfoStr = getSystemInfoInfoFromShell(); + if (sysInfoStr != null) { + final int sysInfoSplitCount = 7; + String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) + .split(","); + if (sysInfo.length == sysInfoSplitCount) { + try { + vmemSize = Long.parseLong(sysInfo[0]); + memSize = Long.parseLong(sysInfo[1]); + vmemAvailable = Long.parseLong(sysInfo[2]); + memAvailable = Long.parseLong(sysInfo[3]); + numProcessors = Integer.parseInt(sysInfo[4]); + cpuFrequencyKhz = Long.parseLong(sysInfo[5]); + cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); + if (lastCumCpuTimeMs != -1) { + cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) + / (refreshInterval * 1.0f); + } + } catch (NumberFormatException nfe) { + LOG.warn("Error parsing sysInfo", nfe); + } + } else { + LOG.warn("Expected split length of sysInfo to be " + + sysInfoSplitCount + ". Got " + sysInfo.length); + } + } + } + } + + /** {@inheritDoc} */ + @Override + public long getVirtualMemorySize() { + refreshIfNeeded(); + return vmemSize; + } + + /** {@inheritDoc} */ + @Override + public long getPhysicalMemorySize() { + refreshIfNeeded(); + return memSize; + } + + /** {@inheritDoc} */ + @Override + public long getAvailableVirtualMemorySize() { + refreshIfNeeded(); + return vmemAvailable; + } + + /** {@inheritDoc} */ + @Override + public long getAvailablePhysicalMemorySize() { + refreshIfNeeded(); + return memAvailable; + } + + /** {@inheritDoc} */ + @Override + public int getNumProcessors() { + refreshIfNeeded(); + return numProcessors; + } + + /** {@inheritDoc} */ + @Override + public int getNumCores() { + return getNumProcessors(); + } + + /** {@inheritDoc} */ + @Override + public long getCpuFrequency() { + refreshIfNeeded(); + return cpuFrequencyKhz; + } + + /** {@inheritDoc} */ + @Override + public long getCumulativeCpuTime() { + refreshIfNeeded(); + return cumulativeCpuTimeMs; + } + + /** {@inheritDoc} */ + @Override + public float getCpuUsage() { + refreshIfNeeded(); + return cpuUsage; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java new file mode 100644 index 0000000..73edc77 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoLinux.java @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * A JUnit test to test {@link SysInfoLinux} + * Create the fake /proc/ information and verify the parsing and calculation + */ +public class TestSysInfoLinux { + /** + * LinuxResourceCalculatorPlugin with a fake timer + */ + static class FakeLinuxResourceCalculatorPlugin extends + SysInfoLinux { + + long currentTime = 0; + public FakeLinuxResourceCalculatorPlugin(String procfsMemFile, + String procfsCpuFile, + String procfsStatFile, + long jiffyLengthInMillis) { + super(procfsMemFile, procfsCpuFile, procfsStatFile, jiffyLengthInMillis); + } + @Override + long getCurrentTime() { + return currentTime; + } + public void advanceTime(long adv) { + currentTime += adv * this.getJiffyLengthInMillis(); + } + } + private static final FakeLinuxResourceCalculatorPlugin plugin; + private static String TEST_ROOT_DIR = new Path(System.getProperty( + "test.build.data", "/tmp")).toString().replace(' ', '+'); + private static final String FAKE_MEMFILE; + private static final String FAKE_CPUFILE; + private static final String FAKE_STATFILE; + private static final long FAKE_JIFFY_LENGTH = 10L; + static { + int randomNum = (new Random()).nextInt(1000000000); + FAKE_MEMFILE = TEST_ROOT_DIR + File.separator + "MEMINFO_" + randomNum; + FAKE_CPUFILE = TEST_ROOT_DIR + File.separator + "CPUINFO_" + randomNum; + FAKE_STATFILE = TEST_ROOT_DIR + File.separator + "STATINFO_" + randomNum; + plugin = new FakeLinuxResourceCalculatorPlugin(FAKE_MEMFILE, FAKE_CPUFILE, + FAKE_STATFILE, + FAKE_JIFFY_LENGTH); + } + static final String MEMINFO_FORMAT = + "MemTotal: %d kB\n" + + "MemFree: %d kB\n" + + "Buffers: 138244 kB\n" + + "Cached: 947780 kB\n" + + "SwapCached: 142880 kB\n" + + "Active: 3229888 kB\n" + + "Inactive: %d kB\n" + + "SwapTotal: %d kB\n" + + "SwapFree: %d kB\n" + + "Dirty: 122012 kB\n" + + "Writeback: 0 kB\n" + + "AnonPages: 2710792 kB\n" + + "Mapped: 24740 kB\n" + + "Slab: 132528 kB\n" + + "SReclaimable: 105096 kB\n" + + "SUnreclaim: 27432 kB\n" + + "PageTables: 11448 kB\n" + + "NFS_Unstable: 0 kB\n" + + "Bounce: 0 kB\n" + + "CommitLimit: 4125904 kB\n" + + "Committed_AS: 4143556 kB\n" + + "VmallocTotal: 34359738367 kB\n" + + "VmallocUsed: 1632 kB\n" + + "VmallocChunk: 34359736375 kB\n" + + "HugePages_Total: 0\n" + + "HugePages_Free: 0\n" + + "HugePages_Rsvd: 0\n" + + "Hugepagesize: 2048 kB"; + + static final String CPUINFO_FORMAT = + "processor : %s\n" + + "vendor_id : AuthenticAMD\n" + + "cpu family : 15\n" + + "model : 33\n" + + "model name : Dual Core AMD Opteron(tm) Processor 280\n" + + "stepping : 2\n" + + "cpu MHz : %f\n" + + "cache size : 1024 KB\n" + + "physical id : %s\n" + + "siblings : 2\n" + + "core id : %s\n" + + "cpu cores : 2\n" + + "fpu : yes\n" + + "fpu_exception : yes\n" + + "cpuid level : 1\n" + + "wp : yes\n" + + "flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov " + + "pat pse36 clflush mmx fxsr sse sse2 ht syscall nx mmxext fxsr_opt lm " + + "3dnowext 3dnow pni lahf_lm cmp_legacy\n" + + "bogomips : 4792.41\n" + + "TLB size : 1024 4K pages\n" + + "clflush size : 64\n" + + "cache_alignment : 64\n" + + "address sizes : 40 bits physical, 48 bits virtual\n" + + "power management: ts fid vid ttp"; + + static final String STAT_FILE_FORMAT = + "cpu %d %d %d 1646495089 831319 48713 164346 0\n" + + "cpu0 15096055 30805 3823005 411456015 206027 13 14269 0\n" + + "cpu1 14760561 89890 6432036 408707910 456857 48074 130857 0\n" + + "cpu2 12761169 20842 3758639 413976772 98028 411 10288 0\n" + + "cpu3 12355207 47322 5789691 412354390 70406 213 8931 0\n" + + "intr 114648668 20010764 2 0 945665 2 0 0 0 0 0 0 0 4 0 0 0 0 0 0\n" + + "ctxt 242017731764\n" + + "btime 1257808753\n" + + "processes 26414943\n" + + "procs_running 1\n" + + "procs_blocked 0\n"; + + /** + * Test parsing /proc/stat and /proc/cpuinfo + * @throws IOException + */ + @Test + public void parsingProcStatAndCpuFile() throws IOException { + // Write fake /proc/cpuinfo file. + long numProcessors = 8; + long cpuFrequencyKHz = 2392781; + String fileContent = ""; + for (int i = 0; i < numProcessors; i++) { + fileContent += + String.format(CPUINFO_FORMAT, i, cpuFrequencyKHz / 1000D, 0, 0) + + "\n"; + } + File tempFile = new File(FAKE_CPUFILE); + tempFile.deleteOnExit(); + FileWriter fWriter = new FileWriter(FAKE_CPUFILE); + fWriter.write(fileContent); + fWriter.close(); + assertEquals(plugin.getNumProcessors(), numProcessors); + assertEquals(plugin.getCpuFrequency(), cpuFrequencyKHz); + + // Write fake /proc/stat file. + long uTime = 54972994; + long nTime = 188860; + long sTime = 19803373; + tempFile = new File(FAKE_STATFILE); + tempFile.deleteOnExit(); + updateStatFile(uTime, nTime, sTime); + assertEquals(plugin.getCumulativeCpuTime(), + FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); + assertEquals(plugin.getCpuUsage(), (float)(CpuTimeTracker.UNAVAILABLE),0.0); + + // Advance the time and sample again to test the CPU usage calculation + uTime += 100L; + plugin.advanceTime(200L); + updateStatFile(uTime, nTime, sTime); + assertEquals(plugin.getCumulativeCpuTime(), + FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); + assertEquals(plugin.getCpuUsage(), 6.25F, 0.0); + + // Advance the time and sample again. This time, we call getCpuUsage() only. + uTime += 600L; + plugin.advanceTime(300L); + updateStatFile(uTime, nTime, sTime); + assertEquals(plugin.getCpuUsage(), 25F, 0.0); + + // Advance very short period of time (one jiffy length). + // In this case, CPU usage should not be updated. + uTime += 1L; + plugin.advanceTime(1L); + updateStatFile(uTime, nTime, sTime); + assertEquals(plugin.getCumulativeCpuTime(), + FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); + assertEquals(plugin.getCpuUsage(), 25F, 0.0); // CPU usage is not updated. + } + + /** + * Write information to fake /proc/stat file + */ + private void updateStatFile(long uTime, long nTime, long sTime) + throws IOException { + FileWriter fWriter = new FileWriter(FAKE_STATFILE); + fWriter.write(String.format(STAT_FILE_FORMAT, uTime, nTime, sTime)); + fWriter.close(); + } + + /** + * Test parsing /proc/meminfo + * @throws IOException + */ + @Test + public void parsingProcMemFile() throws IOException { + long memTotal = 4058864L; + long memFree = 99632L; + long inactive = 567732L; + long swapTotal = 2096472L; + long swapFree = 1818480L; + File tempFile = new File(FAKE_MEMFILE); + tempFile.deleteOnExit(); + FileWriter fWriter = new FileWriter(FAKE_MEMFILE); + fWriter.write(String.format(MEMINFO_FORMAT, + memTotal, memFree, inactive, swapTotal, swapFree)); + + fWriter.close(); + assertEquals(plugin.getAvailablePhysicalMemorySize(), + 1024L * (memFree + inactive)); + assertEquals(plugin.getAvailableVirtualMemorySize(), + 1024L * (memFree + inactive + swapFree)); + assertEquals(plugin.getPhysicalMemorySize(), 1024L * memTotal); + assertEquals(plugin.getVirtualMemorySize(), 1024L * (memTotal + swapTotal)); + } + + @Test + public void testCoreCounts() throws IOException { + + String fileContent = ""; + // single core, hyper threading + long numProcessors = 2; + long cpuFrequencyKHz = 2392781; + for (int i = 0; i < numProcessors; i++) { + fileContent = + fileContent.concat(String.format(CPUINFO_FORMAT, i, + cpuFrequencyKHz / 1000D, 0, 0)); + fileContent = fileContent.concat("\n"); + } + writeFakeCPUInfoFile(fileContent); + plugin.setReadCpuInfoFile(false); + assertEquals(numProcessors, plugin.getNumProcessors()); + assertEquals(1, plugin.getNumCores()); + + // single socket quad core, no hyper threading + fileContent = ""; + numProcessors = 4; + for (int i = 0; i < numProcessors; i++) { + fileContent = + fileContent.concat(String.format(CPUINFO_FORMAT, i, + cpuFrequencyKHz / 1000D, 0, i)); + fileContent = fileContent.concat("\n"); + } + writeFakeCPUInfoFile(fileContent); + plugin.setReadCpuInfoFile(false); + assertEquals(numProcessors, plugin.getNumProcessors()); + assertEquals(4, plugin.getNumCores()); + + // dual socket single core, hyper threading + fileContent = ""; + numProcessors = 4; + for (int i = 0; i < numProcessors; i++) { + fileContent = + fileContent.concat(String.format(CPUINFO_FORMAT, i, + cpuFrequencyKHz / 1000D, i / 2, 0)); + fileContent = fileContent.concat("\n"); + } + writeFakeCPUInfoFile(fileContent); + plugin.setReadCpuInfoFile(false); + assertEquals(numProcessors, plugin.getNumProcessors()); + assertEquals(2, plugin.getNumCores()); + + // dual socket, dual core, no hyper threading + fileContent = ""; + numProcessors = 4; + for (int i = 0; i < numProcessors; i++) { + fileContent = + fileContent.concat(String.format(CPUINFO_FORMAT, i, + cpuFrequencyKHz / 1000D, i / 2, i % 2)); + fileContent = fileContent.concat("\n"); + } + writeFakeCPUInfoFile(fileContent); + plugin.setReadCpuInfoFile(false); + assertEquals(numProcessors, plugin.getNumProcessors()); + assertEquals(4, plugin.getNumCores()); + + // dual socket, dual core, hyper threading + fileContent = ""; + numProcessors = 8; + for (int i = 0; i < numProcessors; i++) { + fileContent = + fileContent.concat(String.format(CPUINFO_FORMAT, i, + cpuFrequencyKHz / 1000D, i / 4, (i % 4) / 2)); + fileContent = fileContent.concat("\n"); + } + writeFakeCPUInfoFile(fileContent); + plugin.setReadCpuInfoFile(false); + assertEquals(numProcessors, plugin.getNumProcessors()); + assertEquals(4, plugin.getNumCores()); + } + + private void writeFakeCPUInfoFile(String content) throws IOException { + File tempFile = new File(FAKE_CPUFILE); + FileWriter fWriter = new FileWriter(FAKE_CPUFILE); + tempFile.deleteOnExit(); + try { + fWriter.write(content); + } finally { + IOUtils.closeQuietly(fWriter); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java new file mode 100644 index 0000000..7924c02 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestSysInfoWindows.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class TestSysInfoWindows { + + + static class SysInfoWindowsMock extends SysInfoWindows { + private long time = SysInfoWindows.REFRESH_INTERVAL_MS + 1; + private String infoStr = null; + void setSysinfoString(String infoStr) { + this.infoStr = infoStr; + } + void advance(long dur) { + time += dur; + } + @Override + String getSystemInfoInfoFromShell() { + return infoStr; + } + @Override + long now() { + return time; + } + } + + @Test(timeout = 10000) + public void parseSystemInfoString() { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + tester.setSysinfoString( + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + // info str derived from windows shell command has \r\n termination + assertEquals(17177038848L, tester.getVirtualMemorySize()); + assertEquals(8589467648L, tester.getPhysicalMemorySize()); + assertEquals(15232745472L, tester.getAvailableVirtualMemorySize()); + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(1, tester.getNumProcessors()); + assertEquals(1, tester.getNumCores()); + assertEquals(2805000L, tester.getCpuFrequency()); + assertEquals(6261812L, tester.getCumulativeCpuTime()); + // undef on first call + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + } + + @Test(timeout = 10000) + public void refreshAndCpuUsage() throws InterruptedException { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + tester.setSysinfoString( + "17177038848,8589467648,15232745472,6400417792,1,2805000,6261812\r\n"); + // info str derived from windows shell command has \r\n termination + tester.getAvailablePhysicalMemorySize(); + // verify information has been refreshed + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + + tester.setSysinfoString( + "17177038848,8589467648,15232745472,5400417792,1,2805000,6263012\r\n"); + tester.getAvailablePhysicalMemorySize(); + // verify information has not been refreshed + assertEquals(6400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals(-1.0, tester.getCpuUsage(), 0.0); + + // advance clock + tester.advance(SysInfoWindows.REFRESH_INTERVAL_MS + 1); + + // verify information has been refreshed + assertEquals(5400417792L, tester.getAvailablePhysicalMemorySize()); + assertEquals((6263012 - 6261812) / (SysInfoWindows.REFRESH_INTERVAL_MS + 1f), + tester.getCpuUsage(), 0.0); + } + + @Test(timeout = 10000) + public void errorInGetSystemInfo() { + SysInfoWindowsMock tester = new SysInfoWindowsMock(); + // info str derived from windows shell command has \r\n termination + tester.setSysinfoString(null); + // call a method to refresh values + tester.getAvailablePhysicalMemorySize(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java deleted file mode 100644 index b09a4b6..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/CpuTimeTracker.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.util; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import java.math.BigInteger; - [email protected] [email protected] -public class CpuTimeTracker { - public static final int UNAVAILABLE = - ResourceCalculatorProcessTree.UNAVAILABLE; - final long MINIMUM_UPDATE_INTERVAL; - - // CPU used time since system is on (ms) - BigInteger cumulativeCpuTime = BigInteger.ZERO; - - // CPU used time read last time (ms) - BigInteger lastCumulativeCpuTime = BigInteger.ZERO; - - // Unix timestamp while reading the CPU time (ms) - long sampleTime; - long lastSampleTime; - float cpuUsage; - BigInteger jiffyLengthInMillis; - - public CpuTimeTracker(long jiffyLengthInMillis) { - this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); - this.cpuUsage = UNAVAILABLE; - this.sampleTime = UNAVAILABLE; - this.lastSampleTime = UNAVAILABLE; - MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis; - } - - /** - * Return percentage of cpu time spent over the time since last update. - * CPU time spent is based on elapsed jiffies multiplied by amount of - * time for 1 core. Thus, if you use 2 cores completely you would have spent - * twice the actual time between updates and this will return 200%. - * - * @return Return percentage of cpu usage since last update, {@link - * CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than - * {@link CpuTimeTracker#MINIMUM_UPDATE_INTERVAL} apart - */ - public float getCpuTrackerUsagePercent() { - if (lastSampleTime == UNAVAILABLE || - lastSampleTime > sampleTime) { - // lastSampleTime > sampleTime may happen when the system time is changed - lastSampleTime = sampleTime; - lastCumulativeCpuTime = cumulativeCpuTime; - return cpuUsage; - } - // When lastSampleTime is sufficiently old, update cpuUsage. - // Also take a sample of the current time and cumulative CPU time for the - // use of the next calculation. - if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) { - cpuUsage = - ((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue()) - * 100F / ((float) (sampleTime - lastSampleTime)); - lastSampleTime = sampleTime; - lastCumulativeCpuTime = cumulativeCpuTime; - } - return cpuUsage; - } - - public void updateElapsedJiffies(BigInteger elapedJiffies, long sampleTime) { - this.cumulativeCpuTime = elapedJiffies.multiply(jiffyLengthInMillis); - this.sampleTime = sampleTime; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("SampleTime " + this.sampleTime); - sb.append(" CummulativeCpuTime " + this.cumulativeCpuTime); - sb.append(" LastSampleTime " + this.lastSampleTime); - sb.append(" LastCummulativeCpuTime " + this.lastCumulativeCpuTime); - sb.append(" CpuUsage " + this.cpuUsage); - sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); - return sb.toString(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java index bf4cfa4..f458f16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LinuxResourceCalculatorPlugin.java @@ -15,25 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.InputStreamReader; -import java.io.IOException; -import java.math.BigInteger; -import java.nio.charset.Charset; -import java.util.HashSet; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.SysInfoLinux; /** * Plugin to calculate resource information on Linux systems. @@ -41,383 +27,9 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Unstable public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin { - private static final Log LOG = - LogFactory.getLog(LinuxResourceCalculatorPlugin.class); - - /** - * proc's meminfo virtual file has keys-values in the format - * "key:[ \t]*value[ \t]kB". - */ - private static final String PROCFS_MEMFILE = "/proc/meminfo"; - private static final Pattern PROCFS_MEMFILE_FORMAT = - Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB"); - - // We need the values for the following keys in meminfo - private static final String MEMTOTAL_STRING = "MemTotal"; - private static final String SWAPTOTAL_STRING = "SwapTotal"; - private static final String MEMFREE_STRING = "MemFree"; - private static final String SWAPFREE_STRING = "SwapFree"; - private static final String INACTIVE_STRING = "Inactive"; - - /** - * Patterns for parsing /proc/cpuinfo. - */ - private static final String PROCFS_CPUINFO = "/proc/cpuinfo"; - private static final Pattern PROCESSOR_FORMAT = - Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)"); - private static final Pattern FREQUENCY_FORMAT = - Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)"); - private static final Pattern PHYSICAL_ID_FORMAT = - Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)"); - private static final Pattern CORE_ID_FORMAT = - Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)"); - - /** - * Pattern for parsing /proc/stat. - */ - private static final String PROCFS_STAT = "/proc/stat"; - private static final Pattern CPU_TIME_FORMAT = - Pattern.compile("^cpu[ \t]*([0-9]*)" + - "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); - private CpuTimeTracker cpuTimeTracker; - - private String procfsMemFile; - private String procfsCpuFile; - private String procfsStatFile; - private long jiffyLengthInMillis; - - private long ramSize = 0; - private long swapSize = 0; - private long ramSizeFree = 0; // free ram space on the machine (kB) - private long swapSizeFree = 0; // free swap space on the machine (kB) - private long inactiveSize = 0; // inactive cache memory (kB) - /* number of logical processors on the system. */ - private int numProcessors = 0; - /* number of physical cores on the system. */ - private int numCores = 0; - private long cpuFrequency = 0L; // CPU frequency on the system (kHz) - - private boolean readMemInfoFile = false; - private boolean readCpuInfoFile = false; - - /** - * Get current time. - * @return Unix time stamp in millisecond - */ - long getCurrentTime() { - return System.currentTimeMillis(); - } public LinuxResourceCalculatorPlugin() { - this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT, - ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS); - } - - /** - * Constructor which allows assigning the /proc/ directories. This will be - * used only in unit tests. - * @param procfsMemFile fake file for /proc/meminfo - * @param procfsCpuFile fake file for /proc/cpuinfo - * @param procfsStatFile fake file for /proc/stat - * @param jiffyLengthInMillis fake jiffy length value - */ - public LinuxResourceCalculatorPlugin(String procfsMemFile, - String procfsCpuFile, - String procfsStatFile, - long jiffyLengthInMillis) { - this.procfsMemFile = procfsMemFile; - this.procfsCpuFile = procfsCpuFile; - this.procfsStatFile = procfsStatFile; - this.jiffyLengthInMillis = jiffyLengthInMillis; - this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); - } - - /** - * Read /proc/meminfo, parse and compute memory information only once. - */ - private void readProcMemInfoFile() { - readProcMemInfoFile(false); - } - - /** - * Read /proc/meminfo, parse and compute memory information. - * @param readAgain if false, read only on the first time - */ - private void readProcMemInfoFile(boolean readAgain) { - - if (readMemInfoFile && !readAgain) { - return; - } - - // Read "/proc/memInfo" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsMemFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - LOG.warn("Couldn't read " + procfsMemFile - + "; can't determine memory settings"); - return; - } - - Matcher mat; - - try { - String str = in.readLine(); - while (str != null) { - mat = PROCFS_MEMFILE_FORMAT.matcher(str); - if (mat.find()) { - if (mat.group(1).equals(MEMTOTAL_STRING)) { - ramSize = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(SWAPTOTAL_STRING)) { - swapSize = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(MEMFREE_STRING)) { - ramSizeFree = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(SWAPFREE_STRING)) { - swapSizeFree = Long.parseLong(mat.group(2)); - } else if (mat.group(1).equals(INACTIVE_STRING)) { - inactiveSize = Long.parseLong(mat.group(2)); - } - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - - readMemInfoFile = true; - } - - /** - * Read /proc/cpuinfo, parse and calculate CPU information. - */ - private void readProcCpuInfoFile() { - // This directory needs to be read only once - if (readCpuInfoFile) { - return; - } - HashSet<String> coreIdSet = new HashSet<>(); - // Read "/proc/cpuinfo" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsCpuFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info"); - return; - } - Matcher mat; - try { - numProcessors = 0; - numCores = 1; - String currentPhysicalId = ""; - String str = in.readLine(); - while (str != null) { - mat = PROCESSOR_FORMAT.matcher(str); - if (mat.find()) { - numProcessors++; - } - mat = FREQUENCY_FORMAT.matcher(str); - if (mat.find()) { - cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz - } - mat = PHYSICAL_ID_FORMAT.matcher(str); - if (mat.find()) { - currentPhysicalId = str; - } - mat = CORE_ID_FORMAT.matcher(str); - if (mat.find()) { - coreIdSet.add(currentPhysicalId + " " + str); - numCores = coreIdSet.size(); - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - readCpuInfoFile = true; - } - - /** - * Read /proc/stat file, parse and calculate cumulative CPU. - */ - private void readProcStatFile() { - // Read "/proc/stat" file - BufferedReader in; - InputStreamReader fReader; - try { - fReader = new InputStreamReader( - new FileInputStream(procfsStatFile), Charset.forName("UTF-8")); - in = new BufferedReader(fReader); - } catch (FileNotFoundException f) { - // shouldn't happen.... - return; - } - - Matcher mat; - try { - String str = in.readLine(); - while (str != null) { - mat = CPU_TIME_FORMAT.matcher(str); - if (mat.find()) { - long uTime = Long.parseLong(mat.group(1)); - long nTime = Long.parseLong(mat.group(2)); - long sTime = Long.parseLong(mat.group(3)); - cpuTimeTracker.updateElapsedJiffies( - BigInteger.valueOf(uTime + nTime + sTime), - getCurrentTime()); - break; - } - str = in.readLine(); - } - } catch (IOException io) { - LOG.warn("Error reading the stream " + io); - } finally { - // Close the streams - try { - fReader.close(); - try { - in.close(); - } catch (IOException i) { - LOG.warn("Error closing the stream " + in); - } - } catch (IOException i) { - LOG.warn("Error closing the stream " + fReader); - } - } - } - - /** {@inheritDoc} */ - @Override - public long getPhysicalMemorySize() { - readProcMemInfoFile(); - return ramSize * 1024; + super(new SysInfoLinux()); } - /** {@inheritDoc} */ - @Override - public long getVirtualMemorySize() { - readProcMemInfoFile(); - return (ramSize + swapSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getAvailablePhysicalMemorySize() { - readProcMemInfoFile(true); - return (ramSizeFree + inactiveSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public long getAvailableVirtualMemorySize() { - readProcMemInfoFile(true); - return (ramSizeFree + swapSizeFree + inactiveSize) * 1024; - } - - /** {@inheritDoc} */ - @Override - public int getNumProcessors() { - readProcCpuInfoFile(); - return numProcessors; - } - - /** {@inheritDoc} */ - @Override - public int getNumCores() { - readProcCpuInfoFile(); - return numCores; - } - - /** {@inheritDoc} */ - @Override - public long getCpuFrequency() { - readProcCpuInfoFile(); - return cpuFrequency; - } - - /** {@inheritDoc} */ - @Override - public long getCumulativeCpuTime() { - readProcStatFile(); - return cpuTimeTracker.cumulativeCpuTime.longValue(); - } - - /** {@inheritDoc} */ - @Override - public float getCpuUsage() { - readProcStatFile(); - float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent(); - if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) { - overallCpuUsage = overallCpuUsage / getNumProcessors(); - } - return overallCpuUsage; - } - - /** - * Test the {@link LinuxResourceCalculatorPlugin}. - * - * @param args - arguments to this calculator test - */ - public static void main(String[] args) { - LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin(); - System.out.println("Physical memory Size (bytes) : " - + plugin.getPhysicalMemorySize()); - System.out.println("Total Virtual memory Size (bytes) : " - + plugin.getVirtualMemorySize()); - System.out.println("Available Physical memory Size (bytes) : " - + plugin.getAvailablePhysicalMemorySize()); - System.out.println("Total Available Virtual memory Size (bytes) : " - + plugin.getAvailableVirtualMemorySize()); - System.out.println("Number of Processors : " + plugin.getNumProcessors()); - System.out.println("CPU frequency (kHz) : " + plugin.getCpuFrequency()); - System.out.println("Cumulative CPU time (ms) : " + - plugin.getCumulativeCpuTime()); - try { - // Sleep so we can compute the CPU usage - Thread.sleep(500L); - } catch (InterruptedException e) { - // do nothing - } - System.out.println("CPU usage % : " + plugin.getCpuUsage()); - } - - @VisibleForTesting - void setReadCpuInfoFile(boolean readCpuInfoFileValue) { - this.readCpuInfoFile = readCpuInfoFileValue; - } - - public long getJiffyLengthInMillis() { - return this.jiffyLengthInMillis; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java index df9d28a..2345c62 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ProcfsBasedProcessTree.java @@ -40,9 +40,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.CpuTimeTracker; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.SysInfoLinux; import org.apache.hadoop.yarn.conf.YarnConfiguration; /** @@ -64,8 +64,9 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree { public static final String PROCFS_STAT_FILE = "stat"; public static final String PROCFS_CMDLINE_FILE = "cmdline"; - public static final long PAGE_SIZE; - public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond + public static final long PAGE_SIZE = SysInfoLinux.PAGE_SIZE; + public static final long JIFFY_LENGTH_IN_MILLIS = + SysInfoLinux.JIFFY_LENGTH_IN_MILLIS; // in millisecond private final CpuTimeTracker cpuTimeTracker; private Clock clock; @@ -108,31 +109,6 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree { protected Map<String, ProcessTreeSmapMemInfo> processSMAPTree = new HashMap<String, ProcessTreeSmapMemInfo>(); - static { - long jiffiesPerSecond = -1; - long pageSize = -1; - try { - if(Shell.LINUX) { - ShellCommandExecutor shellExecutorClk = new ShellCommandExecutor( - new String[] { "getconf", "CLK_TCK" }); - shellExecutorClk.execute(); - jiffiesPerSecond = Long.parseLong(shellExecutorClk.getOutput().replace("\n", "")); - - ShellCommandExecutor shellExecutorPage = new ShellCommandExecutor( - new String[] { "getconf", "PAGESIZE" }); - shellExecutorPage.execute(); - pageSize = Long.parseLong(shellExecutorPage.getOutput().replace("\n", "")); - - } - } catch (IOException e) { - LOG.error(StringUtils.stringifyException(e)); - } finally { - JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ? - Math.round(1000D / jiffiesPerSecond) : -1; - PAGE_SIZE = pageSize; - } - } - // to enable testing, using this variable which can be configured // to a test directory. private String procfsDir; http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 40bd44e..5e5f1b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; import org.apache.hadoop.classification.InterfaceAudience; @@ -23,29 +22,42 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.SysInfo; /** * Plugin to calculate resource information on the system. - * */ @InterfaceAudience.LimitedPrivate({"YARN", "MAPREDUCE"}) @InterfaceStability.Unstable -public abstract class ResourceCalculatorPlugin extends Configured { +public class ResourceCalculatorPlugin extends Configured { + + private final SysInfo sys; + + protected ResourceCalculatorPlugin() { + this(SysInfo.newInstance()); + } + + public ResourceCalculatorPlugin(SysInfo sys) { + this.sys = sys; + } /** * Obtain the total size of the virtual memory present in the system. * * @return virtual memory size in bytes. */ - public abstract long getVirtualMemorySize(); + public long getVirtualMemorySize() { + return sys.getVirtualMemorySize(); + } /** * Obtain the total size of the physical memory present in the system. * * @return physical memory size bytes. */ - public abstract long getPhysicalMemorySize(); + public long getPhysicalMemorySize() { + return sys.getPhysicalMemorySize(); + } /** * Obtain the total size of the available virtual memory present @@ -53,7 +65,9 @@ public abstract class ResourceCalculatorPlugin extends Configured { * * @return available virtual memory size in bytes. */ - public abstract long getAvailableVirtualMemorySize(); + public long getAvailableVirtualMemorySize() { + return sys.getAvailableVirtualMemorySize(); + } /** * Obtain the total size of the available physical memory present @@ -61,42 +75,54 @@ public abstract class ResourceCalculatorPlugin extends Configured { * * @return available physical memory size bytes. */ - public abstract long getAvailablePhysicalMemorySize(); + public long getAvailablePhysicalMemorySize() { + return sys.getAvailablePhysicalMemorySize(); + } /** * Obtain the total number of logical processors present on the system. * * @return number of logical processors */ - public abstract int getNumProcessors(); + public int getNumProcessors() { + return sys.getNumProcessors(); + } /** * Obtain total number of physical cores present on the system. * * @return number of physical cores */ - public abstract int getNumCores(); + public int getNumCores() { + return sys.getNumCores(); + } /** * Obtain the CPU frequency of on the system. * * @return CPU frequency in kHz */ - public abstract long getCpuFrequency(); + public long getCpuFrequency() { + return sys.getCpuFrequency(); + } /** * Obtain the cumulative CPU time since the system is on. * * @return cumulative CPU time in milliseconds */ - public abstract long getCumulativeCpuTime(); + public long getCumulativeCpuTime() { + return sys.getCumulativeCpuTime(); + } /** * Obtain the CPU usage % of the machine. Return -1 if it is unavailable * * @return CPU usage in % */ - public abstract float getCpuUsage(); + public float getCpuUsage() { + return sys.getCpuUsage(); + } /** * Create the ResourceCalculatorPlugin from the class name and configure it. If @@ -114,21 +140,11 @@ public abstract class ResourceCalculatorPlugin extends Configured { if (clazz != null) { return ReflectionUtils.newInstance(clazz, conf); } - - // No class given, try a os specific class try { - if (Shell.LINUX) { - return new LinuxResourceCalculatorPlugin(); - } - if (Shell.WINDOWS) { - return new WindowsResourceCalculatorPlugin(); - } - } catch (SecurityException se) { - // Failed to get Operating System name. + return new ResourceCalculatorPlugin(); + } catch (SecurityException e) { return null; } - - // Not supported on this system. - return null; } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java index 7d9c7d3..ebe8df1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsBasedProcessTree.java @@ -229,7 +229,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree { @Override public float getCpuUsagePercent() { - return CpuTimeTracker.UNAVAILABLE; + return UNAVAILABLE; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/fc989ebe/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java index cdbf525..f817b7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/WindowsResourceCalculatorPlugin.java @@ -15,162 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.yarn.util; -import java.io.IOException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.SysInfoWindows; -@Private [email protected] [email protected] public class WindowsResourceCalculatorPlugin extends ResourceCalculatorPlugin { - - static final Log LOG = LogFactory - .getLog(WindowsResourceCalculatorPlugin.class); - - long vmemSize; - long memSize; - long vmemAvailable; - long memAvailable; - int numProcessors; - long cpuFrequencyKhz; - long cumulativeCpuTimeMs; - float cpuUsage; - - long lastRefreshTime; - private final int refreshIntervalMs = 1000; - - WindowsBasedProcessTree pTree = null; - - public WindowsResourceCalculatorPlugin() { - lastRefreshTime = 0; - reset(); - } - - void reset() { - vmemSize = -1; - memSize = -1; - vmemAvailable = -1; - memAvailable = -1; - numProcessors = -1; - cpuFrequencyKhz = -1; - cumulativeCpuTimeMs = -1; - cpuUsage = -1; - } - - String getSystemInfoInfoFromShell() { - ShellCommandExecutor shellExecutor = new ShellCommandExecutor( - new String[] { Shell.WINUTILS, "systeminfo" }); - try { - shellExecutor.execute(); - return shellExecutor.getOutput(); - } catch (IOException e) { - LOG.error(StringUtils.stringifyException(e)); - } - return null; - } - - void refreshIfNeeded() { - long now = System.currentTimeMillis(); - if (now - lastRefreshTime > refreshIntervalMs) { - long refreshInterval = now - lastRefreshTime; - lastRefreshTime = now; - long lastCumCpuTimeMs = cumulativeCpuTimeMs; - reset(); - String sysInfoStr = getSystemInfoInfoFromShell(); - if (sysInfoStr != null) { - final int sysInfoSplitCount = 7; - String[] sysInfo = sysInfoStr.substring(0, sysInfoStr.indexOf("\r\n")) - .split(","); - if (sysInfo.length == sysInfoSplitCount) { - try { - vmemSize = Long.parseLong(sysInfo[0]); - memSize = Long.parseLong(sysInfo[1]); - vmemAvailable = Long.parseLong(sysInfo[2]); - memAvailable = Long.parseLong(sysInfo[3]); - numProcessors = Integer.parseInt(sysInfo[4]); - cpuFrequencyKhz = Long.parseLong(sysInfo[5]); - cumulativeCpuTimeMs = Long.parseLong(sysInfo[6]); - if (lastCumCpuTimeMs != -1) { - cpuUsage = (cumulativeCpuTimeMs - lastCumCpuTimeMs) - / (refreshInterval * 1.0f); - } - - } catch (NumberFormatException nfe) { - LOG.warn("Error parsing sysInfo." + nfe); - } - } else { - LOG.warn("Expected split length of sysInfo to be " - + sysInfoSplitCount + ". Got " + sysInfo.length); - } - } - } - } - - /** {@inheritDoc} */ - @Override - public long getVirtualMemorySize() { - refreshIfNeeded(); - return vmemSize; - } - - /** {@inheritDoc} */ - @Override - public long getPhysicalMemorySize() { - refreshIfNeeded(); - return memSize; - } - /** {@inheritDoc} */ - @Override - public long getAvailableVirtualMemorySize() { - refreshIfNeeded(); - return vmemAvailable; - } - - /** {@inheritDoc} */ - @Override - public long getAvailablePhysicalMemorySize() { - refreshIfNeeded(); - return memAvailable; - } - - /** {@inheritDoc} */ - @Override - public int getNumProcessors() { - refreshIfNeeded(); - return numProcessors; - } - - /** {@inheritDoc} */ - @Override - public int getNumCores() { - return getNumProcessors(); - } - - /** {@inheritDoc} */ - @Override - public long getCpuFrequency() { - refreshIfNeeded(); - return cpuFrequencyKhz; - } - - /** {@inheritDoc} */ - @Override - public long getCumulativeCpuTime() { - refreshIfNeeded(); - return cumulativeCpuTimeMs; + public WindowsResourceCalculatorPlugin() { + super(new SysInfoWindows()); } - /** {@inheritDoc} */ - @Override - public float getCpuUsage() { - refreshIfNeeded(); - return cpuUsage; - } }
