Repository: tez Updated Branches: refs/heads/branch-0.7 765c75b37 -> 570a190c6
TEZ-3140. Reduce AM memory usage during serialization Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/570a190c Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/570a190c Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/570a190c Branch: refs/heads/branch-0.7 Commit: 570a190c6e9362783d1e93343118d28cf1e031d5 Parents: 765c75b Author: Rohini Palaniswamy <[email protected]> Authored: Mon Mar 7 07:57:22 2016 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Mon Mar 7 07:57:22 2016 -0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/api/EntityDescriptor.java | 104 +++++++++++++++++-- .../tez/dag/api/TestEntityDescriptor.java | 63 +++++++++++ 3 files changed, 157 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e73329e..2e2f852 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES TEZ-2972. Avoid task rescheduling when a node turns unhealthy ALL CHANGES: + TEZ-3140. Reduce AM memory usage during serialization TEZ-3156. Tez client keeps trying to talk to RM even if RM does not know about the application. TEZ-3115. Shuffle string handling adds significant memory overhead TEZ-3149. 
Tez-tools: Add username in DagInfo http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java index d02bddd..dcc4ebf 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/EntityDescriptor.java @@ -21,10 +21,11 @@ package org.apache.tez.dag.api; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; - import java.nio.ByteBuffer; + import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -32,7 +33,7 @@ import com.google.common.base.Preconditions; /** * Describes a given user code entity. Consists of the name of the class implementing - * the user logic and a payload that can be used to configure an object instance of + * the user logic and a payload that can be used to configure an object instance of * that class. In addition some history information can be set for logging/debugging. * <br>This is not supposed to be extended by users.
Users are expected to use the derived * classes for specific entities @@ -41,6 +42,7 @@ import com.google.common.base.Preconditions; @SuppressWarnings("unchecked") public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements Writable { + private static final int SERIALIZE_BUFFER_SIZE = 8192; private UserPayload userPayload = null; private String className; protected String historyText; @@ -48,7 +50,7 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements @Private // for Writable public EntityDescriptor() { } - + public EntityDescriptor(String className) { this.className = className; } @@ -91,7 +93,7 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements public String getClassName() { return this.className; } - + @Override public void write(DataOutput out) throws IOException { Text.writeString(out, className); @@ -100,17 +102,25 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements if (bb == null) { out.writeInt(-1); } else { - int size = bb.limit() - bb.position(); + int size = bb.remaining(); if (size == 0) { out.writeInt(-1); } else { out.writeInt(size); - byte[] bytes = new byte[size]; - // This modified the ByteBuffer, and primarily works since UserPayload.getByteBuffer - // return a new copy each time - bb.get(bytes); - // TODO: TEZ-305 - should be more efficient by using protobuf serde. - out.write(bytes); + if (out instanceof DataOutputBuffer) { + DataOutputBuffer buf = (DataOutputBuffer) out; + buf.write(new ByteBufferDataInput(bb), size); + } else { + // This code is just for fallback in case serialization is changed to + // use something other than DataOutputBuffer.
+ int len; + byte[] buf = new byte[SERIALIZE_BUFFER_SIZE]; + do { + len = Math.min(bb.remaining(), SERIALIZE_BUFFER_SIZE); + bb.get(buf, 0, len); + out.write(buf, 0, len); + } while (bb.remaining() > 0); + } } out.writeInt(userPayload.getVersion()); } @@ -134,4 +144,76 @@ public abstract class EntityDescriptor<T extends EntityDescriptor<T>> implements userPayload == null ? false : userPayload.getPayload() == null ? false : true; return "ClassName=" + className + ", hasPayload=" + hasPayload; } + + private static class ByteBufferDataInput implements DataInput { + + private final ByteBuffer bb; + + public ByteBufferDataInput(ByteBuffer bb) { + this.bb = bb; + } + + @Override + public void readFully(byte[] b) throws IOException { + bb.get(b, 0, b.length); // fill all of b, per the DataInput.readFully(byte[]) contract + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException { + bb.get(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public boolean readBoolean() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public byte readByte() throws IOException { + return bb.get(); + } + @Override + public int readUnsignedByte() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public short readShort() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public int readUnsignedShort() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public char readChar() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public int readInt() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public long readLong() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public float readFloat() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public double
readDouble() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public String readLine() throws IOException { + throw new UnsupportedOperationException(); + } + @Override + public String readUTF() throws IOException { + throw new UnsupportedOperationException(); + } + } } http://git-wip-us.apache.org/repos/asf/tez/blob/570a190c/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java new file mode 100644 index 0000000..1e8a99d --- /dev/null +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestEntityDescriptor.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tez.dag.api; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.commons.lang.RandomStringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.TezUtils; +import org.junit.Assert; +import org.junit.Test; + +public class TestEntityDescriptor { + + @Test + public void testEntityDescriptorHadoopSerialization() throws IOException { + // This tests the alternate serialization code path + // if the DataOutput is not DataOutputBuffer + Configuration conf = new Configuration(true); + String confVal = RandomStringUtils.random(10000, true, true); + conf.set("testKey", confVal); + UserPayload payload = TezUtils.createUserPayloadFromConf(conf); + InputDescriptor entityDescriptor = + InputDescriptor.create("inputClazz").setUserPayload(payload) + .setHistoryText("Bar123"); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(bos); + entityDescriptor.write(out); + out.close(); + + InputDescriptor deserialized = InputDescriptor.create("dummy"); + deserialized.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray()))); + + Assert.assertEquals(entityDescriptor.getClassName(), deserialized.getClassName()); + // History text is not serialized when sending to tasks + Assert.assertNull(deserialized.getHistoryText()); + Assert.assertArrayEquals(payload.deepCopyAsArray(), deserialized.getUserPayload().deepCopyAsArray()); + Configuration deserializedConf = TezUtils.createConfFromUserPayload(deserialized.getUserPayload()); + Assert.assertEquals(confVal, deserializedConf.get("testKey")); + } + +}
