http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java new file mode 100644 index 0000000..eb15164 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/InputBuffer.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class InputBuffer { + + private ByteBuffer byteBuffer; + private final BufferType type; + + public InputBuffer(BufferType type, int inputSize) throws IOException { + + final int capacity = inputSize; + this.type = type; + + if (capacity > 0) { + + switch (type) { + case DIRECT_BUFFER: + this.byteBuffer = DirectBufferPool.getInstance().borrowBuffer(capacity); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + break; + case HEAP_BUFFER: + this.byteBuffer = ByteBuffer.allocate(capacity); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + break; + } + byteBuffer.position(0); + byteBuffer.limit(0); + } + } + + public BufferType getType() { + return this.type; + } + + public InputBuffer(byte[] bytes) { + this.type = BufferType.HEAP_BUFFER; + if (bytes.length > 0) { + this.byteBuffer = ByteBuffer.wrap(bytes); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + byteBuffer.position(0); + byteBuffer.limit(0); + } + } + + public ByteBuffer getByteBuffer() { + return this.byteBuffer; + } + + public int length() { + if (null == byteBuffer) { + return 0; + } + return byteBuffer.limit(); + } + + public void rewind(int startOffset, int length) { + if (null == byteBuffer) { + return; + } + byteBuffer.position(startOffset); + byteBuffer.limit(length); + } + + public int remaining() { + if (null == byteBuffer) { + return 0; + } + return byteBuffer.remaining(); + } + + public int position() { + if (null == byteBuffer) { + return 0; + } + return byteBuffer.position(); + } + + public int position(int pos) { + if (null == byteBuffer) { + return 0; + } + + byteBuffer.position(pos); + return pos; + } + + public int capacity() { + if (null == byteBuffer) { + return 0; + } + return byteBuffer.capacity(); + } + + public byte[] array() { + if (null == byteBuffer) { + return null; + } + return byteBuffer.array(); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java new file mode 100644 index 0000000..3c54948 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/buffer/OutputBuffer.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.buffer; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +public class OutputBuffer { + protected ByteBuffer byteBuffer; + private final BufferType type; + + public OutputBuffer(BufferType type, int outputBufferCapacity) { + + this.type = type; + if (outputBufferCapacity > 0) { + switch (type) { + case DIRECT_BUFFER: + this.byteBuffer = ByteBuffer.allocateDirect(outputBufferCapacity); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + break; + case HEAP_BUFFER: + this.byteBuffer = ByteBuffer.allocate(outputBufferCapacity); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + break; + } + } + } + + public OutputBuffer(byte[] bytes) { + this.type = BufferType.HEAP_BUFFER; + final int outputBufferCapacity = bytes.length; + if (outputBufferCapacity > 0) { + this.byteBuffer = ByteBuffer.wrap(bytes); + this.byteBuffer.order(ByteOrder.BIG_ENDIAN); + this.byteBuffer.position(0); + } + } + + public BufferType getType() { + return this.type; + } + + public ByteBuffer getByteBuffer() { + return this.byteBuffer; + } + + public int length() { + return byteBuffer.position(); + } + + public void rewind() { + byteBuffer.position(0); + } + + public int limit() { + return byteBuffer.limit(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java new file mode 100644 index 0000000..50d7816 --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPullee.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.IOException; + +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.NativeDataTarget; +import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter; +import org.apache.hadoop.mapred.nativetask.buffer.OutputBuffer; +import org.apache.hadoop.mapred.nativetask.serde.KVSerializer; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; + +/** + * load data into a buffer signaled by a {@link BufferPuller} + */ +public class BufferPullee<IK, IV> implements IDataLoader { + + public static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH; + + private final SizedWritable<IK> tmpInputKey; + private final SizedWritable<IV> tmpInputValue; + private boolean inputKVBufferd = false; + private RawKeyValueIterator rIter; + private ByteBufferDataWriter nativeWriter; + protected KVSerializer<IK, IV> serializer; + 
private final OutputBuffer outputBuffer; + private final NativeDataTarget target; + private boolean closed = false; + + public BufferPullee(Class<IK> iKClass, Class<IV> iVClass, RawKeyValueIterator rIter, NativeDataTarget target) + throws IOException { + this.rIter = rIter; + tmpInputKey = new SizedWritable<IK>(iKClass); + tmpInputValue = new SizedWritable<IV>(iVClass); + + if (null != iKClass && null != iVClass) { + this.serializer = new KVSerializer<IK, IV>(iKClass, iVClass); + } + this.outputBuffer = target.getOutputBuffer(); + this.target = target; + } + + @Override + public int load() throws IOException { + if (closed) { + return 0; + } + + if (null == outputBuffer) { + throw new IOException("output buffer not set"); + } + + this.nativeWriter = new ByteBufferDataWriter(target); + outputBuffer.rewind(); + + int written = 0; + boolean firstKV = true; + + if (inputKVBufferd) { + written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue); + inputKVBufferd = false; + firstKV = false; + } + + while (rIter.next()) { + inputKVBufferd = false; + tmpInputKey.readFields(rIter.getKey()); + tmpInputValue.readFields(rIter.getValue()); + serializer.updateLength(tmpInputKey, tmpInputValue); + + final int kvSize = tmpInputKey.length + tmpInputValue.length + KV_HEADER_LENGTH; + + if (!firstKV && nativeWriter.shortOfSpace(kvSize)) { + inputKVBufferd = true; + break; + } else { + written += serializer.serializeKV(nativeWriter, tmpInputKey, tmpInputValue); + firstKV = false; + } + } + + if (nativeWriter.hasUnFlushedData()) { + nativeWriter.flush(); + } + return written; + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (null != rIter) { + rIter.close(); + } + if (null != nativeWriter) { + nativeWriter.close(); + } + closed = true; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java new file mode 100644 index 0000000..704b664 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPuller.java @@ -0,0 +1,187 @@ +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.DataReceiver; +import org.apache.hadoop.mapred.nativetask.NativeDataSource; +import org.apache.hadoop.mapred.nativetask.buffer.BufferType; +import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader; +import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; +import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework; +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; +import org.apache.hadoop.util.Progress; + +/** + * actively signal a {@link BufferPullee} to load data into buffer and receive + */ +public class BufferPuller implements RawKeyValueIterator, DataReceiver { + + private static Log LOG = LogFactory.getLog(BufferPuller.class); + + public final static int 
KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH; + + byte[] keyBytes = new byte[0]; + byte[] valueBytes = new byte[0]; + + private InputBuffer inputBuffer; + private InputBuffer asideBuffer; + + int remain = 0; + + private ByteBufferDataReader nativeReader; + + DataInputBuffer keyBuffer = new DataInputBuffer(); + DataInputBuffer valueBuffer = new DataInputBuffer(); + + private boolean noMoreData = false; + + private NativeDataSource input; + private boolean closed = false; + + public BufferPuller(NativeDataSource handler) throws IOException { + this.input = handler; + this.inputBuffer = handler.getInputBuffer(); + nativeReader = new ByteBufferDataReader(null); + this.asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, inputBuffer.capacity()); + } + + @Override + public DataInputBuffer getKey() throws IOException { + return keyBuffer; + } + + @Override + public DataInputBuffer getValue() throws IOException { + return valueBuffer; + } + + public void reset() { + noMoreData = false; + } + + @Override + public boolean next() throws IOException { + if (closed) { + return false; + } + + if (noMoreData) { + return false; + } + final int asideRemain = asideBuffer.remaining(); + final int inputRemain = inputBuffer.remaining(); + + if (asideRemain == 0 && inputRemain == 0) { + input.loadData(); + } + + if (asideBuffer.remaining() > 0) { + return nextKeyValue(asideBuffer); + } else if (inputBuffer.remaining() > 0) { + return nextKeyValue(inputBuffer); + } else { + noMoreData = true; + return false; + } + } + + private boolean nextKeyValue(InputBuffer buffer) throws IOException { + if (closed) { + return false; + } + + nativeReader.reset(buffer); + + final int keyLength = nativeReader.readInt(); + if (keyBytes.length < keyLength) { + keyBytes = new byte[keyLength]; + } + + final int valueLength = nativeReader.readInt(); + if (valueBytes.length < valueLength) { + valueBytes = new byte[valueLength]; + } + + nativeReader.read(keyBytes, 0, keyLength); + 
nativeReader.read(valueBytes, 0, valueLength); + + keyBuffer.reset(keyBytes, keyLength); + valueBuffer.reset(valueBytes, valueLength); + + return true; + } + + @Override + public boolean receiveData() throws IOException { + if (closed) { + return false; + } + + final ByteBuffer input = inputBuffer.getByteBuffer(); + + if (null != asideBuffer && asideBuffer.length() > 0) { + if (asideBuffer.remaining() > 0) { + final byte[] output = asideBuffer.getByteBuffer().array(); + final int write = Math.min(asideBuffer.remaining(), input.remaining()); + input.get(output, asideBuffer.position(), write); + asideBuffer.position(asideBuffer.position() + write); + } + + if (asideBuffer.remaining() == 0) { + asideBuffer.position(0); + } + } + + if (input.remaining() == 0) { + return true; + } + + if (input.remaining() < KV_HEADER_LENGTH) { + throw new IOException("incomplete data, input length is: " + input.remaining()); + } + final int position = input.position(); + final int keyLength = input.getInt(); + final int valueLength = input.getInt(); + input.position(position); + final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH; + final int remaining = input.remaining(); + + if (kvLength > remaining) { + if (null == asideBuffer || asideBuffer.capacity() < kvLength) { + asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength); + } + asideBuffer.rewind(0, kvLength); + + input.get(asideBuffer.array(), 0, remaining); + asideBuffer.position(remaining); + } + return true; + } + + @Override + public Progress getProgress() { + return null; + } + + /** + * Closes the iterator so that the underlying streams can be closed. 
+ * + * @throws IOException + */ + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (null != nativeReader) { + nativeReader.close(); + } + closed = true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java new file mode 100644 index 0000000..8decad8 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPushee.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.buffer.BufferType; +import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataReader; +import org.apache.hadoop.mapred.nativetask.buffer.InputBuffer; +import org.apache.hadoop.mapred.nativetask.serde.KVSerializer; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; + +/** + * collect data when signaled + */ +public class BufferPushee<OK, OV> implements Closeable { + + private static Log LOG = LogFactory.getLog(BufferPushee.class); + + public final static int KV_HEADER_LENGTH = Constants.SIZEOF_KV_LENGTH; + + private InputBuffer asideBuffer; + private final SizedWritable<OK> tmpOutputKey; + private final SizedWritable<OV> tmpOutputValue; + private RecordWriter<OK, OV> writer; + private ByteBufferDataReader nativeReader; + + private KVSerializer<OK, OV> deserializer; + private boolean closed = false; + + public BufferPushee(Class<OK> oKClass, Class<OV> oVClass, RecordWriter<OK, OV> writer) throws IOException { + tmpOutputKey = new SizedWritable<OK>(oKClass); + tmpOutputValue = new SizedWritable<OV>(oVClass); + + this.writer = writer; + + if (null != oKClass && null != oVClass) { + this.deserializer = new KVSerializer<OK, OV>(oKClass, oVClass); + } + this.nativeReader = new ByteBufferDataReader(null); + } + + public boolean collect(InputBuffer buffer) throws IOException { + if (closed) { + return false; + } + + final ByteBuffer input = buffer.getByteBuffer(); + if (null != asideBuffer && asideBuffer.length() > 0) { + if (asideBuffer.remaining() > 0) { + final byte[] output = asideBuffer.getByteBuffer().array(); + final int write = Math.min(asideBuffer.remaining(), 
input.remaining()); + input.get(output, asideBuffer.position(), write); + asideBuffer.position(asideBuffer.position() + write); + } + + if (asideBuffer.remaining() == 0 && asideBuffer.position() > 0) { + asideBuffer.position(0); + write(asideBuffer); + asideBuffer.rewind(0, 0); + } + } + + if (input.remaining() == 0) { + return true; + } + + if (input.remaining() < KV_HEADER_LENGTH) { + throw new IOException("incomplete data, input length is: " + input.remaining()); + } + final int position = input.position(); + final int keyLength = input.getInt(); + final int valueLength = input.getInt(); + input.position(position); + final int kvLength = keyLength + valueLength + KV_HEADER_LENGTH; + final int remaining = input.remaining(); + + if (kvLength > remaining) { + if (null == asideBuffer || asideBuffer.capacity() < kvLength) { + asideBuffer = new InputBuffer(BufferType.HEAP_BUFFER, kvLength); + } + asideBuffer.rewind(0, kvLength); + + input.get(asideBuffer.array(), 0, remaining); + asideBuffer.position(remaining); + } else { + write(buffer); + } + return true; + } + + @SuppressWarnings("unchecked") + private boolean write(InputBuffer input) throws IOException { + if (closed) { + return false; + } + int totalRead = 0; + final int remain = input.remaining(); + this.nativeReader.reset(input); + while (remain > totalRead) { + final int read = deserializer.deserializeKV(nativeReader, tmpOutputKey, tmpOutputValue); + if (read != 0) { + totalRead += read; + writer.write((OK) (tmpOutputKey.v), (OV) (tmpOutputValue.v)); + } + } + if (remain != totalRead) { + throw new IOException("We expect to read " + remain + ", but we actually read: " + totalRead); + } + return true; + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + if (null != writer) { + writer.close(null); + } + if (null != nativeReader) { + nativeReader.close(); + } + closed = true; + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java new file mode 100644 index 0000000..3713078 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/BufferPusher.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.nativetask.NativeDataTarget; +import org.apache.hadoop.mapred.nativetask.buffer.ByteBufferDataWriter; +import org.apache.hadoop.mapred.nativetask.serde.IKVSerializer; +import org.apache.hadoop.mapred.nativetask.serde.KVSerializer; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; + +/** + * actively push data into a buffer and signal a {@link BufferPushee} to collect it + */ +public class BufferPusher<K, V> implements OutputCollector<K, V> { + + private static Log LOG = LogFactory.getLog(BufferPusher.class); + + private final SizedWritable<K> tmpInputKey; + private final SizedWritable<V> tmpInputValue; + private ByteBufferDataWriter out; + IKVSerializer serializer; + private boolean closed = false; + + public BufferPusher(Class<K> iKClass, Class<V> iVClass, NativeDataTarget target) throws IOException { + tmpInputKey = new SizedWritable<K>(iKClass); + tmpInputValue = new SizedWritable<V>(iVClass); + + if (null != iKClass && null != iVClass) { + this.serializer = new KVSerializer<K, V>(iKClass, iVClass); + } + this.out = new ByteBufferDataWriter(target); + } + + public void collect(K key, V value, int partition) throws IOException { + tmpInputKey.reset(key); + tmpInputValue.reset(value); + serializer.serializePartitionKV(out, partition, tmpInputKey, tmpInputValue); + }; + + @Override + public void collect(K key, V value) throws IOException { + if (closed) { + return; + } + tmpInputKey.reset(key); + tmpInputValue.reset(value); + serializer.serializeKV(out, tmpInputKey, tmpInputValue); + }; + + public void flush() throws IOException { + if (null != out) { + if (out.hasUnFlushedData()) { + out.flush(); + } + } + } + + public void close() throws IOException { + 
if (closed) { + return; + } + if (null != out) { + out.close(); + } + closed = true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java new file mode 100644 index 0000000..6a57683 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/CombinerHandler.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred.nativetask.handlers; + +import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Task.CombinerRunner; +import org.apache.hadoop.mapred.nativetask.Command; +import org.apache.hadoop.mapred.nativetask.CommandDispatcher; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.DataChannel; +import org.apache.hadoop.mapred.nativetask.ICombineHandler; +import org.apache.hadoop.mapred.nativetask.INativeHandler; +import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor; +import org.apache.hadoop.mapred.nativetask.TaskContext; +import org.apache.hadoop.mapred.nativetask.serde.SerializationFramework; +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; +import org.apache.hadoop.mapreduce.MRJobConfig; + +public class CombinerHandler<K, V> implements ICombineHandler, CommandDispatcher { + + public static String NAME = "NativeTask.CombineHandler"; + private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class); + public static Command LOAD = new Command(1, "Load"); + public static Command COMBINE = new Command(4, "Combine"); + public final CombinerRunner<K, V> combinerRunner; + + private final INativeHandler nativeHandler; + private final BufferPuller puller; + private final BufferPusher<K, V> kvPusher; + private boolean closed = false; + + public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException { + final JobConf conf = new JobConf(context.getConf()); + conf.set(Constants.SERIALIZATION_FRAMEWORK, + String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType())); + String combinerClazz = conf.get(Constants.MAPRED_COMBINER_CLASS); + if (null == 
combinerClazz) { + combinerClazz = conf.get(MRJobConfig.COMBINE_CLASS_ATTR); + } + + if (null == combinerClazz) { + return null; + } else { + LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz); + } + + final Counter combineInputCounter = context.getTaskReporter().getCounter(COMBINE_INPUT_RECORDS); + + final CombinerRunner<K, V> combinerRunner = CombinerRunner.create(conf, context.getTaskAttemptId(), + combineInputCounter, context.getTaskReporter(), null); + + final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, conf, DataChannel.INOUT); + final BufferPusher<K, V> pusher = new BufferPusher<K, V>(context.getInputKeyClass(), context.getInputValueClass(), + nativeHandler); + final BufferPuller puller = new BufferPuller(nativeHandler); + return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher); + } + + public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller, + BufferPusher<K, V> kvPusher) throws IOException { + this.nativeHandler = nativeHandler; + this.combinerRunner = combiner; + this.puller = puller; + this.kvPusher = kvPusher; + nativeHandler.setCommandDispatcher(this); + nativeHandler.setDataReceiver(puller); + } + + @Override + public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException { + if (null == command) { + return null; + } + if (command.equals(COMBINE)) { + combine(); + } + return null; + + } + + @Override + public void combine() throws IOException{ + try { + puller.reset(); + combinerRunner.combine(puller, kvPusher); + kvPusher.flush(); + return; + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public long getId() { + return nativeHandler.getNativeHandler(); + } + + @Override + public void close() throws IOException { + + if (closed) { + return; + } + + if (null != puller) { + puller.close(); + } + + if (null != kvPusher) { + kvPusher.close(); + } + + if (null != nativeHandler) { + 
nativeHandler.close(); + } + closed = true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java new file mode 100644 index 0000000..ff472a6 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/handlers/IDataLoader.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Contract for a component that loads data on demand.
 */
public interface IDataLoader {

  /**
   * Performs one load.
   *
   * @return size of data loaded
   * @throws IOException if the load fails
   */
  int load() throws IOException;

  /**
   * Closes this loader.
   *
   * @throws IOException if closing fails
   */
  void close() throws IOException;
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.handlers; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapred.nativetask.Command; +import org.apache.hadoop.mapred.nativetask.CommandDispatcher; +import org.apache.hadoop.mapred.nativetask.DataChannel; +import org.apache.hadoop.mapred.nativetask.ICombineHandler; +import org.apache.hadoop.mapred.nativetask.INativeHandler; +import org.apache.hadoop.mapred.nativetask.NativeBatchProcessor; +import org.apache.hadoop.mapred.nativetask.TaskContext; +import org.apache.hadoop.mapred.nativetask.util.NativeTaskOutput; +import org.apache.hadoop.mapred.nativetask.util.OutputUtil; +import org.apache.hadoop.mapred.nativetask.util.ReadWriteBuffer; + +/** + * Java Record Reader + Java Mapper + Native Collector + */ +@SuppressWarnings("unchecked") +public class NativeCollectorOnlyHandler<K, V> implements CommandDispatcher, Closeable { + + public static String NAME = "NativeTask.MCollectorOutputHandler"; + private static Log LOG = LogFactory.getLog(NativeCollectorOnlyHandler.class); + public static Command GET_OUTPUT_PATH = new Command(100, "GET_OUTPUT_PATH"); + public static Command GET_OUTPUT_INDEX_PATH = new Command(101, "GET_OUTPUT_INDEX_PATH"); + public static Command GET_SPILL_PATH = new Command(102, "GET_SPILL_PATH"); + public static Command GET_COMBINE_HANDLER = new Command(103, "GET_COMBINE_HANDLER"); + + private NativeTaskOutput output; + private int spillNumber = 0; + private ICombineHandler combinerHandler = null; + private final BufferPusher<K, V> kvPusher; + private final INativeHandler nativeHandler; + private boolean closed = false; + + public static <K, V> 
NativeCollectorOnlyHandler<K, V> create(TaskContext context) throws IOException { + + + ICombineHandler combinerHandler = null; + try { + final TaskContext combineContext = context.copyOf(); + combineContext.setInputKeyClass(context.getOuputKeyClass()); + combineContext.setInputValueClass(context.getOutputValueClass()); + + combinerHandler = CombinerHandler.create(combineContext); + } catch (final ClassNotFoundException e) { + throw new IOException(e); + } + + if (null != combinerHandler) { + LOG.info("[NativeCollectorOnlyHandler] combiner is not null"); + } + + final INativeHandler nativeHandler = NativeBatchProcessor.create(NAME, context.getConf(), DataChannel.OUT); + final BufferPusher<K, V> kvPusher = new BufferPusher<K, V>(context.getOuputKeyClass(), context.getOutputValueClass(), + nativeHandler); + + return new NativeCollectorOnlyHandler<K, V>(context, nativeHandler, kvPusher, combinerHandler); + } + + protected NativeCollectorOnlyHandler(TaskContext context, INativeHandler nativeHandler, + BufferPusher<K, V> kvPusher, ICombineHandler combiner) throws IOException { + Configuration conf = context.getConf(); + TaskAttemptID id = context.getTaskAttemptId(); + if (null == id) { + this.output = OutputUtil.createNativeTaskOutput(conf, ""); + } else { + this.output = OutputUtil.createNativeTaskOutput(context.getConf(), context.getTaskAttemptId() + .toString()); + } + this.combinerHandler = combiner; + this.kvPusher = kvPusher; + this.nativeHandler = nativeHandler; + nativeHandler.setCommandDispatcher(this); + } + + public void collect(K key, V value, int partition) throws IOException { + kvPusher.collect(key, value, partition); + }; + + public void flush() throws IOException { + } + + @Override + public void close() throws IOException { + if (closed) { + return; + } + + if (null != kvPusher) { + kvPusher.close(); + } + + if (null != combinerHandler) { + combinerHandler.close(); + } + + if (null != nativeHandler) { + nativeHandler.close(); + } + closed = true; + } + 
+ @Override + public ReadWriteBuffer onCall(Command command, ReadWriteBuffer parameter) throws IOException { + Path p = null; + if (null == command) { + return null; + } + + if (command.equals(GET_OUTPUT_PATH)) { + p = output.getOutputFileForWrite(-1); + } else if (command.equals(GET_OUTPUT_INDEX_PATH)) { + p = output.getOutputIndexFileForWrite(-1); + } else if (command.equals(GET_SPILL_PATH)) { + p = output.getSpillFileForWrite(spillNumber++, -1); + + } else if (command.equals(GET_COMBINE_HANDLER)) { + if (null == combinerHandler) { + return null; + } + final ReadWriteBuffer result = new ReadWriteBuffer(8); + + result.writeLong(combinerHandler.getId()); + return result; + } else { + throw new IOException("Illegal command: " + command.toString()); + } + if (p != null) { + final ReadWriteBuffer result = new ReadWriteBuffer(); + result.writeString(p.toUri().getPath()); + return result; + } else { + throw new IOException("MapOutputFile can't allocate spill/output file"); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java new file mode 100644 index 0000000..9a026be --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BoolWritableSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class BoolWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java new file mode 100644 index 0000000..1ec2fdb --- /dev/null +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/ByteWritableSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class ByteWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 1; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java new file mode 100644 index 0000000..2bd18d7 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/BytesWritableSerializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class BytesWritableSerializer implements INativeComparable, INativeSerializer<BytesWritable> { + + @Override + public int getLength(BytesWritable w) throws IOException { + return w.getLength(); + } + + @Override + public void serialize(BytesWritable w, DataOutput out) throws IOException { + out.write(w.getBytes(), 0, w.getLength()); + } + + @Override + public void deserialize(DataInput in, int length, BytesWritable w) throws IOException { + w.setSize(length); + in.readFully(w.getBytes(), 0, length); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java new file mode 100644 index 0000000..d4fc7e0 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DefaultSerializer.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class DefaultSerializer implements INativeSerializer<Writable> { + + static class ModifiedByteArrayOutputStream extends ByteArrayOutputStream { + + public byte[] getBuffer() { + return this.buf; + } + } + + private final ModifiedByteArrayOutputStream outBuffer = new ModifiedByteArrayOutputStream(); + private final DataOutputStream outData = new DataOutputStream(outBuffer); + private Writable buffered = null; + private int bufferedLength = -1; + + @Override + public int getLength(Writable w) throws IOException { + // if (w == buffered) { + // return bufferedLength; + // } + buffered = null; + bufferedLength = -1; + + outBuffer.reset(); + w.write(outData); + bufferedLength = outBuffer.size(); + buffered = w; + return bufferedLength; + } + + @Override + public void serialize(Writable w, DataOutput out) throws IOException { + w.write(out); + } + + @Override + public void deserialize(DataInput in, int length, Writable w) throws IOException { + w.readFields(in); + } +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java new file mode 100644 index 0000000..8de0fba --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/DoubleWritableSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class DoubleWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 8; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java new file mode 100644 index 0000000..4a2366c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/FloatWritableSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class FloatWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 4; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java new file mode 100644 index 0000000..64c5810 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/IKVSerializer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream; +import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; + +/** + * serializes key-value pair + */ +public interface IKVSerializer { + + /** + * update the length field of SizedWritable + * @param key + * @param value + * @throws IOException + */ + public void updateLength(SizedWritable key, SizedWritable value) throws IOException; + + /** + * + * @param out + * @param key + * @param value + * @return bytes written + * @throws IOException + */ + public int serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException; + + /** + * serialize partitionId as well + * @param out + * @param partitionId + * @param key + * @param value + * @return + * @throws IOException + */ + public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value) + throws IOException; + + /** + * + * @param in + * @param key + * @param value + * @return bytes read + * @throws IOException + */ + public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException; +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java new file mode 100644 index 0000000..f61d12d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/INativeSerializer.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * an INativeSerializer serializes and deserializes data transferred between + * Java and native. 
/**
 * Serializes and deserializes data transferred between Java and native code.
 * {@link DefaultSerializer} provides default implementations.
 *
 * <p>Note: if you implement a customized serializer instead of using DefaultSerializer,
 * you have to make sure the native side can serialize the data correctly.
 *
 * @param <T> type of the values handled
 */
public interface INativeSerializer<T> {

  /**
   * Returns the length of the data to be serialized. When that length is already known
   * (as for IntWritable) and can be returned immediately, a customized serializer is a
   * good opportunity for efficiency.
   *
   * @param w value to size
   * @return length of the serialized form of {@code w}
   * @throws IOException if the length cannot be determined
   */
  int getLength(T w) throws IOException;

  /**
   * Writes {@code w} to {@code out}.
   *
   * @param w value to write
   * @param out destination
   * @throws IOException if the write fails
   */
  void serialize(T w, DataOutput out) throws IOException;

  /**
   * Populates {@code w} from {@code in}.
   *
   * @param in source
   * @param length length of the serialized data
   * @param w value to populate
   * @throws IOException if the read fails
   */
  void deserialize(DataInput in, int length, T w) throws IOException;
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class IntWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 4; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java new file mode 100644 index 0000000..4b76df4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/KVSerializer.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.Constants; +import org.apache.hadoop.mapred.nativetask.buffer.DataInputStream; +import org.apache.hadoop.mapred.nativetask.buffer.DataOutputStream; +import org.apache.hadoop.mapred.nativetask.util.SizedWritable; + +public class KVSerializer<K, V> implements IKVSerializer { + + + private static final Log LOG = LogFactory.getLog(KVSerializer.class); + + public static int KV_HEAD_LENGTH = Constants.SIZEOF_KV_LENGTH; + + private final INativeSerializer<Writable> keySerializer; + private final INativeSerializer<Writable> valueSerializer; + + public KVSerializer(Class<K> kclass, Class<V> vclass) throws IOException { + + this.keySerializer = NativeSerialization.getInstance().getSerializer(kclass); + this.valueSerializer = NativeSerialization.getInstance().getSerializer(vclass); + } + + @Override + public void updateLength(SizedWritable key, SizedWritable value) throws IOException { + key.length = keySerializer.getLength(key.v); + value.length = valueSerializer.getLength(value.v); + return; + } + + @Override + public int 
serializeKV(DataOutputStream out, SizedWritable key, SizedWritable value) throws IOException { + return serializePartitionKV(out, -1, key, value); + } + + @Override + public int serializePartitionKV(DataOutputStream out, int partitionId, SizedWritable key, SizedWritable value) + throws IOException { + + if (key.length == SizedWritable.INVALID_LENGTH || value.length == SizedWritable.INVALID_LENGTH) { + updateLength(key, value); + } + + final int keyLength = key.length; + final int valueLength = value.length; + + int bytesWritten = KV_HEAD_LENGTH + keyLength + valueLength; + if (partitionId != -1) { + bytesWritten += Constants.SIZEOF_PARTITION_LENGTH; + } + + if (out.hasUnFlushedData() && out.shortOfSpace(bytesWritten)) { + out.flush(); + } + + if (partitionId != -1) { + out.writeInt(partitionId); + } + + out.writeInt(keyLength); + out.writeInt(valueLength); + + keySerializer.serialize(key.v, out); + valueSerializer.serialize(value.v, out); + + return bytesWritten; + } + + @Override + public int deserializeKV(DataInputStream in, SizedWritable key, SizedWritable value) throws IOException { + + if (!in.hasUnReadData()) { + return 0; + } + + key.length = in.readInt(); + value.length = in.readInt(); + + keySerializer.deserialize(in, key.length, key.v); + valueSerializer.deserialize(in, value.length, value.v); + + return key.length + value.length + KV_HEAD_LENGTH; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java new file mode 100644 index 0000000..ec326ca --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/LongWritableSerializer.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class LongWritableSerializer extends DefaultSerializer implements + INativeComparable { + @Override + public int getLength(Writable w) throws IOException { + return 8; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java new file mode 100644 index 0000000..f5a033d --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NativeSerialization.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.io.Writable; + +public class NativeSerialization { + + private final ConcurrentHashMap<String, Class<?>> map = new ConcurrentHashMap<String, Class<?>>(); + + public boolean accept(Class<?> c) { + return Writable.class.isAssignableFrom(c); + } + + @SuppressWarnings("unchecked") + public INativeSerializer<Writable> getSerializer(Class<?> c) throws IOException { + + if (null == c) { + return null; + } + if (!Writable.class.isAssignableFrom(c)) { + throw new IOException("Cannot serialize type " + c.getName() + ", we only accept subclass of Writable"); + } + final String name = c.getName(); + final Class<?> serializer = map.get(name); + + if (null != serializer) { + try { + return (INativeSerializer<Writable>) serializer.newInstance(); + } catch (final Exception e) { + throw new IOException(e); + } + } + return new DefaultSerializer(); + } + + public void register(String klass, Class<?> serializer) throws IOException { + if (null == klass || null == serializer) { + throw new IOException("invalid arguments, klass or serializer is null"); + } + + if (!INativeSerializer.class.isAssignableFrom(serializer)) { + throw new IOException("Serializer is not assigable from INativeSerializer"); + } + + final Class<?> storedSerializer = map.get(klass); + if (null == storedSerializer) { + map.put(klass, serializer); + return; + } else { + if (!storedSerializer.getName().equals(serializer.getName())) { + throw new IOException("Error! 
Serializer already registered, exist: " + storedSerializer.getName() + ", new: " + + serializer.getName()); + } + } + } + + public void reset() { + map.clear(); + } + + private static NativeSerialization instance = new NativeSerialization(); + + public static NativeSerialization getInstance() { + return instance; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java new file mode 100644 index 0000000..afa4e8e --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/NullWritableSerializer.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class NullWritableSerializer extends DefaultSerializer implements + INativeComparable { + + @Override + public int getLength(Writable w) throws IOException { + return 0; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java new file mode 100644 index 0000000..e95a0c4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/SerializationFramework.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Identifies a serialization framework together with the integer code
 * returned by {@link #getType()}.
 */
public enum SerializationFramework {
  WRITABLE_SERIALIZATION(0),
  NATIVE_SERIALIZATION(1);

  /** Integer code of this framework; fixed at construction. */
  private final int type;

  SerializationFramework(int type) {
    this.type = type;
  }

  /**
   * @return the integer code of this framework
   */
  public int getType() {
    return type;
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class TextSerializer implements INativeSerializer<Text>, INativeComparable { + + public TextSerializer() throws SecurityException, NoSuchMethodException { + } + + @Override + public int getLength(Text w) throws IOException { + return w.getLength(); + } + + @Override + public void serialize(Text w, DataOutput out) throws IOException { + out.write(w.getBytes(), 0, w.getLength()); + } + + @Override + public void deserialize(DataInput in, int length, Text w) throws IOException { + try { + w.setCapacity(length, true); + w.setLength(length); + } catch (final Exception e) { + throw new IOException(e); + } + final byte[] bytes = w.getBytes(); + in.readFully(bytes, 0, length); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java new file mode 100644 index 0000000..0e142f3 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VIntWritableSerializer.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class VIntWritableSerializer extends DefaultSerializer implements + INativeComparable { +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/b2551c06/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java new file mode 100644 index 0000000..d66e179 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/java/org/apache/hadoop/mapred/nativetask/serde/VLongWritableSerializer.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred.nativetask.serde; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.nativetask.INativeComparable; + +public class VLongWritableSerializer extends DefaultSerializer implements + INativeComparable { +} \ No newline at end of file