http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java new file mode 100644 index 0000000..5e200a0 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java @@ -0,0 +1,261 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import com.google.common.base.Objects; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; + +/** + * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable, + * and is able to resize without recreating new array if not necessary. 
+ * <p> + * + * Each <tt>BytesRefArrayWritable holds</tt> instance has a <i>valid</i> field, + * which is the desired valid number of <tt>BytesRefWritable</tt> it holds. + * <tt>resetValid</tt> can reset the valid, but it will not care the underlying + * BytesRefWritable. + */ + +public class BytesRefArrayWritable implements Writable, + Comparable<BytesRefArrayWritable> { + + private BytesRefWritable[] bytesRefWritables = null; + + private int valid = 0; + + /** + * Constructs an empty array with the specified capacity. + * + * @param capacity + * initial capacity + * @exception IllegalArgumentException + * if the specified initial capacity is negative + */ + public BytesRefArrayWritable(int capacity) { + if (capacity < 0) { + throw new IllegalArgumentException("Capacity can not be negative."); + } + bytesRefWritables = new BytesRefWritable[0]; + ensureCapacity(capacity); + } + + /** + * Constructs an empty array with a capacity of ten. + */ + public BytesRefArrayWritable() { + this(10); + } + + /** + * Returns the number of valid elements. + * + * @return the number of valid elements + */ + public int size() { + return valid; + } + + /** + * Gets the BytesRefWritable at the specified position. Make sure the position + * is valid by first call resetValid. + * + * @param index + * the position index, starting from zero + * @throws IndexOutOfBoundsException + */ + public BytesRefWritable get(int index) { + if (index >= valid) { + throw new IndexOutOfBoundsException( + "This BytesRefArrayWritable only has " + valid + " valid values."); + } + return bytesRefWritables[index]; + } + + /** + * Gets the BytesRefWritable at the specified position without checking. + * + * @param index + * the position index, starting from zero + * @throws IndexOutOfBoundsException + */ + public BytesRefWritable unCheckedGet(int index) { + return bytesRefWritables[index]; + } + + /** + * Set the BytesRefWritable at the specified position with the specified + * BytesRefWritable. 
+ * + * @param index + * index position + * @param bytesRefWritable + * the new element + * @throws IllegalArgumentException + * if the specified new element is null + */ + public void set(int index, BytesRefWritable bytesRefWritable) { + if (bytesRefWritable == null) { + throw new IllegalArgumentException("Can not assign null."); + } + ensureCapacity(index + 1); + bytesRefWritables[index] = bytesRefWritable; + if (valid <= index) { + valid = index + 1; + } + } + + /** + * {@inheritDoc} + */ + @Override + public int compareTo(BytesRefArrayWritable other) { + if (other == null) { + throw new IllegalArgumentException("Argument can not be null."); + } + if (this == other) { + return 0; + } + int sizeDiff = valid - other.valid; + if (sizeDiff != 0) { + return sizeDiff; + } + for (int i = 0; i < valid; i++) { + if (other.contains(bytesRefWritables[i])) { + continue; + } else { + return 1; + } + } + return 0; + } + + @Override + public int hashCode(){ + return Objects.hashCode(bytesRefWritables); + } + /** + * Returns <tt>true</tt> if this instance contains one or more the specified + * BytesRefWritable. + * + * @param bytesRefWritable + * BytesRefWritable element to be tested + * @return <tt>true</tt> if contains the specified element + * @throws IllegalArgumentException + * if the specified element is null + */ + public boolean contains(BytesRefWritable bytesRefWritable) { + if (bytesRefWritable == null) { + throw new IllegalArgumentException("Argument can not be null."); + } + for (int i = 0; i < valid; i++) { + if (bytesRefWritables[i].equals(bytesRefWritable)) { + return true; + } + } + return false; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object o) { + if (o == null || !(o instanceof BytesRefArrayWritable)) { + return false; + } + return compareTo((BytesRefArrayWritable) o) == 0; + } + + /** + * Removes all elements. 
+ */ + public void clear() { + valid = 0; + } + + /** + * enlarge the capacity if necessary, to ensure that it can hold the number of + * elements specified by newValidCapacity argument. It will also narrow the + * valid capacity when needed. Notice: it only enlarge or narrow the valid + * capacity with no care of the already stored invalid BytesRefWritable. + * + * @param newValidCapacity + * the desired capacity + */ + public void resetValid(int newValidCapacity) { + ensureCapacity(newValidCapacity); + valid = newValidCapacity; + } + + protected void ensureCapacity(int newCapacity) { + int size = bytesRefWritables.length; + if (size < newCapacity) { + bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity); + while (size < newCapacity) { + bytesRefWritables[size] = new BytesRefWritable(); + size++; + } + } + } + + /** + * {@inheritDoc} + */ + @Override + public void readFields(DataInput in) throws IOException { + int count = in.readInt(); + ensureCapacity(count); + for (int i = 0; i < count; i++) { + bytesRefWritables[i].readFields(in); + } + valid = count; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(valid); + + for (int i = 0; i < valid; i++) { + BytesRefWritable cu = bytesRefWritables[i]; + cu.write(out); + } + } + + static { + WritableFactories.setFactory(BytesRefArrayWritable.class, + new WritableFactory() { + + @Override + public Writable newInstance() { + return new BytesRefArrayWritable(); + } + + }); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java new file mode 100644 index 0000000..158c740 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.io.WritableFactory; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * <tt>BytesRefWritable</tt> referenced a section of byte array. It can be used + * to avoid unnecessary byte copy. 
+ */ +public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> { + + private static final byte[] EMPTY_BYTES = new byte[0]; + public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable(); + + int start = 0; + int length = 0; + byte[] bytes = null; + + LazyDecompressionCallback lazyDecompressObj; + + /** + * Create a zero-size bytes. + */ + public BytesRefWritable() { + this(EMPTY_BYTES); + } + + /** + * Create a BytesRefWritable with <tt>length</tt> bytes. + */ + public BytesRefWritable(int length) { + assert length > 0; + this.length = length; + bytes = new byte[this.length]; + start = 0; + } + + /** + * Create a BytesRefWritable referenced to the given bytes. + */ + public BytesRefWritable(byte[] bytes) { + this.bytes = bytes; + length = bytes.length; + start = 0; + } + + /** + * Create a BytesRefWritable referenced to one section of the given bytes. The + * section is determined by argument <tt>offset</tt> and <tt>len</tt>. + */ + public BytesRefWritable(byte[] data, int offset, int len) { + bytes = data; + start = offset; + length = len; + } + + /** + * Create a BytesRefWritable referenced to one section of the given bytes. The + * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback + * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to + * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and + * <tt>len</tt> after uncompressing the data. + */ + public BytesRefWritable(LazyDecompressionCallback lazyDecompressData, + int offset, int len) { + lazyDecompressObj = lazyDecompressData; + start = offset; + length = len; + } + + private void lazyDecompress() throws IOException { + if (bytes == null && lazyDecompressObj != null) { + bytes = lazyDecompressObj.decompress(); + } + } + + /** + * Returns a copy of the underlying bytes referenced by this instance. 
+ * + * @return a new copied byte array + * @throws java.io.IOException + */ + public byte[] getBytesCopy() throws IOException { + lazyDecompress(); + byte[] bb = new byte[length]; + System.arraycopy(bytes, start, bb, 0, length); + return bb; + } + + /** + * Returns the underlying bytes. + * + * @throws java.io.IOException + */ + public byte[] getData() throws IOException { + lazyDecompress(); + return bytes; + } + + /** + * readFields() will corrupt the array. So use the set method whenever + * possible. + * + * @see #readFields(java.io.DataInput) + */ + public void set(byte[] newData, int offset, int len) { + bytes = newData; + start = offset; + length = len; + lazyDecompressObj = null; + } + + /** + * readFields() will corrupt the array. So use the set method whenever + * possible. + * + * @see #readFields(java.io.DataInput) + */ + public void set(LazyDecompressionCallback newData, int offset, int len) { + bytes = null; + start = offset; + length = len; + lazyDecompressObj = newData; + } + + public void writeDataTo(DataOutput out) throws IOException { + lazyDecompress(); + out.write(bytes, start, length); + } + + /** + * Always reuse the bytes array if length of bytes array is equal or greater + * to the current record, otherwise create a new one. readFields will corrupt + * the array. Please use set() whenever possible. 
+ * + * @see #set(byte[], int, int) + */ + public void readFields(DataInput in) throws IOException { + int len = in.readInt(); + if (len > bytes.length) { + bytes = new byte[len]; + } + start = 0; + length = len; + in.readFully(bytes, start, length); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + lazyDecompress(); + out.writeInt(length); + out.write(bytes, start, length); + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return super.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(3 * length); + for (int idx = start; idx < length; idx++) { + // if not the first, put a blank separator in + if (idx != 0) { + sb.append(' '); + } + String num = Integer.toHexString(0xff & bytes[idx]); + // if it is only one digit, add a leading 0. + if (num.length() < 2) { + sb.append('0'); + } + sb.append(num); + } + return sb.toString(); + } + + /** {@inheritDoc} */ + @Override + public int compareTo(BytesRefWritable other) { + if (other == null) { + throw new IllegalArgumentException("Argument can not be null."); + } + if (this == other) { + return 0; + } + try { + return WritableComparator.compareBytes(getData(), start, getLength(), + other.getData(), other.start, other.getLength()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object right_obj) { + if (right_obj == null || !(right_obj instanceof BytesRefWritable)) { + return false; + } + return compareTo((BytesRefWritable) right_obj) == 0; + } + + static { + WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() { + + @Override + public Writable newInstance() { + return new BytesRefWritable(); + } + + }); + } + + public int getLength() { + return length; + } + + public int getStart() { + return start; + } +} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java new file mode 100644 index 0000000..352776f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.StringUtils; + +import java.util.ArrayList; + +/** + * ColumnProjectionUtils. + * + */ +public final class ColumnProjectionUtils { + + public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; + + /** + * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column + * is included in the list, RCFile's reader will not skip its value. 
+ * + */ + public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) { + String id = toReadColumnIDString(ids); + setReadColumnIDConf(conf, id); + } + + /** + * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column + * is included in the list, RCFile's reader will not skip its value. + * + */ + public static void appendReadColumnIDs(Configuration conf, + ArrayList<Integer> ids) { + String id = toReadColumnIDString(ids); + if (id != null) { + String old = conf.get(READ_COLUMN_IDS_CONF_STR, null); + String newConfStr = id; + if (old != null) { + newConfStr = newConfStr + StringUtils.COMMA_STR + old; + } + + setReadColumnIDConf(conf, newConfStr); + } + } + + private static void setReadColumnIDConf(Configuration conf, String id) { + if (id == null || id.length() <= 0) { + conf.set(READ_COLUMN_IDS_CONF_STR, ""); + return; + } + + conf.set(READ_COLUMN_IDS_CONF_STR, id); + } + + private static String toReadColumnIDString(ArrayList<Integer> ids) { + String id = null; + if (ids != null) { + for (int i = 0; i < ids.size(); i++) { + if (i == 0) { + id = "" + ids.get(i); + } else { + id = id + StringUtils.COMMA_STR + ids.get(i); + } + } + } + return id; + } + + /** + * Returns an array of column ids(start from zero) which is set in the given + * parameter <tt>conf</tt>. + */ + public static ArrayList<Integer> getReadColumnIDs(Configuration conf) { + if (conf == null) { + return new ArrayList<Integer>(0); + } + String skips = conf.get(READ_COLUMN_IDS_CONF_STR, ""); + String[] list = StringUtils.split(skips); + ArrayList<Integer> result = new ArrayList<Integer>(list.length); + for (String element : list) { + // it may contain duplicates, remove duplicates + Integer toAdd = Integer.parseInt(element); + if (!result.contains(toAdd)) { + result.add(toAdd); + } + } + return result; + } + + /** + * Clears the read column ids set in the conf, and will read all columns. 
+ */ + public static void setFullyReadColumns(Configuration conf) { + conf.set(READ_COLUMN_IDS_CONF_STR, ""); + } + + private ColumnProjectionUtils() { + // prevent instantiation + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java new file mode 100644 index 0000000..eab2356 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import java.io.IOException; + +/** + * Used to call back lazy decompression process. 
+ * + * @see org.apache.tajo.storage.rcfile.BytesRefWritable + */ +public interface LazyDecompressionCallback { + + byte[] decompress() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java new file mode 100644 index 0000000..bb6af22 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import java.io.ByteArrayInputStream; + +/** + * A thread-not-safe version of ByteArrayInputStream, which removes all + * synchronized modifiers. 
+ */ +public class NonSyncByteArrayInputStream extends ByteArrayInputStream { + public NonSyncByteArrayInputStream() { + super(new byte[] {}); + } + + public NonSyncByteArrayInputStream(byte[] bs) { + super(bs); + } + + public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) { + super(buf, offset, length); + } + + public void reset(byte[] input, int start, int length) { + buf = input; + count = start + length; + mark = start; + pos = start; + } + + public int getPosition() { + return pos; + } + + public int getLength() { + return count; + } + + /** + * {@inheritDoc} + */ + @Override + public int read() { + return (pos < count) ? (buf[pos++] & 0xff) : -1; + } + + /** + * {@inheritDoc} + */ + @Override + public int read(byte b[], int off, int len) { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + if (pos >= count) { + return -1; + } + if (pos + len > count) { + len = count - pos; + } + if (len <= 0) { + return 0; + } + System.arraycopy(buf, pos, b, off, len); + pos += len; + return len; + } + + /** + * {@inheritDoc} + */ + @Override + public long skip(long n) { + if (pos + n > count) { + n = count - pos; + } + if (n < 0) { + return 0; + } + pos += n; + return n; + } + + /** + * {@inheritDoc} + */ + @Override + public int available() { + return count - pos; + } + + public void seek(int pos) { + this.pos = pos; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java new file mode 100644 index 0000000..53a3dca --- /dev/null +++ 
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.IOException; +import java.io.OutputStream; + +/** + * A thread-not-safe version of ByteArrayOutputStream, which removes all + * synchronized modifiers. 
+ */ +public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream { + public NonSyncByteArrayOutputStream(int size) { + super(size); + } + + public NonSyncByteArrayOutputStream() { + super(64 * 1024); + } + + public byte[] getData() { + return buf; + } + + public int getLength() { + return count; + } + + /** + * {@inheritDoc} + */ + @Override + public void reset() { + count = 0; + } + + public void write(DataInput in, int length) throws IOException { + enLargeBuffer(length); + in.readFully(buf, count, length); + count += length; + } + + private byte[] vLongBytes = new byte[9]; + + public int writeVLongToByteArray(byte[] bytes, int offset, long l) { + if (l >= -112 && l <= 127) { + bytes[offset] = (byte) l; + return 1; + } + + int len = -112; + if (l < 0) { + l ^= -1L; // take one's complement' + len = -120; + } + + long tmp = l; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + bytes[offset++] = (byte) len; + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); + } + return 1 + len; + } + + public int writeVLong(long l) { + int len = writeVLongToByteArray(vLongBytes, 0, l); + write(vLongBytes, 0, len); + return len; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(int b) { + enLargeBuffer(1); + buf[count] = (byte) b; + count += 1; + } + + private int enLargeBuffer(int increment) { + int temp = count + increment; + int newLen = temp; + if (temp > buf.length) { + if ((buf.length << 1) > temp) { + newLen = buf.length << 1; + } + byte newbuf[] = new byte[newLen]; + System.arraycopy(buf, 0, newbuf, 0, count); + buf = newbuf; + } + return newLen; + } + + /** + * {@inheritDoc} + */ + @Override + public void write(byte b[], int off, int len) { + if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) + || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if 
(len == 0) { + return; + } + enLargeBuffer(len); + System.arraycopy(b, off, buf, count, len); + count += len; + } + + /** + * {@inheritDoc} + */ + @Override + public void writeTo(OutputStream out) throws IOException { + out.write(buf, 0, count); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java new file mode 100644 index 0000000..46745ab --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java @@ -0,0 +1,507 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.rcfile; + +import org.apache.hadoop.fs.Seekable; + +import java.io.*; + +/** + * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all + * synchronized modifiers. 
+ */
+public class NonSyncDataInputBuffer extends FilterInputStream implements
+    DataInput, Seekable {
+
+  /** Backing stream; passed to super(), so it is also the initial {@code in}. */
+  private final NonSyncByteArrayInputStream buffer;
+
+  /** Scratch space used to assemble multi-byte primitives (up to 8 bytes). */
+  final byte[] buff = new byte[16];
+
+  /** Constructs a new empty buffer. */
+  public NonSyncDataInputBuffer() {
+    this(new NonSyncByteArrayInputStream());
+  }
+
+  private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /**
+   * Resets the data that the buffer reads. Equivalent to
+   * {@code reset(input, 0, length)}.
+   */
+  public void reset(byte[] input, int length) {
+    buffer.reset(input, 0, length);
+  }
+
+  /** Resets the data that the buffer reads (delegates to the backing stream). */
+  public void reset(byte[] input, int start, int length) {
+    buffer.reset(input, start, length);
+  }
+
+  /** Returns the current position in the input, as reported by the backing stream. */
+  public int getPosition() {
+    return buffer.getPosition();
+  }
+
+  /** Returns the length of the input, as reported by the backing stream. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /**
+   * Reads up to {@code buffer.length} bytes into {@code buffer}.
+   *
+   * @param buffer
+   *          the buffer to read bytes into
+   * @return the number of bytes actually read, or -1 at end of stream
+   * @throws java.io.IOException
+   *           if the underlying stream fails
+   */
+  @Override
+  public final int read(byte[] buffer) throws IOException {
+    return in.read(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads at most {@code length} bytes into {@code buffer} starting at
+   * {@code offset}.
+   *
+   * @param buffer
+   *          the byte array in which to store the read bytes
+   * @param offset
+   *          the offset in {@code buffer} to store the read bytes
+   * @param length
+   *          the maximum number of bytes to store in {@code buffer}
+   * @return the number of bytes actually read, or -1 at end of stream
+   * @throws java.io.IOException
+   *           if the underlying stream fails
+   */
+  @Override
+  public final int read(byte[] buffer, int offset, int length)
+      throws IOException {
+    return in.read(buffer, offset, length);
+  }
+
+  /**
+   * Reads one byte and returns {@code true} if it is non-zero.
+   *
+   * @throws java.io.EOFException
+   *           if the stream is exhausted
+   */
+  public final boolean readBoolean() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return temp != 0;
+  }
+
+  /**
+   * Reads a signed 8-bit byte.
+   *
+   * @throws java.io.EOFException
+   *           if the stream is exhausted
+   */
+  public final byte readByte() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return (byte) temp;
+  }
+
+  /**
+   * Fills {@code buff[0..count)} from {@code in}, looping over short reads.
+   *
+   * @return {@code count}, or -1 if the stream ended before {@code count}
+   *         bytes were read (any partially read bytes are discarded)
+   */
+  private int readToBuff(int count) throws IOException {
+    int offset = 0;
+
+    while (offset < count) {
+      int bytesRead = in.read(buff, offset, count - offset);
+      if (bytesRead == -1) {
+        return bytesRead;
+      }
+      offset += bytesRead;
+    }
+    return offset;
+  }
+
+  /**
+   * Reads a big-endian 16-bit character.
+   *
+   * @throws java.io.EOFException
+   *           if fewer than 2 bytes remain
+   */
+  public final char readChar() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /** Reads a 64-bit {@code double} encoded as its {@code long} bit pattern. */
+  public final double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  /** Reads a 32-bit {@code float} encoded as its {@code int} bit pattern. */
+  public final float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  /**
+   * Reads exactly {@code buffer.length} bytes into {@code buffer}.
+   *
+   * @throws java.io.EOFException
+   *           if the stream ends first
+   */
+  public final void readFully(byte[] buffer) throws IOException {
+    readFully(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Reads exactly {@code length} bytes into {@code buffer} starting at
+   * {@code offset}, looping over short reads.
+   *
+   * @param buffer
+   *          the byte array into which the data is read
+   * @param offset
+   *          the offset the operation starts at
+   * @param length
+   *          the number of bytes to read
+   * @throws IndexOutOfBoundsException
+   *           if {@code length} is negative or the range does not fit
+   * @throws NullPointerException
+   *           if {@code buffer} or the underlying stream is null
+   * @throws java.io.EOFException
+   *           if the stream ends before {@code length} bytes were read
+   */
+  public final void readFully(byte[] buffer, int offset, int length)
+      throws IOException {
+    if (length < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+    if (length == 0) {
+      return;
+    }
+    if (in == null || buffer == null) {
+      throw new NullPointerException("Null Pointer to underlying input stream");
+    }
+
+    if (offset < 0 || offset > buffer.length - length) {
+      throw new IndexOutOfBoundsException();
+    }
+    while (length > 0) {
+      int result = in.read(buffer, offset, length);
+      if (result < 0) {
+        throw new EOFException();
+      }
+      offset += result;
+      length -= result;
+    }
+  }
+
+  /**
+   * Reads a big-endian 32-bit integer.
+   *
+   * @throws java.io.EOFException
+   *           if fewer than 4 bytes remain
+   */
+  public final int readInt() throws IOException {
+    if (readToBuff(4) < 0) {
+      throw new EOFException();
+    }
+    return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+  }
+
+  /**
+   * Reads a line of text. A line ends at {@code '\n'}, {@code '\r'},
+   * {@code "\r\n"} (consumed as a single terminator) or end of stream; the
+   * terminator is not included. Each byte is widened directly to a
+   * {@code char}; no charset decoding is performed.
+   *
+   * NOTE(review): on the first {@code '\r'} this replaces {@code in} with a
+   * {@link PushbackInputStream}. A byte pushed back into it has already been
+   * consumed from {@code buffer}, so {@link #getPosition()}, {@link #seek}
+   * and {@code reset(...)} do not account for it.
+   *
+   * @return the line, or null if end of stream was reached before any
+   *         character or terminator was read
+   * @throws java.io.IOException
+   *           if the underlying stream fails
+   *
+   * @deprecated Use BufferedReader
+   */
+  @Deprecated
+  public final String readLine() throws IOException {
+    StringBuilder line = new StringBuilder(80); // Typical line length
+    boolean foundTerminator = false;
+    while (true) {
+      int nextByte = in.read();
+      switch (nextByte) {
+        case -1:
+          if (line.length() == 0 && !foundTerminator) {
+            return null;
+          }
+          return line.toString();
+        case (byte) '\r':
+          if (foundTerminator) {
+            ((PushbackInputStream) in).unread(nextByte);
+            return line.toString();
+          }
+          foundTerminator = true;
+          /* Have to be able to peek ahead one byte */
+          if (in.getClass() != PushbackInputStream.class) {
+            in = new PushbackInputStream(in);
+          }
+          break;
+        case (byte) '\n':
+          return line.toString();
+        default:
+          if (foundTerminator) {
+            ((PushbackInputStream) in).unread(nextByte);
+            return line.toString();
+          }
+          line.append((char) nextByte);
+      }
+    }
+  }
+
+  /**
+   * Reads a big-endian 64-bit {@code long}.
+   *
+   * @throws java.io.EOFException
+   *           if fewer than 8 bytes remain
+   */
+  public final long readLong() throws IOException {
+    if (readToBuff(8) < 0) {
+      throw new EOFException();
+    }
+    int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
+        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
+    int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16)
+        | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff);
+
+    return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
+  }
+
+  /**
+   * Reads a big-endian signed 16-bit {@code short}.
+   *
+   * @throws java.io.EOFException
+   *           if fewer than 2 bytes remain
+   */
+  public final short readShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
+  }
+
+  /**
+   * Reads one byte and returns it as an unsigned value in [0, 255].
+   *
+   * @throws java.io.EOFException
+   *           if the stream is exhausted
+   */
+  public final int readUnsignedByte() throws IOException {
+    int temp = in.read();
+    if (temp < 0) {
+      throw new EOFException();
+    }
+    return temp;
+  }
+
+  /**
+   * Reads a big-endian 16-bit value and returns it unsigned, in [0, 65535].
+   *
+   * @throws java.io.EOFException
+   *           if fewer than 2 bytes remain
+   */
+  public final int readUnsignedShort() throws IOException {
+    if (readToBuff(2) < 0) {
+      throw new EOFException();
+    }
+    return ((buff[0] & 0xff) << 8) | (buff[1] & 0xff);
+  }
+
+  /**
+   * Reads a string written in {@link DataInput} modified UTF-8 format: an
+   * unsigned 16-bit byte count followed by the encoded bytes.
+   */
+  public final String readUTF() throws IOException {
+    return decodeUTF(readUnsignedShort());
+  }
+
+  String decodeUTF(int utfSize) throws IOException {
+    return decodeUTF(utfSize, this);
+  }
+
+  /** Reads {@code utfSize} bytes from {@code in} and decodes them. */
+  private static String decodeUTF(int utfSize, DataInput in) throws IOException {
+    byte[] buf = new byte[utfSize];
+    // Modified UTF-8 never yields more chars than bytes.
+    char[] out = new char[utfSize];
+    in.readFully(buf, 0, utfSize);
+
+    return convertUTF8WithBuf(buf, out, 0, utfSize);
+  }
+
+  /**
+   * Reads a modified UTF-8 string (length-prefixed) from {@code in}.
+   *
+   * @param in
+   *          the input stream to read from
+   * @return the decoded string
+   * @throws java.io.IOException
+   *           if reading fails or the bytes are malformed
+   */
+  public static final String readUTF(DataInput in) throws IOException {
+    return decodeUTF(in.readUnsignedShort(), in);
+  }
+
+  /**
+   * Attempts to skip {@code count} bytes. Per the {@link DataInput} contract
+   * this never throws {@link EOFException}; it stops early when the
+   * underlying stream makes no further progress.
+   *
+   * @param count
+   *          the number of bytes to skip
+   * @return the number of bytes actually skipped, possibly fewer than
+   *         {@code count}
+   * @throws java.io.IOException
+   *           if the underlying stream fails
+   */
+  public final int skipBytes(int count) throws IOException {
+    int skipped = 0;
+    while (skipped < count) {
+      long n = in.skip(count - skipped);
+      if (n <= 0) {
+        break;
+      }
+      skipped += (int) n;
+    }
+    return skipped;
+  }
+
+  /**
+   * Decodes {@code utfSize} bytes of modified UTF-8 (1-, 2- and 3-byte forms)
+   * from {@code buf}, starting at {@code offset}.
+   *
+   * @param buf
+   *          the encoded bytes
+   * @param out
+   *          scratch space; must hold at least {@code utfSize} chars
+   * @param offset
+   *          index in {@code buf} of the first encoded byte
+   * @param utfSize
+   *          number of encoded bytes to decode
+   * @return the decoded string
+   * @throws UTFDataFormatException
+   *           if the bytes are not well-formed
+   */
+  public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
+      int utfSize) throws UTFDataFormatException {
+    int count = 0;
+    int s = 0;
+    while (count < utfSize) {
+      int a = buf[offset + count++] & 0xff;
+      if (a < 0x80) {
+        // 0xxxxxxx
+        out[s++] = (char) a;
+      } else if ((a & 0xe0) == 0xc0) {
+        // 110xxxxx 10xxxxxx
+        if (count >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        // Continuation bytes must also be read relative to offset.
+        int b = buf[offset + count++];
+        if ((b & 0xC0) != 0x80) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
+      } else if ((a & 0xf0) == 0xe0) {
+        // 1110xxxx 10xxxxxx 10xxxxxx
+        if (count + 1 >= utfSize) {
+          throw new UTFDataFormatException();
+        }
+        int b = buf[offset + count++];
+        int c = buf[offset + count++];
+        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
+          throw new UTFDataFormatException();
+        }
+        out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
+      } else {
+        throw new UTFDataFormatException();
+      }
+    }
+    return new String(out, 0, s);
+  }
+
+  /**
+   * Moves the read position of the backing stream to {@code pos}.
+   *
+   * @throws java.io.IOException
+   *           if {@code pos} does not fit in an {@code int}, the backing
+   *           stream's position type
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    int intPos = (int) pos;
+    if (intPos != pos) {
+      // Refuse rather than silently truncate to an unrelated position.
+      throw new IOException("Seek position " + pos + " is out of int range");
+    }
+    buffer.seek(intPos);
+  }
+
+  /** Returns the backing stream's current position. */
+  @Override
+  public long getPos() throws IOException {
+    return buffer.getPosition();
+  }
+
+  /** There is no alternate source to switch to; always returns false. */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
new file mode 100644
index 0000000..3944f38
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rcfile;
+
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A non-thread-safe version of Hadoop's DataOutputBuffer, which removes all
+ * synchronized modifiers.
+ */
+public class NonSyncDataOutputBuffer extends DataOutputStream {
+
+  /** Backing stream that actually holds the bytes. */
+  private final NonSyncByteArrayOutputStream buffer;
+
+  /** Constructs a new empty buffer. */
+  public NonSyncDataOutputBuffer() {
+    this(new NonSyncByteArrayOutputStream());
+  }
+
+  private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
+    super(buffer);
+    this.buffer = buffer;
+  }
+
+  /**
+   * Returns the current contents of the buffer. Data is only valid to
+   * {@link #getLength()}.
+   */
+  public byte[] getData() {
+    return buffer.getData();
+  }
+
+  /** Returns the length of the valid data currently in the buffer. */
+  public int getLength() {
+    return buffer.getLength();
+  }
+
+  /** Resets the buffer to empty and zeroes the {@link #size()} counter. */
+  public NonSyncDataOutputBuffer reset() {
+    written = 0;
+    buffer.reset();
+    return this;
+  }
+
+  /**
+   * Writes {@code length} bytes from a DataInput directly into the buffer and
+   * adds {@code length} to the {@link #size()} counter, as the other write
+   * methods do. Assumes the backing stream either consumes exactly
+   * {@code length} bytes or throws.
+   */
+  public void write(DataInput in, int length) throws IOException {
+    buffer.write(in, length);
+    incCount(length);
+  }
+
+  /** Writes one byte without synchronization. */
+  @Override
+  public void write(int b) throws IOException {
+    buffer.write(b);
+    incCount(1);
+  }
+
+  /** Writes {@code len} bytes of {@code b} from {@code off} without synchronization. */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    buffer.write(b, off, len);
+    incCount(len);
+  }
+
+  /** Writes the buffered contents to {@code out} (delegates to the backing stream). */
+  public void writeTo(DataOutputStream out) throws IOException {
+    buffer.writeTo(out);
+  }
+
+  /**
+   * Adds {@code value} to {@code written}, saturating at
+   * {@link Integer#MAX_VALUE} on overflow as {@link DataOutputStream} does.
+   */
+  private void incCount(int value) {
+    if (written + value < 0) {
+      written = Integer.MAX_VALUE;
+    } else {
+      written += value;
+    }
+  }
+}
