Repository: tez Updated Branches: refs/heads/branch-0.7 60b125686 -> 68fa017ee
TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/68fa017e Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/68fa017e Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/68fa017e Branch: refs/heads/branch-0.7 Commit: 68fa017ee0dce2fdf6dee180ee6eb9311b511ff2 Parents: 60b1256 Author: Jonathan Eagles <[email protected]> Authored: Thu May 12 13:30:55 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu May 12 13:30:55 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/TezUtilsInternal.java | 12 +-- .../java/org/apache/tez/util/StopWatch.java | 108 +++++++++++++++++++ .../java/org/apache/tez/util/TestStopWatch.java | 58 ++++++++++ .../common/MRInputAMSplitGenerator.java | 11 +- .../common/MRInputSplitDistributor.java | 7 +- .../library/common/shuffle/HttpConnection.java | 15 ++- .../common/sort/impl/PipelinedSorter.java | 10 +- tez-tests/pom.xml | 1 - .../tez/mapreduce/examples/RPCLoadGen.java | 7 +- 10 files changed, 199 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 55c79b7..2754b92 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ Release 0.7.2 Unreleased INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3256. [Backport HADOOP-11032] Remove Guava Stopwatch dependency TEZ-2342. Reduce bytearray copy with TezEvent Serialization and deserialization TEZ-3243. Output vertices are hidden for UI graph view http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 55e1e6e..242ef1a 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -26,6 +26,7 @@ import java.nio.charset.Charset; import java.util.BitSet; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.zip.DataFormatException; @@ -45,11 +46,10 @@ import org.apache.tez.dag.api.TezConstants; import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto; import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair; +import org.apache.tez.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Stopwatch; - @Private public class TezUtilsInternal { @@ -81,23 +81,23 @@ public class TezUtilsInternal { public static byte[] compressBytes(byte[] inBytes) throws IOException { - Stopwatch sw = new Stopwatch().start(); + StopWatch sw = new StopWatch().start(); byte[] compressed = compressBytesInflateDeflate(inBytes); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length - + ", CompressTime: " + sw.elapsedMillis()); + + ", CompressTime: " + sw.now(TimeUnit.MILLISECONDS)); } return compressed; } public static byte[] uncompressBytes(byte[] inBytes) throws IOException { - Stopwatch sw = new Stopwatch().start(); + StopWatch sw = new StopWatch().start(); byte[] uncompressed = uncompressBytesInflateDeflate(inBytes); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length - + ", UncompressTimeTaken: " + sw.elapsedMillis()); + + ", UncompressTimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return uncompressed; } http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-common/src/main/java/org/apache/tez/util/StopWatch.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/util/StopWatch.java b/tez-common/src/main/java/org/apache/tez/util/StopWatch.java new file mode 100644 index 0000000..c9a573e --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/util/StopWatch.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.util; + +import java.io.Closeable; +import java.util.concurrent.TimeUnit; + +/** + * A simplified StopWatch implementation which can measure times in nanoseconds. + */ +public class StopWatch implements Closeable { + private boolean isStarted; + private long startNanos; + private long currentElapsedNanos; + + public StopWatch() { + } + + /** + * The method is used to find out if the StopWatch is started. + * @return boolean If the StopWatch is started. + */ + public boolean isRunning() { + return isStarted; + } + + /** + * Start to measure times and make the state of stopwatch running. + * @return this instance of StopWatch. + */ + public StopWatch start() { + if (isStarted) { + throw new IllegalStateException("StopWatch is already running"); + } + isStarted = true; + startNanos = System.nanoTime(); + return this; + } + + /** + * Stop elapsed time and make the state of stopwatch stop. + * @return this instance of StopWatch. + */ + public StopWatch stop() { + if (!isStarted) { + throw new IllegalStateException("StopWatch is already stopped"); + } + long now = System.nanoTime(); + isStarted = false; + currentElapsedNanos += now - startNanos; + return this; + } + + /** + * Reset elapsed time to zero and make the state of stopwatch stop. + * @return this instance of StopWatch. + */ + public StopWatch reset() { + currentElapsedNanos = 0; + isStarted = false; + return this; + } + + /** + * @return current elapsed time in specified timeunit. + */ + public long now(TimeUnit timeUnit) { + return timeUnit.convert(now(), TimeUnit.NANOSECONDS); + + } + + /** + * @return current elapsed time in nanosecond. + */ + public long now() { + return isStarted ? + System.nanoTime() - startNanos + currentElapsedNanos : + currentElapsedNanos; + } + + @Override + public String toString() { + return String.valueOf(now()); + } + + @Override + public void close() { + if (isStarted) { + stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-common/src/test/java/org/apache/tez/util/TestStopWatch.java ---------------------------------------------------------------------- diff --git a/tez-common/src/test/java/org/apache/tez/util/TestStopWatch.java b/tez-common/src/test/java/org/apache/tez/util/TestStopWatch.java new file mode 100644 index 0000000..b8523fd --- /dev/null +++ b/tez-common/src/test/java/org/apache/tez/util/TestStopWatch.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.util; + +import org.junit.Assert; +import org.junit.Test; + +public class TestStopWatch { + + @Test + public void testStartAndStop() throws Exception { + try { + StopWatch sw = new StopWatch(); + Assert.assertFalse(sw.isRunning()); + sw.start(); + Assert.assertTrue(sw.isRunning()); + sw.stop(); + Assert.assertFalse(sw.isRunning()); + } catch (Exception e) { + Assert.fail("StopWatch should not fail with normal usage"); + } + } + + @Test + public void testExceptions() throws Exception { + StopWatch sw = new StopWatch(); + try { + sw.stop(); + } catch (Exception e) { + Assert.assertTrue("IllegalStateException is expected", + e instanceof IllegalStateException); + } + sw.reset(); + sw.start(); + try { + sw.start(); + } catch (Exception e) { + Assert.assertTrue("IllegalStateException is expected", + e instanceof IllegalStateException); + } + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java index b93e4ba..d895ea3 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java @@ -19,8 +19,8 @@ package org.apache.tez.mapreduce.common; import java.util.List; +import java.util.concurrent.TimeUnit; -import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import org.slf4j.Logger; @@ -46,6 +46,7 @@ import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; +import org.apache.tez.util.StopWatch; /** * Implements an {@link InputInitializer} that generates Map Reduce @@ -69,13 +70,13 @@ public class MRInputAMSplitGenerator extends InputInitializer { @Override public List<Event> initialize() throws Exception { - Stopwatch sw = new Stopwatch().start(); + StopWatch sw = new StopWatch().start(); MRInputUserPayloadProto userPayloadProto = MRInputHelpers .parseMRInputPayload(getContext().getInputUserPayload()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Time to parse MRInput payload into prot: " - + sw.elapsedMillis()); + + sw.now(TimeUnit.MILLISECONDS)); } sw.reset().start(); Configuration conf = TezUtils.createConfFromByteString(userPayloadProto @@ -89,7 +90,7 @@ public class MRInputAMSplitGenerator extends InputInitializer { if (LOG.isDebugEnabled()) { LOG.debug("Emitting serialized splits: " + sendSerializedEvents + " for input " + getContext().getInputName()); - LOG.debug("Time converting ByteString to configuration: " + sw.elapsedMillis()); + LOG.debug("Time converting ByteString to configuration: " + sw.now(TimeUnit.MILLISECONDS)); } sw.reset().start(); @@ -122,7 +123,7 @@ public class MRInputAMSplitGenerator extends InputInitializer { } sw.stop(); if (LOG.isDebugEnabled()) { - LOG.debug("Time to create splits to mem: " + sw.elapsedMillis()); + LOG.debug("Time to create splits to mem: " + sw.now(TimeUnit.MILLISECONDS)); } List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java index 28d108e..c60e5d4 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java @@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.common; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +42,8 @@ import org.apache.tez.runtime.api.InputInitializerContext; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.events.InputInitializerEvent; import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent; +import org.apache.tez.util.StopWatch; -import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; /** @@ -69,13 +70,13 @@ public class MRInputSplitDistributor extends InputInitializer { @Override public List<Event> initialize() throws IOException { - Stopwatch sw = new Stopwatch().start(); + StopWatch sw = new StopWatch().start(); MRInputUserPayloadProto userPayloadProto = MRInputHelpers .parseMRInputPayload(getContext().getInputUserPayload()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Time to parse MRInput payload into prot: " - + sw.elapsedMillis()); + + sw.now(TimeUnit.MILLISECONDS)); } Configuration conf = TezUtils.createConfFromByteString(userPayloadProto .getConfigurationBytes()); http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java index 17e6e90..dacbeed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java @@ -39,8 +39,7 @@ import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.tez.common.security.JobTokenSecretManager; import org.apache.tez.runtime.library.common.security.SecureShuffleUtils; import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader; - -import com.google.common.base.Stopwatch; +import org.apache.tez.util.StopWatch; /** * HttpConnection which can be used for Unordered / Ordered shuffle. @@ -70,7 +69,7 @@ public class HttpConnection { private String msgToEncode; private final HttpConnectionParams httpConnParams; - private final Stopwatch stopWatch; + private final StopWatch stopWatch; /** * HttpConnection @@ -87,7 +86,7 @@ public class HttpConnection { this.jobTokenSecretMgr = jobTokenSecretManager; this.httpConnParams = connParams; this.url = url; - this.stopWatch = new Stopwatch(); + this.stopWatch = new StopWatch(); if (LOG.isDebugEnabled()) { LOG.debug("MapOutput URL :" + url.toString()); } @@ -205,7 +204,7 @@ public class HttpConnection { } if (LOG.isDebugEnabled()) { LOG.debug("Time taken to connect to " + url.toString() + - " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures); + " " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures); } return true; } @@ -239,7 +238,7 @@ public class HttpConnection { SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr); //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host LOG.info("for url=" + url + - " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms"); + " sent hash and receievd reply " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms"); } /** @@ -257,7 +256,7 @@ public class HttpConnection { } if (LOG.isDebugEnabled()) { LOG.debug("Time taken to getInputStream (connect) " + url + - " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms"); + " " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms"); } return input; } @@ -300,7 +299,7 @@ public class HttpConnection { } if (LOG.isDebugEnabled()) { LOG.debug("Time taken to cleanup connection to " + url + - " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms"); + " " + stopWatch.now(TimeUnit.MILLISECONDS) + " ms"); } } http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index a2d06a8..be9b10c 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -32,11 +32,10 @@ import java.util.PriorityQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; - +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +58,7 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.DiskSegment; import org.apache.tez.runtime.library.common.sort.impl.TezMerger.Segment; +import org.apache.tez.util.StopWatch; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -225,15 +225,15 @@ public class PipelinedSorter extends ExternalSorter { if(newSpan == null) { //avoid sort/spill of empty span - Stopwatch stopWatch = new Stopwatch(); + StopWatch stopWatch = new StopWatch(); stopWatch.start(); // sort in the same thread, do not wait for the thread pool merger.add(span.sort(sorter)); spill(); stopWatch.stop(); - LOG.info("Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); + LOG.info("Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms"); if (LOG.isDebugEnabled()) { - LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.elapsedMillis()) + " ms"); + LOG.debug(outputContext.getDestinationVertexName() + ": Time taken for spill " + (stopWatch.now(TimeUnit.MILLISECONDS)) + " ms"); } if (pipelinedShuffle) { sendPipelinedShuffleEvents(); http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tez-tests/pom.xml b/tez-tests/pom.xml index e16d038..616906f 100644 --- a/tez-tests/pom.xml +++ b/tez-tests/pom.xml @@ -32,7 +32,6 @@ <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-common</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.tez</groupId> http://git-wip-us.apache.org/repos/asf/tez/blob/68fa017e/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java index bf3e9db..e549ec1 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RPCLoadGen.java @@ -23,8 +23,8 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.TimeUnit; -import com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -48,6 +48,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.examples.TezExampleBase; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.processor.SimpleProcessor; +import org.apache.tez.util.StopWatch; import sun.misc.IOUtils; public class RPCLoadGen extends TezExampleBase { @@ -186,7 +187,7 @@ public class RPCLoadGen extends TezExampleBase { @Override public void run() throws Exception { - Stopwatch sw = new Stopwatch().start(); + StopWatch sw = new StopWatch().start(); long sleepTime = random.nextInt(sleepTimeMax); if (modeByte == VIA_RPC_BYTE) { LOG.info("Received via RPC."); @@ -203,7 +204,7 @@ public class RPCLoadGen extends TezExampleBase { } else { throw new IllegalArgumentException("Unknown execution mode: [" + modeByte + "]"); } - LOG.info("TimeTakenToAccessPayload=" + sw.stop().elapsedMillis()); + LOG.info("TimeTakenToAccessPayload=" + sw.stop().now(TimeUnit.MILLISECONDS)); LOG.info("Sleeping for: " + sleepTime); Thread.sleep(sleepTime); }
