http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java deleted file mode 100644 index 352776f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; - -import java.util.ArrayList; - -/** - * ColumnProjectionUtils. - * - */ -public final class ColumnProjectionUtils { - - public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; - - /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * - */ - public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) { - String id = toReadColumnIDString(ids); - setReadColumnIDConf(conf, id); - } - - /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * - */ - public static void appendReadColumnIDs(Configuration conf, - ArrayList<Integer> ids) { - String id = toReadColumnIDString(ids); - if (id != null) { - String old = conf.get(READ_COLUMN_IDS_CONF_STR, null); - String newConfStr = id; - if (old != null) { - newConfStr = newConfStr + StringUtils.COMMA_STR + old; - } - - setReadColumnIDConf(conf, newConfStr); - } - } - - private static void setReadColumnIDConf(Configuration conf, String id) { - if (id == null || id.length() <= 0) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); - return; - } - - conf.set(READ_COLUMN_IDS_CONF_STR, id); - } - - private static String toReadColumnIDString(ArrayList<Integer> ids) { - String id = null; - if (ids != null) { - for (int i = 0; i < ids.size(); i++) { - if (i == 0) { - id = "" + ids.get(i); - } else { - id = id + StringUtils.COMMA_STR + ids.get(i); - } - } - } - return id; - } - - /** - * Returns an array of column ids(start from zero) which is set in the given - * parameter <tt>conf</tt>. - */ - public static ArrayList<Integer> getReadColumnIDs(Configuration conf) { - if (conf == null) { - return new ArrayList<Integer>(0); - } - String skips = conf.get(READ_COLUMN_IDS_CONF_STR, ""); - String[] list = StringUtils.split(skips); - ArrayList<Integer> result = new ArrayList<Integer>(list.length); - for (String element : list) { - // it may contain duplicates, remove duplicates - Integer toAdd = Integer.parseInt(element); - if (!result.contains(toAdd)) { - result.add(toAdd); - } - } - return result; - } - - /** - * Clears the read column ids set in the conf, and will read all columns. - */ - public static void setFullyReadColumns(Configuration conf) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); - } - - private ColumnProjectionUtils() { - // prevent instantiation - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java deleted file mode 100644 index 707d55a..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.IOException; - -/** - * Used to call back lazy decompression process. - * - * @see BytesRefWritable - */ -public interface LazyDecompressionCallback { - - byte[] decompress() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java deleted file mode 100644 index bb6af22..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.ByteArrayInputStream; - -/** - * A thread-not-safe version of ByteArrayInputStream, which removes all - * synchronized modifiers. - */ -public class NonSyncByteArrayInputStream extends ByteArrayInputStream { - public NonSyncByteArrayInputStream() { - super(new byte[] {}); - } - - public NonSyncByteArrayInputStream(byte[] bs) { - super(bs); - } - - public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) { - super(buf, offset, length); - } - - public void reset(byte[] input, int start, int length) { - buf = input; - count = start + length; - mark = start; - pos = start; - } - - public int getPosition() { - return pos; - } - - public int getLength() { - return count; - } - - /** - * {@inheritDoc} - */ - @Override - public int read() { - return (pos < count) ? (buf[pos++] & 0xff) : -1; - } - - /** - * {@inheritDoc} - */ - @Override - public int read(byte b[], int off, int len) { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (pos >= count) { - return -1; - } - if (pos + len > count) { - len = count - pos; - } - if (len <= 0) { - return 0; - } - System.arraycopy(buf, pos, b, off, len); - pos += len; - return len; - } - - /** - * {@inheritDoc} - */ - @Override - public long skip(long n) { - if (pos + n > count) { - n = count - pos; - } - if (n < 0) { - return 0; - } - pos += n; - return n; - } - - /** - * {@inheritDoc} - */ - @Override - public int available() { - return count - pos; - } - - public void seek(int pos) { - this.pos = pos; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java deleted file mode 100644 index 53a3dca..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.IOException; -import java.io.OutputStream; - -/** - * A thread-not-safe version of ByteArrayOutputStream, which removes all - * synchronized modifiers. - */ -public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream { - public NonSyncByteArrayOutputStream(int size) { - super(size); - } - - public NonSyncByteArrayOutputStream() { - super(64 * 1024); - } - - public byte[] getData() { - return buf; - } - - public int getLength() { - return count; - } - - /** - * {@inheritDoc} - */ - @Override - public void reset() { - count = 0; - } - - public void write(DataInput in, int length) throws IOException { - enLargeBuffer(length); - in.readFully(buf, count, length); - count += length; - } - - private byte[] vLongBytes = new byte[9]; - - public int writeVLongToByteArray(byte[] bytes, int offset, long l) { - if (l >= -112 && l <= 127) { - bytes[offset] = (byte) l; - return 1; - } - - int len = -112; - if (l < 0) { - l ^= -1L; // take one's complement' - len = -120; - } - - long tmp = l; - while (tmp != 0) { - tmp = tmp >> 8; - len--; - } - - bytes[offset++] = (byte) len; - len = (len < -120) ? -(len + 120) : -(len + 112); - - for (int idx = len; idx != 0; idx--) { - int shiftbits = (idx - 1) * 8; - bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); - } - return 1 + len; - } - - public int writeVLong(long l) { - int len = writeVLongToByteArray(vLongBytes, 0, l); - write(vLongBytes, 0, len); - return len; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(int b) { - enLargeBuffer(1); - buf[count] = (byte) b; - count += 1; - } - - private int enLargeBuffer(int increment) { - int temp = count + increment; - int newLen = temp; - if (temp > buf.length) { - if ((buf.length << 1) > temp) { - newLen = buf.length << 1; - } - byte newbuf[] = new byte[newLen]; - System.arraycopy(buf, 0, newbuf, 0, count); - buf = newbuf; - } - return newLen; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(byte b[], int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) - || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - enLargeBuffer(len); - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * {@inheritDoc} - */ - @Override - public void writeTo(OutputStream out) throws IOException { - out.write(buf, 0, count); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java deleted file mode 100644 index 46745ab..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java +++ /dev/null @@ -1,507 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import org.apache.hadoop.fs.Seekable; - -import java.io.*; - -/** - * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all - * synchronized modifiers. - */ -public class NonSyncDataInputBuffer extends FilterInputStream implements - DataInput, Seekable { - - private final NonSyncByteArrayInputStream buffer; - - byte[] buff = new byte[16]; - - /** Constructs a new empty buffer. */ - public NonSyncDataInputBuffer() { - this(new NonSyncByteArrayInputStream()); - } - - private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) { - super(buffer); - this.buffer = buffer; - } - - /** Resets the data that the buffer reads. */ - public void reset(byte[] input, int length) { - buffer.reset(input, 0, length); - } - - /** Resets the data that the buffer reads. */ - public void reset(byte[] input, int start, int length) { - buffer.reset(input, start, length); - } - - /** Returns the current position in the input. */ - public int getPosition() { - return buffer.getPosition(); - } - - /** Returns the length of the input. */ - public int getLength() { - return buffer.getLength(); - } - - /** - * Reads bytes from the source stream into the byte array <code>buffer</code>. - * The number of bytes actually read is returned. - * - * @param buffer - * the buffer to read bytes into - * @return the number of bytes actually read or -1 if end of stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - @Override - public final int read(byte[] buffer) throws IOException { - return in.read(buffer, 0, buffer.length); - } - - /** - * Read at most <code>length</code> bytes from this DataInputStream and stores - * them in byte array <code>buffer</code> starting at <code>offset</code>. - * Answer the number of bytes actually read or -1 if no bytes were read and - * end of stream was encountered. - * - * @param buffer - * the byte array in which to store the read bytes. - * @param offset - * the offset in <code>buffer</code> to store the read bytes. - * @param length - * the maximum number of bytes to store in <code>buffer</code>. - * @return the number of bytes actually read or -1 if end of stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - @Deprecated - @Override - public final int read(byte[] buffer, int offset, int length) - throws IOException { - return in.read(buffer, offset, length); - } - - /** - * Reads a boolean from this stream. - * - * @return the next boolean value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final boolean readBoolean() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp != 0; - } - - /** - * Reads an 8-bit byte value from this stream. - * - * @return the next byte value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final byte readByte() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return (byte) temp; - } - - /** - * Reads a 16-bit character value from this stream. - * - * @return the next <code>char</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - private int readToBuff(int count) throws IOException { - int offset = 0; - - while (offset < count) { - int bytesRead = in.read(buff, offset, count - offset); - if (bytesRead == -1) { - return bytesRead; - } - offset += bytesRead; - } - return offset; - } - - public final char readChar() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - - } - - /** - * Reads a 64-bit <code>double</code> value from this stream. - * - * @return the next <code>double</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - /** - * Reads a 32-bit <code>float</code> value from this stream. - * - * @return the next <code>float</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - /** - * Reads bytes from this stream into the byte array <code>buffer</code>. This - * method will block until <code>buffer.length</code> number of bytes have - * been read. - * - * @param buffer - * to read bytes into - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final void readFully(byte[] buffer) throws IOException { - readFully(buffer, 0, buffer.length); - } - - /** - * Reads bytes from this stream and stores them in the byte array - * <code>buffer</code> starting at the position <code>offset</code>. This - * method blocks until <code>count</code> bytes have been read. - * - * @param buffer - * the byte array into which the data is read - * @param offset - * the offset the operation start at - * @param length - * the maximum number of bytes to read - * - * @throws java.io.IOException - * if a problem occurs while reading from this stream - * @throws java.io.EOFException - * if reaches the end of the stream before enough bytes have been - * read - */ - public final void readFully(byte[] buffer, int offset, int length) - throws IOException { - if (length < 0) { - throw new IndexOutOfBoundsException(); - } - if (length == 0) { - return; - } - if (in == null || buffer == null) { - throw new NullPointerException("Null Pointer to underlying input stream"); - } - - if (offset < 0 || offset > buffer.length - length) { - throw new IndexOutOfBoundsException(); - } - while (length > 0) { - int result = in.read(buffer, offset, length); - if (result < 0) { - throw new EOFException(); - } - offset += result; - length -= result; - } - } - - /** - * Reads a 32-bit integer value from this stream. - * - * @return the next <code>int</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readInt() throws IOException { - if (readToBuff(4) < 0) { - throw new EOFException(); - } - return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16) - | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff); - } - - /** - * Answers a <code>String</code> representing the next line of text available - * in this BufferedReader. A line is represented by 0 or more characters - * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or - * end of stream. The <code>String</code> does not include the newline - * sequence. - * - * @return the contents of the line or null if no characters were read before - * end of stream. - * - * @throws java.io.IOException - * If the DataInputStream is already closed or some other IO error - * occurs. - * - * @deprecated Use BufferedReader - */ - @Deprecated - public final String readLine() throws IOException { - StringBuilder line = new StringBuilder(80); // Typical line length - boolean foundTerminator = false; - while (true) { - int nextByte = in.read(); - switch (nextByte) { - case -1: - if (line.length() == 0 && !foundTerminator) { - return null; - } - return line.toString(); - case (byte) '\r': - if (foundTerminator) { - ((PushbackInputStream) in).unread(nextByte); - return line.toString(); - } - foundTerminator = true; - /* Have to be able to peek ahead one byte */ - if (!(in.getClass() == PushbackInputStream.class)) { - in = new PushbackInputStream(in); - } - break; - case (byte) '\n': - return line.toString(); - default: - if (foundTerminator) { - ((PushbackInputStream) in).unread(nextByte); - return line.toString(); - } - line.append((char) nextByte); - } - } - } - - /** - * Reads a 64-bit <code>long</code> value from this stream. - * - * @return the next <code>long</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final long readLong() throws IOException { - if (readToBuff(8) < 0) { - throw new EOFException(); - } - int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16) - | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff); - int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16) - | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff); - - return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL); - } - - /** - * Reads a 16-bit <code>short</code> value from this stream. - * - * @return the next <code>short</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final short readShort() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - } - - /** - * Reads an unsigned 8-bit <code>byte</code> value from this stream and - * returns it as an int. - * - * @return the next unsigned byte value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readUnsignedByte() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp; - } - - /** - * Reads a 16-bit unsigned <code>short</code> value from this stream and - * returns it as an int. - * - * @return the next unsigned <code>short</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readUnsignedShort() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - } - - /** - * Reads a UTF format String from this Stream. - * - * @return the next UTF String from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final String readUTF() throws IOException { - return decodeUTF(readUnsignedShort()); - } - - String decodeUTF(int utfSize) throws IOException { - return decodeUTF(utfSize, this); - } - - private static String decodeUTF(int utfSize, DataInput in) throws IOException { - byte[] buf = new byte[utfSize]; - char[] out = new char[utfSize]; - in.readFully(buf, 0, utfSize); - - return convertUTF8WithBuf(buf, out, 0, utfSize); - } - - /** - * Reads a UTF format String from the DataInput Stream <code>in</code>. - * - * @param in - * the input stream to read from - * @return the next UTF String from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public static final String readUTF(DataInput in) throws IOException { - return decodeUTF(in.readUnsignedShort(), in); - } - - /** - * Skips <code>count</code> number of bytes in this stream. Subsequent - * <code>read()</code>'s will not return these bytes unless - * <code>reset()</code> is used. - * - * @param count - * the number of bytes to skip. - * @return the number of bytes actually skipped. - * - * @throws java.io.IOException - * If the stream is already closed or another IOException occurs. - */ - public final int skipBytes(int count) throws IOException { - int skipped = 0; - long skip; - while (skipped < count && (skip = in.skip(count - skipped)) != 0) { - skipped += skip; - } - if (skipped < 0) { - throw new EOFException(); - } - return skipped; - } - - public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset, - int utfSize) throws UTFDataFormatException { - int count = 0, s = 0, a; - while (count < utfSize) { - if ((out[s] = (char) buf[offset + count++]) < '\u0080') { - s++; - } else if (((a = out[s]) & 0xe0) == 0xc0) { - if (count >= utfSize) { - throw new UTFDataFormatException(); - } - int b = buf[count++]; - if ((b & 0xC0) != 0x80) { - throw new UTFDataFormatException(); - } - out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F)); - } else if ((a & 0xf0) == 0xe0) { - if (count + 1 >= utfSize) { - throw new UTFDataFormatException(); - } - int b = buf[count++]; - int c = buf[count++]; - if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) { - throw new UTFDataFormatException(); - } - out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F)); - } else { - throw new UTFDataFormatException(); - } - } - return new String(out, 0, s); - } - - @Override - public void seek(long pos) throws IOException { - buffer.seek((int)pos); - } - - @Override - public long getPos() throws IOException { - return buffer.getPosition(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java deleted file mode 100644 index 3944f38..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all - * synchronized modifiers. - */ -public class NonSyncDataOutputBuffer extends DataOutputStream { - - private final NonSyncByteArrayOutputStream buffer; - - /** Constructs a new empty buffer. */ - public NonSyncDataOutputBuffer() { - this(new NonSyncByteArrayOutputStream()); - } - - private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) { - super(buffer); - this.buffer = buffer; - } - - /** - * Returns the current contents of the buffer. Data is only valid to - * {@link #getLength()}. - */ - public byte[] getData() { - return buffer.getData(); - } - - /** Returns the length of the valid data currently in the buffer. */ - public int getLength() { - return buffer.getLength(); - } - - /** Resets the buffer to empty. */ - public NonSyncDataOutputBuffer reset() { - written = 0; - buffer.reset(); - return this; - } - - /** Writes bytes from a DataInput directly into the buffer. */ - public void write(DataInput in, int length) throws IOException { - buffer.write(in, length); - } - - @Override - public void write(int b) throws IOException { - buffer.write(b); - incCount(1); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - buffer.write(b, off, len); - incCount(len); - } - - public void writeTo(DataOutputStream out) throws IOException { - buffer.writeTo(out); - } - - private void incCount(int value) { - if (written + value < 0) { - written = Integer.MAX_VALUE; - } else { - written += value; - } - } -}
