TEZ-2203. Intern strings in tez counters (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/6e15b2f6
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/6e15b2f6
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/6e15b2f6

Branch: refs/heads/TEZ-2003
Commit: 6e15b2f6dac8471d266b8f313038f3767fedab04
Parents: 9b845f2
Author: Bikas Saha <[email protected]>
Authored: Fri Mar 20 12:27:02 2015 -0700
Committer: Bikas Saha <[email protected]>
Committed: Fri Mar 20 12:27:02 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../common/counters/AbstractCounterGroup.java   |   7 +-
 .../common/counters/FileSystemCounterGroup.java |   9 +-
 .../common/counters/FrameworkCounterGroup.java  |   3 +-
 .../tez/common/counters/GenericCounter.java     |  12 +-
 .../tez/dag/app/TestMockDAGAppMaster.java       | 111 ++++++++++++++++++-
 6 files changed, 128 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 759504d..6045975 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2203. Intern strings in tez counters
   TEZ-2202. Fix LocalTaskExecutionThread ID to the standard thread numbering.
   TEZ-2059. Remove TaskEventHandler in TestDAGImpl
   TEZ-2191. 
Simulation improvements to MockDAGAppMaster http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java index 7e791d7..a4b153f 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringInterner; import com.google.common.collect.Iterators; @@ -49,8 +50,8 @@ public abstract class AbstractCounterGroup<T extends TezCounter> public AbstractCounterGroup(String name, String displayName, Limits limits) { - this.name = name; - this.displayName = displayName; + this.name = StringInterner.weakIntern(name); + this.displayName = StringInterner.weakIntern(displayName); this.limits = limits; } @@ -160,7 +161,7 @@ public abstract class AbstractCounterGroup<T extends TezCounter> @Override public synchronized void readFields(DataInput in) throws IOException { - displayName = Text.readString(in); + displayName = StringInterner.weakIntern(Text.readString(in)); counters.clear(); int size = WritableUtils.readVInt(in); for (int i = 0; i < size; i++) { http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java 
b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java index 07df3fb..771f523 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java @@ -29,13 +29,16 @@ import java.util.Locale; import java.util.Map; import com.google.common.base.Joiner; + import static com.google.common.base.Preconditions.*; + import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringInterner; /** * An abstract class to provide common implementation of the filesystem @@ -54,7 +57,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter> // Just a few local casts probably worth not having to carry it around. private final Map<String, Object[]> map = new ConcurrentSkipListMap<String, Object[]>(); - private String displayName = "File System Counters"; + private String displayName = StringInterner.weakIntern("File System Counters"); private static final Joiner NAME_JOINER = Joiner.on('_'); @@ -65,7 +68,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter> private long value; public FSCounter(String scheme, FileSystemCounter ref) { - this.scheme = scheme; + this.scheme = scheme; // this is interned in the checkScheme() method via a map key = ref; } @@ -122,7 +125,7 @@ public abstract class FileSystemCounterGroup<C extends TezCounter> @Override public void setDisplayName(String displayName) { - this.displayName = displayName; + this.displayName = StringInterner.weakIntern(displayName); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java ---------------------------------------------------------------------- diff 
--git a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java index 66b6e33..3a4aa97 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Iterator; -import com.google.common.base.Joiner; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.WritableUtils; @@ -60,7 +59,7 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>, public FrameworkCounter(T ref, String groupName) { key = ref; - this.groupName = groupName; + this.groupName = groupName; // this is interned in the fmap/i2s of CounterGroupFactory } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java index 5477606..4bb4c76 100644 --- a/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java +++ b/tez-api/src/main/java/org/apache/tez/common/counters/GenericCounter.java @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.util.StringInterner; /** * A generic counter implementation @@ -41,13 +42,12 @@ public class GenericCounter extends AbstractCounter { } public GenericCounter(String name, String displayName) { - this.name = name; - this.displayName = displayName; + this(name, displayName, 0); } public GenericCounter(String name, String displayName, long value) { - 
this.name = name; - this.displayName = displayName; + this.name = StringInterner.weakIntern(name); + this.displayName = StringInterner.weakIntern(displayName); this.value = value; } @@ -58,8 +58,8 @@ public class GenericCounter extends AbstractCounter { @Override public synchronized void readFields(DataInput in) throws IOException { - name = Text.readString(in); - displayName = in.readBoolean() ? Text.readString(in) : name; + name = StringInterner.weakIntern(Text.readString(in)); + displayName = in.readBoolean() ? StringInterner.weakIntern(Text.readString(in)) : name; value = WritableUtils.readVLong(in); } http://git-wip-us.apache.org/repos/asf/tez/blob/6e15b2f6/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java index a7feef8..822fe7f 100644 --- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java +++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java @@ -18,7 +18,11 @@ package org.apache.tez.dag.app; +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,11 +30,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataInputByteBuffer; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.URL; 
+import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; @@ -64,6 +72,7 @@ import org.apache.tez.runtime.api.impl.OutputSpec; import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.api.impl.TezEvent; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import com.google.common.collect.Maps; @@ -245,6 +254,12 @@ public class TestMockDAGAppMaster { Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, OutputDescriptor.create("Out"), InputDescriptor.create("In")))); + TezCounters temp = new TezCounters(); + temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(bos); + temp.write(out); + final byte[] payload = bos.toByteArray(); MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); @@ -254,7 +269,15 @@ public class TestMockDAGAppMaster { public TezCounters getCounters(TaskSpec taskSpec) { String vName = taskSpec.getVertexName(); TezCounters counters = new TezCounters(); - counters.findCounter(globalCounterName, globalCounterName).increment(1); + final DataInputByteBuffer in = new DataInputByteBuffer(); + in.reset(ByteBuffer.wrap(payload)); + try { + // this ensures that the serde code path is covered. + // the internal merges of counters covers the constructor code path. 
+ counters.readFields(in); + } catch (IOException e) { + Assert.fail(e.getMessage()); + } counters.findCounter(vName, procCounterName).increment(1); for (OutputSpec output : taskSpec.getOutputs()) { counters.findCounter(vName, output.getDestinationVertexName()).increment(1); @@ -281,7 +304,93 @@ public class TestMockDAGAppMaster { Assert.assertEquals(1, counters.findCounter(vBName, vAName).getValue()); // verify global counters Assert.assertEquals(11, counters.findCounter(globalCounterName, globalCounterName).getValue()); + VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName); + VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName); + TezCounters vACounters = vAImpl.getAllCounters(); + TezCounters vBCounters = vBImpl.getAllCounters(); + String vACounterName = vACounters.findCounter(globalCounterName, globalCounterName).getName(); + String vBCounterName = vBCounters.findCounter(globalCounterName, globalCounterName).getName(); + if (vACounterName != vBCounterName) { + Assert.fail("String counter name objects dont match despite interning."); + } + CounterGroup vaGroup = vACounters.getGroup(globalCounterName); + String vaGrouName = vaGroup.getName(); + CounterGroup vBGroup = vBCounters.getGroup(globalCounterName); + String vBGrouName = vBGroup.getName(); + if (vaGrouName != vBGrouName) { + Assert.fail("String group name objects dont match despite interning."); + } + + tezClient.stop(); + } + + private void checkMemory(String name, MockDAGAppMaster mockApp) { + long mb = 1024*1024; + + //Getting the runtime reference from system + Runtime runtime = Runtime.getRuntime(); + + System.out.println("##### Heap utilization statistics [MB] for " + name); + + runtime.gc(); + + //Print used memory + System.out.println("##### Used Memory:" + + (runtime.totalMemory() - runtime.freeMemory()) / mb); + + //Print free memory + System.out.println("##### Free Memory:" + + runtime.freeMemory() / mb); + + //Print total available memory + System.out.println("##### Total Memory:" 
+ runtime.totalMemory() / mb); + + //Print Maximum available memory + System.out.println("##### Max Memory:" + runtime.maxMemory() / mb); + } + + @Ignore + @Test (timeout = 60000) + public void testBasicCounterMemory() throws Exception { + Logger.getRootLogger().setLevel(Level.WARN); + TezConfiguration tezconf = new TezConfiguration(defaultConf); + MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, + null, false, false); + tezClient.start(); + + final String vAName = "A"; + DAG dag = DAG.create("testBasicCounters"); + Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000); + dag.addVertex(vA); + + MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp(); + MockContainerLauncher mockLauncher = mockApp.getContainerLauncher(); + mockLauncher.startScheduling(false); + mockApp.countersDelegate = new CountersDelegate() { + @Override + public TezCounters getCounters(TaskSpec taskSpec) { + TezCounters counters = new TezCounters(); + final String longName = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz"; + final String shortName = "abcdefghijklmnopqrstuvwxyz"; + for (int i=0; i<6; ++i) { + for (int j=0; j<15; ++j) { + counters.findCounter((i + longName), (i + (shortName))).increment(1); + } + } + return counters; + } + }; + mockApp.doSleep = false; + DAGClient dagClient = tezClient.submitDAG(dag); + mockLauncher.waitTillContainersLaunched(); + DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG(); + mockLauncher.startScheduling(true); + DAGStatus status = dagClient.waitForCompletion(); + Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState()); + TezCounters counters = dagImpl.getAllCounters(); + Assert.assertNotNull(counters); + checkMemory(dag.getName(), mockApp); tezClient.stop(); }
