Repository: tez Updated Branches: refs/heads/master c0d59139c -> ff7081e06
TEZ-1288. Create FastTezSerialization as an optional feature (rajesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ff7081e0 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ff7081e0 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ff7081e0 Branch: refs/heads/master Commit: ff7081e06db40bfb7e6dfc72a1c9ad6ca93be79f Parents: c0d5913 Author: Rajesh Balamohan <[email protected]> Authored: Tue Jul 29 03:37:12 2014 +0530 Committer: Rajesh Balamohan <[email protected]> Committed: Tue Jul 29 03:37:12 2014 +0530 ---------------------------------------------------------------------- .../org/apache/hadoop/io/HashComparator.java | 24 - .../runtime/library/common/ValuesIterator.java | 10 +- .../common/comparator/HashComparator.java | 24 + .../common/comparator/TezBytesComparator.java | 38 ++ .../TezBytesWritableSerialization.java | 129 ++++++ .../common/sort/impl/ExternalSorter.java | 4 + .../runtime/library/common/sort/impl/IFile.java | 6 +- .../common/sort/impl/PipelinedSorter.java | 2 +- .../conf/OnFileSortedOutputConfiguration.java | 36 ++ .../OnFileUnorderedKVOutputConfiguration.java | 29 ++ ...orderedPartitionedKVOutputConfiguration.java | 29 ++ .../OrderedPartitionedKVEdgeConfigurer.java | 30 ++ .../conf/ShuffledMergedInputConfiguration.java | 36 ++ .../ShuffledUnorderedKVInputConfiguration.java | 30 ++ .../UnorderedPartitionedKVEdgeConfigurer.java | 24 + .../UnorderedUnpartitionedKVEdgeConfigurer.java | 26 ++ .../library/common/TestValuesIterator.java | 460 +++++++++++++++++++ .../TestOnFileUnorderedPartitionedKVOutput.java | 9 +- .../TestOrderedPartitionedKVEdgeConfigurer.java | 53 +++ ...estUnorderedPartitionedKVEdgeConfigurer.java | 7 + ...tUnorderedUnpartitionedKVEdgeConfigurer.java | 6 + 21 files changed, 982 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java deleted file mode 100644 index a372e01..0000000 --- a/tez-runtime-library/src/main/java/org/apache/hadoop/io/HashComparator.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.io; - -public interface HashComparator<KEY> { - - int getHashCode(KEY key); - -} http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java index ab1cf7f..c0068eb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ValuesIterator.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; @@ -41,6 +43,7 @@ import com.google.common.base.Preconditions; */ public class ValuesIterator<KEY,VALUE> { + private static final Log LOG = LogFactory.getLog(ValuesIterator.class.getName()); protected TezRawKeyValueIterator in; //input iterator private KEY key; // current key private KEY nextKey; @@ -75,6 +78,7 @@ public class ValuesIterator<KEY,VALUE> { this.keyDeserializer.open(keyIn); this.valDeserializer = serializationFactory.getDeserializer(valClass); this.valDeserializer.open(this.valueIn); + LOG.info("keyDeserializer=" + keyDeserializer + "; valueDeserializer=" + valDeserializer); } TezRawKeyValueIterator getRawIterator() { return in; } @@ -175,7 +179,8 @@ public class ValuesIterator<KEY,VALUE> { more = in.next(); if (more) { DataInputBuffer nextKeyBytes = in.getKey(); - keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength()); + 
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), + nextKeyBytes.getLength() - nextKeyBytes.getPosition()); nextKey = keyDeserializer.deserialize(nextKey); // TODO Is a counter increment required here ? hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0); @@ -190,7 +195,8 @@ public class ValuesIterator<KEY,VALUE> { */ private void readNextValue() throws IOException { DataInputBuffer nextValueBytes = in.getValue(); - valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength()); + valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), + nextValueBytes.getLength() - nextValueBytes.getPosition()); value = valDeserializer.deserialize(value); } } http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java new file mode 100644 index 0000000..4af03ba --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/HashComparator.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.comparator; + +public interface HashComparator<KEY> { + + int getHashCode(KEY key); + +} http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java new file mode 100644 index 0000000..149188e --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/comparator/TezBytesComparator.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.runtime.library.common.comparator; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.WritableComparator; + +public class TezBytesComparator extends WritableComparator { + + public TezBytesComparator() { + super(BytesWritable.class); + } + + /** + * Compare the buffers in serialized form. + */ + @Override + public int compare(byte[] b1, int s1, int l1, + byte[] b2, int s2, int l2) { + return compareBytes(b1, s1, l1, b2, s2, l2); + } + +} http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java new file mode 100644 index 0000000..e195178 --- /dev/null +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/serializer/TezBytesWritableSerialization.java @@ -0,0 +1,129 @@ +package org.apache.tez.runtime.library.common.serializer; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.google.common.base.Preconditions; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.Serialization; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.util.ReflectionUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * <pre> + * When using BytesWritable, data is serialized in memory (4 bytes per key and 4 bytes per value) + * and written to IFile where it gets serialized again (4 bytes per key and 4 bytes per value). + * This adds an overhead of 8 bytes per key value pair written. This class reduces this overhead + * by providing a fast serializer/deserializer to speed up inner loop of sort, + * spill, merge. 
+ * + * Usage e.g: + * OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer + * .newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf) + * .setFromConfiguration(conf) + * .setKeySerializationClass(TezBytesWritableSerialization.class.getName(), + * TezBytesComparator.class.getName())) + * </pre> + */ +public class TezBytesWritableSerialization extends Configured implements Serialization<Writable> { + + private static final Log LOG = LogFactory.getLog(TezBytesWritableSerialization.class.getName()); + + @Override + public boolean accept(Class<?> c) { + return (BytesWritable.class.isAssignableFrom(c)); + } + + @Override + public Serializer<Writable> getSerializer(Class<Writable> c) { + return new TezBytesWritableSerializer(); + } + + @Override + public Deserializer<Writable> getDeserializer(Class<Writable> c) { + return new TezBytesWritableDeserializer(getConf(), c); + } + + public static class TezBytesWritableDeserializer extends Configured + implements Deserializer<Writable> { + private Class<?> writableClass; + private DataInputBuffer dataIn; + + public TezBytesWritableDeserializer(Configuration conf, Class<?> c) { + setConf(conf); + this.writableClass = c; + } + + @Override + public void open(InputStream in) { + dataIn = (DataInputBuffer) in; + } + + @Override + public Writable deserialize(Writable w) throws IOException { + BytesWritable writable = (BytesWritable) w; + if (w == null) { + writable = (BytesWritable) ReflectionUtils.newInstance(writableClass, getConf()); + } + + writable.set(dataIn.getData(), dataIn.getPosition(), dataIn.getLength() - dataIn + .getPosition()); + return writable; + } + + @Override + public void close() throws IOException { + dataIn.close(); + } + + } + + public static class TezBytesWritableSerializer extends Configured implements + Serializer<Writable> { + + private OutputStream dataOut; + + @Override + public void open(OutputStream out) { + this.dataOut = out; + } + + @Override + 
public void serialize(Writable w) throws IOException { + BytesWritable writable = (BytesWritable) w; + dataOut.write(writable.getBytes(), 0, writable.getLength()); + } + + @Override + public void close() throws IOException { + dataOut.close(); + } + } +} + http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java index cd0bc26..0f4dbe2 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -162,6 +163,9 @@ public abstract class ExternalSorter { serializationFactory = new SerializationFactory(this.conf); keySerializer = serializationFactory.getSerializer(keyClass); valSerializer = serializationFactory.getSerializer(valClass); + LOG.info("keySerializer=" + keySerializer + "; valueSerializer=" + valSerializer + + "; comparator=" + (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf) + + "; conf=" + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); // counters mapOutputByteCounter = outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES); 
http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java index 3b6fea5..cccadd9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java @@ -238,8 +238,10 @@ public class IFile { if (writtenRecordsCounter != null) { writtenRecordsCounter.increment(numRecordsWritten); } - LOG.info("Total keys written=" + numRecordsWritten + "; Savings(optimized due to " + - "multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" + rleWritten); + LOG.info("Total keys written=" + numRecordsWritten + "; rleEnabled=" + rle + "; Savings" + + "(due to multi-kv/rle)=" + totalKeySaving + "; number of RLEs written=" + + rleWritten + "; compressedLen=" + compressedBytesWritten + "; rawLen=" + + decompressedBytesWritten); } /** http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java index de43acf..6960736 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java @@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; -import org.apache.hadoop.io.HashComparator; +import org.apache.tez.runtime.library.common.comparator.HashComparator; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.IndexedSorter; http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java index ec90e6f..97712a3 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfiguration.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -308,6 +309,41 @@ public class OnFileSortedOutputConfiguration { } /** + * Set serialization class and the relevant comparator to be used for sorting. + * Providing custom serialization class could change the way, keys needs to be compared in + * sorting. Providing invalid comparator here could create invalid results. 
+ * + * @param serializationClassName + * @param comparatorClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName, + String comparatorClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + Preconditions.checkArgument(comparatorClassName != null, + "comparator cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + setKeyComparatorClass(comparatorClassName); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. + * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** * Create the actual configuration instance. 
* * @return an instance of the Configuration http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java index 9f10138..17bb3b8 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfiguration.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -200,6 +201,34 @@ public class OnFileUnorderedKVOutputConfiguration { return this; } + /** + * Set serialization class responsible for providing serializer/deserializer for keys. + * + * @param serializationClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. 
+ * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + public Builder enableCompression(String compressionCodec) { this.conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, true); if (compressionCodec != null) { http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java index 01b7405..2967501 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfiguration.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -249,6 +250,34 @@ public class OnFileUnorderedPartitionedKVOutputConfiguration { } /** + * Set serialization class responsible for providing serializer/deserializer for keys. 
+ * + * @param serializationClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. + * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** * Create the actual configuration instance. * * @return an instance of the Configuration http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java index 78ef7bf..4da5060 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import 
org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; @@ -155,6 +156,35 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase return this; } + /** + * Set serialization class and the relevant comparator to be used for sorting. + * Providing custom serialization class could change the way, keys needs to be compared in + * sorting. Providing invalid comparator here could create invalid results. + * + * @param serializationClassName + * @param comparatorClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName, + String comparatorClassName) { + outputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName); + inputBuilder.setKeySerializationClass(serializationClassName, comparatorClassName); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. 
+ * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + outputBuilder.setValueSerializationClass(serializationClassName); + inputBuilder.setValueSerializationClass(serializationClassName); + return this; + } + + @Override public Builder enableCompression(String compressionCodec) { outputBuilder.enableCompression(compressionCodec); http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java index 1a5a6ed..d6abf03 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfiguration.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -409,6 +410,41 @@ public class ShuffledMergedInputConfiguration { } /** + * Set serialization class and the relevant comparator to be used for sorting. + * Providing custom serialization class could change the way, keys needs to be compared in + * sorting. Providing invalid comparator here could create invalid results. 
+ * + * @param serializationClassName + * @param comparatorClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName, + String comparatorClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + Preconditions.checkArgument(comparatorClassName != null, + "comparator cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + setKeyComparatorClass(comparatorClassName); + return this; + } + + /** + * Serialization class to be used for serializing values. + * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** * Create the actual configuration instance. 
* * @return an instance of the Configuration http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java index 8452b36..7ff8a58 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledUnorderedKVInputConfiguration.java @@ -30,6 +30,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -302,6 +303,35 @@ public class ShuffledUnorderedKVInputConfiguration { } /** + * Set serialization class responsible for providing serializer/deserializer for key/value and + * the corresponding comparator class to be used as key comparator. + * + * @param serializationClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. 
+ * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + Preconditions.checkArgument(serializationClassName != null, + "serializationClassName cannot be null"); + this.conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + return this; + } + + /** * Create the actual configuration instance. * * @return an instance of the Configuration http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java index 5cd7f49..3f985f6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedPartitionedKVEdgeConfigurer.java @@ -178,6 +178,30 @@ public class UnorderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBa } /** + * Set serialization class responsible for providing serializer/deserializer for keys. + * + * @param serializationClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName) { + outputBuilder.setKeySerializationClass(serializationClassName); + inputBuilder.setKeySerializationClass(serializationClassName); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. 
+ * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + outputBuilder.setValueSerializationClass(serializationClassName); + inputBuilder.setValueSerializationClass(serializationClassName); + return this; + } + + /** * Configure the specific output * * @return a builder to configure the output http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java index 5c93df3..8adfad9 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/UnorderedUnpartitionedKVEdgeConfigurer.java @@ -26,6 +26,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.InputDescriptor; @@ -192,6 +193,31 @@ public class UnorderedUnpartitionedKVEdgeConfigurer extends HadoopKeyValuesBased } /** + * Set serialization class responsible for providing serializer/deserializer for keys. + * No key comparator is configured by this method. 
+ * + * @param serializationClassName + * @return + */ + public Builder setKeySerializationClass(String serializationClassName) { + outputBuilder.setKeySerializationClass(serializationClassName); + inputBuilder.setKeySerializationClass(serializationClassName); + return this; + } + + /** + * Set serialization class responsible for providing serializer/deserializer for values. + * + * @param serializationClassName + * @return + */ + public Builder setValueSerializationClass(String serializationClassName) { + outputBuilder.setValueSerializationClass(serializationClassName); + inputBuilder.setValueSerializationClass(serializationClassName); + return this; + } + + /** * Configure the specific output * @return a builder to configure the output */ http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java new file mode 100644 index 0000000..b74789a --- /dev/null +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java @@ -0,0 +1,460 @@ +package org.apache.tez.runtime.library.common; + +import com.google.common.collect.Ordering; +import com.google.common.collect.TreeMultimap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BoundedByteArrayOutputStream; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataInputBuffer; +import 
org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.util.Progress; +import org.apache.hadoop.util.Progressable; +import org.apache.tez.common.TezRuntimeFrameworkConfigs; +import org.apache.tez.common.counters.GenericCounter; +import org.apache.tez.common.counters.TezCounter; +import org.apache.tez.common.counters.TezCounters; +import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.comparator.TezBytesComparator; +import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization; +import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryReader; +import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter; +import org.apache.tez.runtime.library.common.shuffle.impl.MergeManager; +import org.apache.tez.runtime.library.common.sort.impl.IFile; +import org.apache.tez.runtime.library.common.sort.impl.TezMerger; +import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.TreeMap; + +import static junit.framework.TestCase.assertFalse; +import static junit.framework.TestCase.assertTrue; +import static 
org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +@RunWith(Parameterized.class) +public class TestValuesIterator { + + private static final Log LOG = LogFactory.getLog(TestValuesIterator.class); + + static final String TEZ_BYTES_SERIALIZATION = TezBytesWritableSerialization.class.getName(); + + enum TestWithComparator { + LONG, INT, BYTES, TEZ_BYTES, TEXT, CUSTOM + } + + Configuration conf; + FileSystem fs; + static final Random rnd = new Random(); + + final Class keyClass; + final Class valClass; + final RawComparator comparator; + final RawComparator correctComparator; + final boolean expectedTestResult; + + int mergeFactor; + //For storing original data + final TreeMultimap<Writable, Writable> sortedDataMap; + + TezRawKeyValueIterator rawKeyValueIterator; + + Path baseDir; + Path tmpDir; + Path[] streamPaths; //merge stream paths + + /** + * Constructor + * + * @param serializationClassName serialization class to be used + * @param key key class name + * @param val value class name + * @param comparator to be used + * @param correctComparator (real comparator to be used for correct results) + * @param testResult expected 
result + * @throws IOException + */ + public TestValuesIterator(String serializationClassName, Class key, Class val, + TestWithComparator comparator, TestWithComparator correctComparator, boolean testResult) + throws IOException { + this.keyClass = key; + this.valClass = val; + this.comparator = getComparator(comparator); + this.correctComparator = + (correctComparator == null) ? this.comparator : getComparator(correctComparator); + this.expectedTestResult = testResult; + sortedDataMap = TreeMultimap.create(this.correctComparator, Ordering.natural()); + setupConf(serializationClassName); + } + + private void setupConf(String serializationClassName) throws IOException { + mergeFactor = Math.max(2, rnd.nextInt(100)); + conf = new Configuration(); + conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, mergeFactor); + if (serializationClassName != null) { + conf.set(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializationClassName + "," + + conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY)); + } + baseDir = new Path(".", this.getClass().getName()); + String localDirs = baseDir.toString(); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs); + fs = FileSystem.getLocal(conf); + } + + @Before + public void setup() throws Exception { + fs.mkdirs(baseDir); + tmpDir = new Path(baseDir, "tmp"); + } + + @After + public void cleanup() throws Exception { + fs.delete(baseDir, true); + sortedDataMap.clear(); + } + + @Test(timeout = 20000) + public void testIteratorWithInMemoryReader() throws IOException { + ValuesIterator iterator = createIterator(true); + testIterator(iterator); + } + + @Test(timeout = 20000) + public void testIteratorWithIFileReader() throws IOException { + ValuesIterator iterator = createIterator(false); + testIterator(iterator); + } + + /** + * Tests whether data in valuesIterator matches with sorted input data set. 
+ * + * @param valuesIterator + * @throws IOException + */ + private void testIterator(ValuesIterator valuesIterator) throws IOException { + Iterator<Writable> oriKeySet = sortedDataMap.keySet().iterator(); + boolean result = true; + while (valuesIterator.moveToNext()) { + Writable key = (Writable) valuesIterator.getKey(); + assertTrue(oriKeySet.hasNext()); + Writable ori = oriKeySet.next(); + if (!key.equals(ori)) { + result = false; + break; + } + for (Object val : valuesIterator.getValues()) { + if (!sortedDataMap.get(key).contains(val)) { + result = false; + break; + } + } + } + if (expectedTestResult) { + assertTrue(result); + } else { + assertFalse(result); + } + } + + /** + * Create sample data (in memory / disk based), merge them and return ValuesIterator + * + * @param inMemory + * @return ValuesIterator + * @throws IOException + */ + private ValuesIterator createIterator(boolean inMemory) throws IOException { + if (!inMemory) { + streamPaths = createFiles(); + //Merge all files to get KeyValueIterator + rawKeyValueIterator = + TezMerger.merge(conf, fs, keyClass, valClass, null, + false, -1, 1024, streamPaths, false, mergeFactor, tmpDir, comparator, + new ProgressReporter(), null, null, null, null); + } else { + List<TezMerger.Segment> segments = createInMemStreams(); + rawKeyValueIterator = + TezMerger.merge(conf, fs, keyClass, valClass, segments, mergeFactor, tmpDir, + comparator, new ProgressReporter(), new GenericCounter("readsCounter", "y"), + new GenericCounter("writesCounter", "y1"), + new GenericCounter("bytesReadCounter", "y2"), new Progress()); + } + return new ValuesIterator(rawKeyValueIterator, comparator, + keyClass, valClass, conf, (TezCounter) new GenericCounter("inputKeyCounter", "y3"), + (TezCounter) new GenericCounter("inputValueCounter", "y4")); + } + + @Parameterized.Parameters(name = "test[{0}, {1}, {2}, {3} {4} {5} {6}]") + public static Collection<Object[]> getParameters() { + Collection<Object[]> parameters = new 
ArrayList<Object[]>(); + + //parameters for constructor + parameters.add(new Object[] + { null, Text.class, Text.class, TestWithComparator.TEXT, null, true }); + parameters.add(new Object[] + { null, LongWritable.class, Text.class, TestWithComparator.LONG, null, true }); + parameters.add(new Object[] + { null, IntWritable.class, Text.class, TestWithComparator.INT, null, true }); + parameters.add(new Object[] + { null, BytesWritable.class, BytesWritable.class, TestWithComparator.BYTES, null, true }); + parameters.add(new Object[] + { + TEZ_BYTES_SERIALIZATION, BytesWritable.class, BytesWritable.class, + TestWithComparator.TEZ_BYTES, null, true + }); + parameters.add(new Object[] + { + TEZ_BYTES_SERIALIZATION, BytesWritable.class, LongWritable.class, + TestWithComparator.TEZ_BYTES, + null, true + }); + parameters.add(new Object[] + { + TEZ_BYTES_SERIALIZATION, CustomKey.class, LongWritable.class, + TestWithComparator.TEZ_BYTES, + null, true + }); + + //negative tests + parameters.add(new Object[] + { + TEZ_BYTES_SERIALIZATION, BytesWritable.class, BytesWritable.class, + TestWithComparator.BYTES, + TestWithComparator.TEZ_BYTES, false + }); + parameters.add(new Object[] + { + TEZ_BYTES_SERIALIZATION, CustomKey.class, LongWritable.class, TestWithComparator.CUSTOM, + TestWithComparator.TEZ_BYTES, false + }); + return parameters; + } + + private RawComparator getComparator(TestWithComparator comparator) { + switch (comparator) { + case LONG: + return new LongWritable.Comparator(); + case INT: + return new IntWritable.Comparator(); + case BYTES: + return new BytesWritable.Comparator(); + case TEZ_BYTES: + return new TezBytesComparator(); + case TEXT: + return new Text.Comparator(); + case CUSTOM: + return new CustomKey.Comparator(); + default: + return null; + } + } + + private Path[] createFiles() throws IOException { + int numberOfStreams = Math.max(2, rnd.nextInt(10)); + LOG.info("No of streams : " + numberOfStreams); + + Path[] paths = new Path[numberOfStreams]; + for 
(int i = 0; i < numberOfStreams; i++) { + paths[i] = new Path(baseDir, "ifile_" + i + ".out"); + IFile.Writer writer = + new IFile.Writer(conf, fs, paths[i], keyClass, valClass, null, + null, null); + Map<Writable, Writable> data = createData(); + //write data + for (Map.Entry<Writable, Writable> entry : data.entrySet()) { + writer.append(entry.getKey(), entry.getValue()); + } + LOG.info("Wrote " + data.size() + " in " + paths[i]); + data.clear(); + writer.close(); + } + return paths; + } + + /** + * create inmemory segments + * + * @return + * @throws IOException + */ + public List<TezMerger.Segment> createInMemStreams() throws IOException { + int numberOfStreams = Math.max(2, rnd.nextInt(5)); + LOG.info("No of streams : " + numberOfStreams); + + SerializationFactory serializationFactory = new SerializationFactory(conf); + Serializer keySerializer = serializationFactory.getSerializer(keyClass); + Serializer valueSerializer = serializationFactory.getSerializer(valClass); + + LocalDirAllocator localDirAllocator = + new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + TezInputContext context = createTezInputContext(); + MergeManager mergeManager = new MergeManager(conf, fs, localDirAllocator, + context, null, null, null, null, null, 1024 * 1024 * 10, null, false, -1); + + DataOutputBuffer keyBuf = new DataOutputBuffer(); + DataOutputBuffer valBuf = new DataOutputBuffer(); + DataInputBuffer keyIn = new DataInputBuffer(); + DataInputBuffer valIn = new DataInputBuffer(); + keySerializer.open(keyBuf); + valueSerializer.open(valBuf); + + List<TezMerger.Segment> segments = new LinkedList<TezMerger.Segment>(); + for (int i = 0; i < numberOfStreams; i++) { + BoundedByteArrayOutputStream bout = new BoundedByteArrayOutputStream(1024 * 1024); + InMemoryWriter writer = + new InMemoryWriter(bout); + Map<Writable, Writable> data = createData(); + //write data + for (Map.Entry<Writable, Writable> entry : data.entrySet()) { + keySerializer.serialize(entry.getKey()); + 
valueSerializer.serialize(entry.getValue()); + keyIn.reset(keyBuf.getData(), 0, keyBuf.getLength()); + valIn.reset(valBuf.getData(), 0, valBuf.getLength()); + writer.append(keyIn, valIn); + keyBuf.reset(); + valBuf.reset(); + keyIn.reset(); + valIn.reset(); + } + IFile.Reader reader = new InMemoryReader(mergeManager, null, bout.getBuffer(), 0, + bout.getBuffer().length); + segments.add(new TezMerger.Segment(reader, true)); + + data.clear(); + writer.close(); + } + return segments; + } + + private TezInputContext createTezInputContext() { + TezCounters counters = new TezCounters(); + TezInputContext inputContext = mock(TezInputContext.class); + doReturn(1024 * 1024 * 100l).when(inputContext).getTotalMemoryAvailableToTask(); + doReturn(counters).when(inputContext).getCounters(); + doReturn(1).when(inputContext).getInputIndex(); + doReturn("srcVertex").when(inputContext).getSourceVertexName(); + doReturn(1).when(inputContext).getTaskVertexIndex(); + doReturn(new byte[1024]).when(inputContext).getUserPayload(); + return inputContext; + } + + private Map<Writable, Writable> createData() { + Map<Writable, Writable> map = new TreeMap<Writable, Writable>(comparator); + for (int j = 0; j < Math.max(10, rnd.nextInt(50)); j++) { + Writable key = createData(keyClass); + Writable value = createData(valClass); + map.put(key, value); + sortedDataMap.put(key, value); + } + return map; + } + + private Writable createData(Class c) { + if (c.getName().equalsIgnoreCase(BytesWritable.class.getName())) { + return new BytesWritable(new BigInteger(256, rnd).toString().getBytes()); + } else if (c.getName().equalsIgnoreCase(IntWritable.class.getName())) { + return new IntWritable(rnd.nextInt()); + } else if (c.getName().equalsIgnoreCase(LongWritable.class.getName())) { + return new LongWritable(rnd.nextLong()); + } else if (c.getName().equalsIgnoreCase(CustomKey.class.getName())) { + String rndStr = new BigInteger(256, rnd).toString() + "_" + new BigInteger(256, + rnd).toString(); + return 
new CustomKey(rndStr.getBytes(), rndStr.hashCode()); + } else if (c.getName().equalsIgnoreCase(Text.class.getName())) { + String rndStr = new BigInteger(256, rnd).toString() + "_" + + new BigInteger(256, rnd).toString(); + return new Text(rndStr); + } else { + throw new IllegalArgumentException("Illegal argument : " + c.getName()); + } + } + + private static class ProgressReporter implements Progressable { + @Override public void progress() { + //no impl + } + } + + //Custom key and comparator + public static class CustomKey extends BytesWritable { + private static final int LENGTH_BYTES = 4; + private int hashCode; + + public CustomKey() { + } + + public CustomKey(byte[] data, int hashCode) { + super(data); + this.hashCode = hashCode; + } + + @Override + public int hashCode() { + return hashCode; + } + + public static class Comparator extends WritableComparator { + public Comparator() { + super(CustomKey.class); + } + + /** + * Compare the buffers in serialized form. + */ + @Override + public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { + return compareBytes(b1, s1 + LENGTH_BYTES, l1 - LENGTH_BYTES, b2, s2 + + LENGTH_BYTES, l2 - LENGTH_BYTES); + } + } + + static { + WritableComparator.define(CustomKey.class, new Comparator()); + } + } +} http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java index 262d778..90b725d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java +++ 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOnFileUnorderedPartitionedKVOutput.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Test; @@ -122,7 +123,9 @@ public class TestOnFileUnorderedPartitionedKVOutput { public void testDefaultConfigsUsed() { OnFileUnorderedPartitionedKVOutputConfiguration.Builder builder = OnFileUnorderedPartitionedKVOutputConfiguration - .newBuilder("KEY", "VALUE", "PARTITIONER", null); + .newBuilder("KEY", "VALUE", "PARTITIONER", null) + .setKeySerializationClass("SerClass1") + .setValueSerializationClass("SerClass2"); OnFileUnorderedPartitionedKVOutputConfiguration configuration = builder.build(); byte[] confBytes = configuration.toByteArray(); @@ -143,6 +146,10 @@ public class TestOnFileUnorderedPartitionedKVOutput { assertEquals("KEY", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, "")); assertEquals("VALUE", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, "")); assertEquals("PARTITIONER", conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, "")); + assertTrue(conf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith("SerClass2," + + "SerClass1")); + //for unordered partitioned kv output, comparator is not populated + assertNull(conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS)); } @Test http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java index eaef16f..09a006a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestOrderedPartitionedKVEdgeConfigurer.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Test; @@ -246,4 +247,56 @@ public class TestOrderedPartitionedKVEdgeConfigurer { } + @Test + public void testSerialization() { + OrderedPartitionedKVEdgeConfigurer.Builder builder = OrderedPartitionedKVEdgeConfigurer + .newBuilder("KEY", "VALUE", "PARTITIONER", null) + .enableCompression("CustomCodec") + .setKeySerializationClass("serClass1", "SomeComparator1") + .setValueSerializationClass("serClass2"); + + OrderedPartitionedKVEdgeConfigurer configuration = builder.build(); + + byte[] outputBytes = configuration.getOutputPayload(); + byte[] inputBytes = configuration.getInputPayload(); + + OnFileSortedOutputConfiguration rebuiltOutput = new OnFileSortedOutputConfiguration(); + rebuiltOutput.fromByteArray(outputBytes); + ShuffledMergedInputConfiguration rebuiltInput = new ShuffledMergedInputConfiguration(); + rebuiltInput.fromByteArray(inputBytes); + + Configuration outputConf = rebuiltOutput.conf; + Configuration inputConf = rebuiltInput.conf; + + assertEquals("KEY", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, "")); + assertEquals("VALUE", + outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, "")); + assertEquals("PARTITIONER", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, "")); + assertEquals("CustomCodec", + outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + 
assertEquals(true, + outputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, + false)); + assertNull(outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT)); + //verify comparator and serialization class + assertEquals("SomeComparator1", + outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS)); + assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).trim().startsWith + ("serClass2,serClass1")); + + + //verify comparator and serialization class + assertEquals("SomeComparator1", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS)); + assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).trim().startsWith + ("serClass2,serClass1")); + + assertEquals("KEY", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, "")); + assertEquals("VALUE", + inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, "")); + assertEquals("CustomCodec", + inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + assertEquals(true, + inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, + false)); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java index 758434c..b006c4d 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedPartitionedKVEdgeConfigurer.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Test; @@ -62,6 +63,8 @@ public class TestUnorderedPartitionedKVEdgeConfigurer { public void testDefaultConfigsUsed() { UnorderedPartitionedKVEdgeConfigurer.Builder builder = UnorderedPartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE", "PARTITIONER", null); + builder.setKeySerializationClass("SerClass1"); + builder.setValueSerializationClass("SerClass2"); UnorderedPartitionedKVEdgeConfigurer configuration = builder.build(); @@ -80,12 +83,16 @@ public class TestUnorderedPartitionedKVEdgeConfigurer { TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); assertEquals("TestCodec", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith + ("SerClass2,SerClass1")); Configuration inputConf = rebuiltInput.conf; assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); assertEquals("TestCodec", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith + ("SerClass2,SerClass1")); } @Test http://git-wip-us.apache.org/repos/asf/tez/blob/ff7081e0/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java index c86542f..01008f4 100644 --- 
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/conf/TestUnorderedUnpartitionedKVEdgeConfigurer.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.junit.Test; @@ -55,6 +56,7 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer { public void testDefaultConfigsUsed() { UnorderedUnpartitionedKVEdgeConfigurer.Builder builder = UnorderedUnpartitionedKVEdgeConfigurer.newBuilder("KEY", "VALUE"); + builder.setKeySerializationClass("SerClass1").setValueSerializationClass("SerClass2"); UnorderedUnpartitionedKVEdgeConfigurer configuration = builder.build(); @@ -73,12 +75,16 @@ public class TestUnorderedUnpartitionedKVEdgeConfigurer { TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); assertEquals("TestCodec", outputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + assertTrue(outputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith + ("SerClass2,SerClass1")); Configuration inputConf = rebuiltInput.conf; assertEquals(true, inputConf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_DEFAULT)); assertEquals("TestCodec", inputConf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS_CODEC, "")); + assertTrue(inputConf.get(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY).startsWith + ("SerClass2,SerClass1")); } @Test
