Repository: tez Updated Branches: refs/heads/master 91a397b0b -> 495e6f0a4
TEZ-3284. Synchronization for every write in UnorderdKVWriter (jeagles) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/495e6f0a Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/495e6f0a Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/495e6f0a Branch: refs/heads/master Commit: 495e6f0a41c4e359c4e4f6786bea74f914d2c8b7 Parents: 91a397b Author: Jonathan Eagles <[email protected]> Authored: Thu Sep 8 10:37:58 2016 -0500 Committer: Jonathan Eagles <[email protected]> Committed: Thu Sep 8 10:38:13 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/tez/common/TezUtilsInternal.java | 6 +- .../common/io/NonSyncByteArrayInputStream.java | 99 ++++++++++++++++ .../common/io/NonSyncByteArrayOutputStream.java | 113 +++++++++++++++++++ .../tez/common/io/NonSyncDataOutputStream.java | 57 ++++++++++ .../apache/tez/dag/app/dag/impl/VertexImpl.java | 8 +- .../org/apache/tez/examples/JoinDataGen.java | 12 +- .../tez/mapreduce/hadoop/MRInputHelpers.java | 4 +- .../tez/history/parser/ATSFileParser.java | 4 +- .../vertexmanager/ShuffleVertexManagerBase.java | 4 +- .../common/shuffle/MemoryFetchedInput.java | 4 +- .../shuffle/orderedgrouped/InMemoryReader.java | 4 +- .../shuffle/orderedgrouped/InMemoryWriter.java | 4 +- .../common/sort/impl/PipelinedSorter.java | 6 +- .../common/sort/impl/dflt/DefaultSorter.java | 4 +- .../writers/UnorderedPartitionedKVWriter.java | 6 +- .../examples/MultipleCommitsExample.java | 12 +- 17 files changed, 309 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3f6281f..060d6d7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3284. Synchronization for every write in UnorderdKVWriter TEZ-3426. Second AM attempt launched for session mode and recovery disabled for certain cases TEZ-3230. Implement vertex manager and edge manager of cartesian product edge. TEZ-3326. Display JVM system properties in AM and task logs. http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java index 08a9aa8..b0b8906 100644 --- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java +++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java @@ -17,7 +17,6 @@ package org.apache.tez.common; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -41,6 +40,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.log4j.Appender; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -139,7 +139,7 @@ public class TezUtilsInternal { private static byte[] compressBytesInflateDeflate(byte[] inBytes) { Deflater deflater = new Deflater(Deflater.BEST_SPEED); deflater.setInput(inBytes); - ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length); + NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length); deflater.finish(); byte[] buffer = new byte[1024 * 8]; while (!deflater.finished()) { @@ -153,7 +153,7 @@ public class TezUtilsInternal { private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException { Inflater inflater = new Inflater(); inflater.setInput(inBytes); - ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length); + NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(inBytes.length); byte[] buffer = new byte[1024 * 8]; while (!inflater.finished()) { int count; http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java new file mode 100644 index 0000000..5b6e52a --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayInputStream.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common.io; + +import java.io.ByteArrayInputStream; + +/** + * A thread-not-safe version of ByteArrayInputStream, which removes all + * synchronized modifiers. + */ +public class NonSyncByteArrayInputStream extends ByteArrayInputStream { + public NonSyncByteArrayInputStream(byte[] bs) { + super(bs); + } + + public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) { + super(buf, offset, length); + } + + /** + * {@inheritDoc} + */ + @Override + public int read() { + return (pos < count) ? (buf[pos++] & 0xff) : -1; + } + + /** + * {@inheritDoc} + */ + @Override + public int read(byte b[], int off, int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + if (pos >= count) { + return -1; + } + + int avail = count - pos; + if (len > avail) { + len = avail; + } + if (len <= 0) { + return 0; + } + System.arraycopy(buf, pos, b, off, len); + pos += len; + return len; + } + + /** + * {@inheritDoc} + */ + @Override + public long skip(long n) { + long k = count - pos; + if (n < k) { + k = n < 0 ? 0 : n; + } + + pos += k; + return k; + } + + /** + * {@inheritDoc} + */ + @Override + public int available() { + return count - pos; + } + + /** + * {@inheritDoc} + */ + @Override + public void reset() { + pos = mark; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java new file mode 100644 index 0000000..40fae6f --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncByteArrayOutputStream.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common.io; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; + +/** + * A thread-not-safe version of ByteArrayOutputStream, which removes all + * synchronized modifiers. + */ +public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream { + public NonSyncByteArrayOutputStream(int size) { + super(size); + } + + public NonSyncByteArrayOutputStream() { + super(); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(int b) { + enLargeBuffer(1); + buf[count] = (byte) b; + count += 1; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(byte b[], int off, int len) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + enLargeBuffer(len); + System.arraycopy(b, off, buf, count, len); + count += len; + } + + /** + * {@inheritDoc} + */ + @Override + public void reset() { + count = 0; + } + + public void write(DataInput in, int length) throws IOException { + enLargeBuffer(length); + in.readFully(buf, count, length); + count += length; + } + + private int enLargeBuffer(int increment) { + int temp = count + increment; + int newLen = temp; + if (temp > buf.length) { + if ((buf.length << 1) > temp) { + newLen = buf.length << 1; + } + byte newbuf[] = new byte[newLen]; + System.arraycopy(buf, 0, newbuf, 0, count); + buf = newbuf; + } + return newLen; + } + + /** + * {@inheritDoc} + */ + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(buf, 0, count); + } + + /** + * {@inheritDoc} + */ + @Override public byte toByteArray()[] { + return Arrays.copyOf(buf, count); + } + + /** + * {@inheritDoc} + */ public int size() { + return count; + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java ---------------------------------------------------------------------- diff --git a/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java new file mode 100644 index 0000000..d6302fe --- /dev/null +++ b/tez-common/src/main/java/org/apache/tez/common/io/NonSyncDataOutputStream.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.common.io; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * A thread-not-safe version of DataOutputStream, which removes all + * synchronized modifiers. + */ +public class NonSyncDataOutputStream extends DataOutputStream { + public NonSyncDataOutputStream(OutputStream stream) { + super(stream); + } + + private void incrementWritten(int len) { + written += len; + if (written < 0) { + written = Integer.MAX_VALUE; + } + } + + /** + * {@inheritDoc} + */ + @Override + public void write(int b) throws IOException { + out.write(b); + incrementWritten(1); + } + + /** + * {@inheritDoc} + */ + @Override + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + incrementWritten(len); + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index 01bca8f..1dd7756 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -17,8 +17,6 @@ package org.apache.tez.dag.app.dag.impl; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -65,6 +63,8 @@ import org.apache.tez.common.ReflectionUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; @@ -2583,7 +2583,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl + ", use NoOpVertexManager to replace it, vertexId=" + logIdentifier); LOG.debug("VertexReconfigureDoneEvent=" + reconfigureDoneEvent); } - ByteArrayOutputStream out = new ByteArrayOutputStream(); + NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream(); try { reconfigureDoneEvent.toProtoStream(out); } catch (IOException e) { @@ -4458,7 +4458,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl LOG.debug("initialize NoOpVertexManager"); } configurationDoneEvent = new VertexConfigurationDoneEvent(); - configurationDoneEvent.fromProtoStream(new ByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray())); + configurationDoneEvent.fromProtoStream(new NonSyncByteArrayInputStream(getContext().getUserPayload().deepCopyAsArray())); String vertexName = getContext().getVertexName(); if (getContext().getVertexNumTasks(vertexName) == -1) { Preconditions.checkArgument(configurationDoneEvent.isSetParallelismCalled(), "SetParallelism must be called " http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java index 02728aa..4c0d201 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinDataGen.java @@ -18,10 +18,7 @@ package org.apache.tez.examples; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; @@ -41,6 +38,9 @@ import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.dag.api.Vertex; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.mapreduce.output.MROutput; import org.apache.tez.mapreduce.processor.SimpleMRProcessor; import org.apache.tez.runtime.api.ProcessorContext; @@ -171,8 +171,8 @@ public class JoinDataGen extends TezExampleBase { public static byte[] createConfiguration(long streamOutputFileSize, long hashOutputFileSize) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bos); + NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(); + NonSyncDataOutputStream dos = new NonSyncDataOutputStream(bos); dos.writeLong(streamOutputFileSize); dos.writeLong(hashOutputFileSize); dos.close(); @@ -183,7 +183,7 @@ public class JoinDataGen extends TezExampleBase { @Override public void initialize() throws Exception { byte[] payload = getContext().getUserPayload().deepCopyAsArray(); - ByteArrayInputStream bis = new ByteArrayInputStream(payload); + NonSyncByteArrayInputStream bis = new NonSyncByteArrayInputStream(payload); DataInputStream dis = new DataInputStream(bis); streamOutputFileSize = dis.readLong(); hashOutputFileSize = dis.readLong(); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java index 6262e59..9b88c4d 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java @@ -18,7 +18,6 @@ package org.apache.tez.mapreduce.hadoop; -import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.tez.common.TezUtils; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.dag.api.DataSourceDescriptor; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.TaskLocationHint; @@ -247,7 +247,7 @@ public class MRInputHelpers { ByteString.Output os = ByteString .newOutput(SPLIT_SERIALIZED_LENGTH_ESTIMATE); - oldSplit.write(new DataOutputStream(os)); + oldSplit.write(new NonSyncDataOutputStream(os)); ByteString splitBs = os.toByteString(); builder.setSplitBytes(splitBs); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java ---------------------------------------------------------------------- diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java index b4f3df3..fb42129 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.commons.io.IOUtils; import org.apache.tez.dag.api.TezException; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; import org.apache.tez.history.parser.datamodel.BaseParser; import org.apache.tez.history.parser.datamodel.Constants; import org.apache.tez.history.parser.datamodel.DagInfo; @@ -35,7 +36,6 @@ import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -176,7 +176,7 @@ public class ATSFileParser extends BaseParser implements ATSData { private JSONObject readJson(InputStream in) throws IOException, JSONException { //Read entire content to memory - final ByteArrayOutputStream bout = new ByteArrayOutputStream(); + final NonSyncByteArrayOutputStream bout = new NonSyncByteArrayOutputStream(); IOUtils.copy(in, bout); return new JSONObject(new String(bout.toByteArray(), "UTF-8")); } http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java index 951ce30..9b88cfd 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.java @@ -29,6 +29,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; import org.apache.tez.dag.api.EdgeManagerPluginDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; @@ -55,7 +56,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto; -import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.util.BitSet; @@ -337,7 +337,7 @@ abstract class ShuffleVertexManagerBase extends VertexManagerPlugin { ByteString compressedPartitionStats = proto.getPartitionStats(); byte[] rawData = TezCommonUtils.decompressByteStringToByteArray( compressedPartitionStats); - ByteArrayInputStream bin = new ByteArrayInputStream(rawData); + NonSyncByteArrayInputStream bin = new NonSyncByteArrayInputStream(rawData); partitionStats.deserialize(new DataInputStream(bin)); parsePartitionStats(partitionStats); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java index e25a325..78f1f3b 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/MemoryFetchedInput.java @@ -18,11 +18,11 @@ package org.apache.tez.runtime.library.common.shuffle; -import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.OutputStream; import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import com.google.common.base.Preconditions; @@ -45,7 +45,7 @@ public class MemoryFetchedInput extends FetchedInput { @Override public InputStream getInputStream() { - return new ByteArrayInputStream(byteStream.getBuffer()); + return new NonSyncByteArrayInputStream(byteStream.getBuffer()); } public byte[] getBytes() { http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java index 12fe057..41e8432 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryReader.java @@ -18,7 +18,6 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; -import java.io.ByteArrayInputStream; import java.io.DataInput; import java.io.File; import java.io.FileOutputStream; @@ -27,6 +26,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; import org.apache.tez.runtime.library.common.InputAttemptIdentifier; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @@ -38,7 +38,7 @@ import org.apache.tez.runtime.library.common.sort.impl.IFile.Reader; @InterfaceStability.Unstable public class InMemoryReader extends Reader { - private static class ByteArrayDataInput extends ByteArrayInputStream implements DataInput { + private static class ByteArrayDataInput extends NonSyncByteArrayInputStream implements DataInput { public ByteArrayDataInput(byte buf[], int offset, int length) { super(buf, offset, length); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java index 17d57a6..d2778d8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/InMemoryWriter.java @@ -17,7 +17,6 @@ */ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; -import java.io.DataOutputStream; import java.io.IOException; import org.slf4j.Logger; @@ -26,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.BoundedByteArrayOutputStream; import org.apache.hadoop.io.WritableUtils; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.library.common.sort.impl.IFile; import org.apache.tez.runtime.library.common.sort.impl.IFileOutputStream; import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer; @@ -40,7 +40,7 @@ public class InMemoryWriter extends Writer { public InMemoryWriter(BoundedByteArrayOutputStream arrayStream) { super(null, null); this.out = - new DataOutputStream(new IFileOutputStream(arrayStream)); + new NonSyncDataOutputStream(new IFileOutputStream(arrayStream)); } public void append(Object key, Object value) throws IOException { http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index 897d7d7..609e9ff 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -17,7 +17,6 @@ */ package org.apache.tez.runtime.library.common.sort.impl; -import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.BufferOverflowException; @@ -46,6 +45,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.CallableWithNdc; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.library.common.comparator.ProxyComparator; import org.apache.hadoop.io.RawComparator; @@ -863,7 +863,7 @@ public class PipelinedSorter extends ExternalSorter { final byte[] rawkvmeta; final int kvmetabase; final ByteBuffer kvbuffer; - final DataOutputStream out; + final NonSyncDataOutputStream out; final RawComparator comparator; final byte[] imeta = new byte[METASIZE]; @@ -895,7 +895,7 @@ public class PipelinedSorter extends ExternalSorter { kvmeta = kvmetabuffer .order(ByteOrder.nativeOrder()) .asIntBuffer(); - out = new DataOutputStream( + out = new NonSyncDataOutputStream( new BufferStreamWrapper(kvbuffer)); this.comparator = comparator; } http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java index 69bfdb8..873d8e1 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java @@ -18,7 +18,6 @@ package org.apache.tez.runtime.library.common.sort.impl.dflt; -import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -43,6 +42,7 @@ import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.Progress; import org.apache.tez.common.TezUtilsInternal; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -474,7 +474,7 @@ public final class DefaultSorter extends ExternalSorter implements IndexedSortab /** * Inner class managing the spill of serialized records to disk. */ - protected class BlockingBuffer extends DataOutputStream { + protected class BlockingBuffer extends NonSyncDataOutputStream { public BlockingBuffer() { super(new Buffer()); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java index 152096c..eff29a5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java @@ -17,7 +17,6 @@ */ package org.apache.tez.runtime.library.common.writers; -import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -52,6 +51,7 @@ import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.TezUtilsInternal; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.TaskFailureType; import org.apache.tez.runtime.api.OutputContext; @@ -102,7 +102,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit @VisibleForTesting final BlockingQueue<WrappedBuffer> availableBuffers; private final ByteArrayOutputStream baos; - private final DataOutputStream dos; + private final NonSyncDataOutputStream dos; @VisibleForTesting WrappedBuffer currentBuffer; private final FileSystem rfs; @@ -192,7 +192,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit } currentBuffer = buffers[0]; baos = new ByteArrayOutputStream(); - dos = new DataOutputStream(baos); + dos = new NonSyncDataOutputStream(baos); keySerializer.open(dos); valSerializer.open(dos); rfs = ((LocalFileSystem) FileSystem.getLocal(this.conf)).getRaw(); http://git-wip-us.apache.org/repos/asf/tez/blob/495e6f0a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java index fe7984b..5c93e87 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/MultipleCommitsExample.java @@ -17,12 +17,9 @@ */ package org.apache.tez.mapreduce.examples; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutput; -import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -34,6 +31,9 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.TextOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.tez.client.TezClient; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; +import org.apache.tez.common.io.NonSyncDataOutputStream; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSinkDescriptor; import org.apache.tez.dag.api.GroupInputEdge; @@ -172,8 +172,8 @@ public class MultipleCommitsExample extends TezExampleBase { } public UserPayload toUserPayload() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - this.write(new DataOutputStream(out)); + NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream(); + this.write(new NonSyncDataOutputStream(out)); return UserPayload.create(ByteBuffer.wrap(out.toByteArray())); } @@ -181,7 +181,7 @@ public class MultipleCommitsExample extends TezExampleBase { throws IOException { MultipleOutputProcessorConfig config = new MultipleOutputProcessorConfig(); config.readFields(new DataInputStream( - new ByteArrayInputStream(payload.deepCopyAsArray()))); + new NonSyncByteArrayInputStream(payload.deepCopyAsArray()))); return config; } }
