ayushtkn commented on code in PR #379:
URL: https://github.com/apache/tez/pull/379#discussion_r1855099184
##########
tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounter.java:
##########
@@ -19,16 +19,42 @@
package org.apache.tez.common.counters;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
@Private
public enum FileSystemCounter {
- BYTES_READ,
- BYTES_WRITTEN,
- READ_OPS,
- LARGE_READ_OPS,
- WRITE_OPS,
- HDFS_BYTES_READ,
- HDFS_BYTES_WRITTEN,
- FILE_BYTES_READ,
- FILE_BYTES_WRITTEN
+ BYTES_READ("bytesRead"),
+ BYTES_WRITTEN("bytesWritten"),
+ READ_OPS("readOps"),
+ LARGE_READ_OPS("largeReadOps"),
+ WRITE_OPS("writeOps"),
+ HDFS_BYTES_READ("hdfsBytesRead"),
+ HDFS_BYTES_WRITTEN("hdfsBytesWritten"),
+ FILE_BYTES_READ("fileBytesRead"),
+ FILE_BYTES_WRITTEN("fileBytesWritten"),
+
+ // Additional counters from HADOOP-13305
Review Comment:
There seem to be some more counters as well, such as
``OP_CREATE_NON_RECURSIVE``, ``OP_EXISTS``, ``OP_IS_FILE``, etc.
Should we include them as well?
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
Review Comment:
can be ``final``
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+ .build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ remoteFs.mkdirs(new Path("/tmp/foo/"));
+ FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
+ out.writeBytes("xyz");
+ out.close();
+
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
Review Comment:
nit: can use the logger's parameterized format here, and for the same lines below:
```
LOG.info("Counters: {}", counters);
```
##########
tez-runtime-internals/src/main/java/org/apache/tez/runtime/metrics/TaskCounterUpdater.java:
##########
@@ -69,32 +75,16 @@ public TaskCounterUpdater(TezCounters counters,
Configuration conf, String pid)
public void updateCounters() {
- // FileSystemStatistics are reset each time a new task is seen by the
- // container.
- // This doesn't remove the fileSystem, and does not clear all statistics -
- // so there is a potential of an unused FileSystem showing up for a
- // Container, and strange values for READ_OPS etc.
- Map<String, List<FileSystem.Statistics>> map = new
- HashMap<String, List<FileSystem.Statistics>>();
- for(Statistics stat: FileSystem.getAllStatistics()) {
- String uriScheme = stat.getScheme();
- if (map.containsKey(uriScheme)) {
- List<FileSystem.Statistics> list = map.get(uriScheme);
- list.add(stat);
- } else {
- List<FileSystem.Statistics> list = new
ArrayList<FileSystem.Statistics>();
- list.add(stat);
- map.put(uriScheme, list);
- }
- }
- for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet())
{
- FileSystemStatisticUpdater updater =
statisticUpdaters.get(entry.getKey());
- if(updater==null) {//new FileSystem has been found in the cache
- updater =
- new FileSystemStatisticUpdater(tezCounters, entry.getValue(),
- entry.getKey());
- statisticUpdaters.put(entry.getKey(), updater);
+ GlobalStorageStatistics globalStorageStatistics =
FileSystem.getGlobalStorageStatistics();
+ Iterator<StorageStatistics> iter = globalStorageStatistics.iterator();
+ while (iter.hasNext()) {
+ StorageStatistics stats = iter.next();
+ if (!statisticUpdaters.containsKey(stats.getScheme())) {
+ Map<String, FileSystemStatisticUpdater> updaterSet = new TreeMap<>();
+ statisticUpdaters.put(stats.getScheme(), updaterSet);
}
+ FileSystemStatisticUpdater updater =
statisticUpdaters.get(stats.getScheme())
+ .computeIfAbsent(stats.getName(), k -> new
FileSystemStatisticUpdater(tezCounters, stats));
Review Comment:
The ``containsKey``/``put`` block above looks like a case for ``computeIfAbsent``;
maybe we can do:
```
// Fetch or initialize the updater set for the scheme
Map<String, FileSystemStatisticUpdater> updaterSet = statisticUpdaters
.computeIfAbsent(stats.getScheme(), k -> new TreeMap<>());
// Fetch or create the updater for the specific statistic
FileSystemStatisticUpdater updater = updaterSet
.computeIfAbsent(stats.getName(), k -> new
FileSystemStatisticUpdater(tezCounters, stats));
```
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
+ private static Configuration conf = new Configuration();
+
+ @Test
+ public void basicTest() {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+ TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+
+ Assert.assertNotNull(gcCounter);
+ Assert.assertNotNull(cpuCounter);
+ long oldVal = cpuCounter.getValue();
+ Assert.assertTrue(cpuCounter.getValue() > 0);
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldVal
+ + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal);
+
+ }
+
+
Review Comment:
nit: avoid these extra empty lines
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+ .build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ remoteFs.mkdirs(new Path("/tmp/foo/"));
+ FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
+ out.writeBytes("xyz");
+ out.close();
+
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_MKDIRS);
+ TezCounter createCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_CREATE);
+ Assert.assertNotNull(mkdirCounter);
+ Assert.assertNotNull(createCounter);
+ Assert.assertEquals(1, mkdirCounter.getValue());
+ Assert.assertEquals(1, createCounter.getValue());
+
+ FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt"));
+ out1.writeBytes("xyz");
+ out1.close();
+
+ long oldCreateVal = createCounter.getValue();
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldCreateVal
+ + ", new=" + createCounter.getValue(), createCounter.getValue() >
oldCreateVal);
+
+ oldCreateVal = createCounter.getValue();
+ // Ensure all numbers are reset
+ remoteFs.clearStatistics();
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ Assert.assertEquals(oldCreateVal, createCounter.getValue());
+
+ }
+
+
+
Review Comment:
nit: avoid this many empty lines :-)
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
+ private static Configuration conf = new Configuration();
+
+ @Test
+ public void basicTest() {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
Review Comment:
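nit: use the ``{}`` placeholder instead of string concatenation, i.e. ``LOG.info("Counters: {}", counters);``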
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
Review Comment:
``.format(true).racks(null)`` isn't required IMO; these already default to `true`
and `null`, respectively.
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+ .build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ remoteFs.mkdirs(new Path("/tmp/foo/"));
+ FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
+ out.writeBytes("xyz");
+ out.close();
Review Comment:
can use this
```
DFSTestUtil.writeFile(remoteFs, new Path("/tmp/foo/abc.txt"), "xyz");
```
and below as well
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
+ private static Configuration conf = new Configuration();
+
+ @Test
+ public void basicTest() {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ TezCounter gcCounter = counters.findCounter(TaskCounter.GC_TIME_MILLIS);
+ TezCounter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
+
+ Assert.assertNotNull(gcCounter);
+ Assert.assertNotNull(cpuCounter);
+ long oldVal = cpuCounter.getValue();
+ Assert.assertTrue(cpuCounter.getValue() > 0);
+
+ updater.updateCounters();
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldVal
+ + ", new=" + cpuCounter.getValue(), cpuCounter.getValue() > oldVal);
Review Comment:
Should we add a short sleep before the second ``updateCounters``? Just thinking about extreme
conditions: this check won't go flaky, right?
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
Review Comment:
nit:
the line break wasn't required, I think
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestFileSystemStatisticUpdater.java:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.tez.common.counters.FileSystemCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestFileSystemStatisticUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestFileSystemStatisticUpdater.class);
+
+ private static MiniDFSCluster dfsCluster;
+
+ private static Configuration conf = new Configuration();
+ private static FileSystem remoteFs;
+
+ private static final String TEST_ROOT_DIR = "target" + Path.SEPARATOR +
+ TestFileSystemStatisticUpdater.class.getName() + "-tmpDir";
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ try {
+ conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+ dfsCluster = new
MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+ .build();
+ remoteFs = dfsCluster.getFileSystem();
+ } catch (IOException io) {
+ throw new RuntimeException("problem starting mini dfs cluster", io);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ }
+
+ @Test
+ public void basicTest() throws IOException {
+ TezCounters counters = new TezCounters();
+ TaskCounterUpdater updater = new TaskCounterUpdater(counters, conf, "pid");
+
+ remoteFs.mkdirs(new Path("/tmp/foo/"));
+ FSDataOutputStream out = remoteFs.create(new Path("/tmp/foo/abc.txt"));
+ out.writeBytes("xyz");
+ out.close();
+
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ TezCounter mkdirCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_MKDIRS);
+ TezCounter createCounter = counters.findCounter(remoteFs.getScheme(),
+ FileSystemCounter.OP_CREATE);
+ Assert.assertNotNull(mkdirCounter);
+ Assert.assertNotNull(createCounter);
+ Assert.assertEquals(1, mkdirCounter.getValue());
+ Assert.assertEquals(1, createCounter.getValue());
+
+ FSDataOutputStream out1 = remoteFs.create(new Path("/tmp/foo/abc1.txt"));
+ out1.writeBytes("xyz");
+ out1.close();
+
+ long oldCreateVal = createCounter.getValue();
+ updater.updateCounters();
+
+ LOG.info("Counters: " + counters);
+ Assert.assertTrue("Counter not updated, old=" + oldCreateVal
+ + ", new=" + createCounter.getValue(), createCounter.getValue() >
oldCreateVal);
+
+ oldCreateVal = createCounter.getValue();
+ // Ensure all numbers are reset
+ remoteFs.clearStatistics();
Review Comment:
``clearStatistics`` is a static method, so it may be better to call it on the class:
```FileSystem.clearStatistics();```
##########
tez-runtime-internals/src/test/java/org/apache/tez/runtime/metrics/TestTaskCounterUpdater.java:
##########
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.metrics;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestTaskCounterUpdater {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestTaskCounterUpdater.class);
+ private static Configuration conf = new Configuration();
Review Comment:
can be ``final``
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]