http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DataReaderProperties.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java b/orc/src/java/org/apache/orc/impl/DataReaderProperties.java deleted file mode 100644 index 22301e8..0000000 --- a/orc/src/java/org/apache/orc/impl/DataReaderProperties.java +++ /dev/null @@ -1,124 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.orc.CompressionKind; - -import javax.annotation.Nullable; - -public final class DataReaderProperties { - - private final FileSystem fileSystem; - private final Path path; - private final CompressionKind compression; - private final boolean zeroCopy; - private final int typeCount; - private final int bufferSize; - - private DataReaderProperties(Builder builder) { - this.fileSystem = builder.fileSystem; - this.path = builder.path; - this.compression = builder.compression; - this.zeroCopy = builder.zeroCopy; - this.typeCount = builder.typeCount; - this.bufferSize = builder.bufferSize; - } - - public FileSystem getFileSystem() { - return fileSystem; - } - - public Path getPath() { - return path; - } - - public CompressionKind getCompression() { - return compression; - } - - public boolean getZeroCopy() { - return zeroCopy; - } - - public int getTypeCount() { - return typeCount; - } - - public int getBufferSize() { - return bufferSize; - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private FileSystem fileSystem; - private Path path; - private CompressionKind compression; - private boolean zeroCopy; - private int typeCount; - private int bufferSize; - - private Builder() { - - } - - public Builder withFileSystem(FileSystem fileSystem) { - this.fileSystem = fileSystem; - return this; - } - - public Builder withPath(Path path) { - this.path = path; - return this; - } - - public Builder withCompression(CompressionKind value) { - this.compression = value; - return this; - } - - public Builder withZeroCopy(boolean zeroCopy) { - this.zeroCopy = zeroCopy; - return this; - } - - public Builder withTypeCount(int value) { - this.typeCount = value; - return this; - } - - public Builder withBufferSize(int value) { - this.bufferSize = value; - return this; - } - - public DataReaderProperties build() { - Preconditions.checkNotNull(fileSystem); - Preconditions.checkNotNull(path); - - return new DataReaderProperties(this); - } - - } -}
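For orientation, here is a minimal sketch of how the builder in the file above is driven; the filesystem, path, and settings are hypothetical stand-ins (assuming the usual org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.*, and org.apache.orc.CompressionKind imports), and build() rejects a missing fileSystem or path via the Preconditions checks shown:

    // Illustrative sketch; all values below are hypothetical, not from the commit.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    DataReaderProperties props = DataReaderProperties.builder()
        .withFileSystem(fs)
        .withPath(new Path("/tmp/example.orc"))  // hypothetical file
        .withCompression(CompressionKind.ZLIB)
        .withZeroCopy(false)
        .withTypeCount(10)                       // number of types in the schema
        .withBufferSize(256 * 1024)              // compression chunk size
        .build();  // NullPointerException if fileSystem or path is unset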
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java deleted file mode 100644 index 7e0110d..0000000 --- a/orc/src/java/org/apache/orc/impl/DirectDecompressionCodec.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import org.apache.orc.CompressionCodec; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface DirectDecompressionCodec extends CompressionCodec { - public boolean isAvailable(); - public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicByteArray.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java b/orc/src/java/org/apache/orc/impl/DynamicByteArray.java deleted file mode 100644 index 986c2ac..0000000 --- a/orc/src/java/org/apache/orc/impl/DynamicByteArray.java +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -import org.apache.hadoop.io.Text; - -/** - * A class that is a growable array of bytes. Growth is managed in terms of - * chunks that are allocated when needed. 
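Before the class body below, it may help to see the chunk arithmetic that get(), set(), and grow() rely on; a short sketch with arbitrary values:

    // Illustrative sketch of the chunked addressing in DynamicByteArray.
    int chunkSize = 32 * 1024;        // DEFAULT_CHUNKSIZE
    int index = 70_000;               // an arbitrary absolute index
    int chunk = index / chunkSize;    // 2
    int offset = index % chunkSize;   // 4_464
    // data[chunk][offset] is the addressed byte; grow(chunk) allocates any
    // missing chunks without copying bytes that are already stored.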
- */ -public final class DynamicByteArray { - static final int DEFAULT_CHUNKSIZE = 32 * 1024; - static final int DEFAULT_NUM_CHUNKS = 128; - - private final int chunkSize; // our allocation sizes - private byte[][] data; // the real data - private int length; // max set element index +1 - private int initializedChunks = 0; // the number of chunks created - - public DynamicByteArray() { - this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE); - } - - public DynamicByteArray(int numChunks, int chunkSize) { - if (chunkSize == 0) { - throw new IllegalArgumentException("bad chunksize"); - } - this.chunkSize = chunkSize; - data = new byte[numChunks][]; - } - - /** - * Ensure that the given index is valid. - */ - private void grow(int chunkIndex) { - if (chunkIndex >= initializedChunks) { - if (chunkIndex >= data.length) { - int newSize = Math.max(chunkIndex + 1, 2 * data.length); - byte[][] newChunk = new byte[newSize][]; - System.arraycopy(data, 0, newChunk, 0, data.length); - data = newChunk; - } - for(int i=initializedChunks; i <= chunkIndex; ++i) { - data[i] = new byte[chunkSize]; - } - initializedChunks = chunkIndex + 1; - } - } - - public byte get(int index) { - if (index >= length) { - throw new IndexOutOfBoundsException("Index " + index + - " is outside of 0.." + - (length - 1)); - } - int i = index / chunkSize; - int j = index % chunkSize; - return data[i][j]; - } - - public void set(int index, byte value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] = value; - } - - public int add(byte value) { - int i = length / chunkSize; - int j = length % chunkSize; - grow(i); - data[i][j] = value; - int result = length; - length += 1; - return result; - } - - /** - * Copy a slice of a byte array into our buffer. - * @param value the array to copy from - * @param valueOffset the first location to copy from value - * @param valueLength the number of bytes to copy from value - * @return the offset of the start of the value - */ - public int add(byte[] value, int valueOffset, int valueLength) { - int i = length / chunkSize; - int j = length % chunkSize; - grow((length + valueLength) / chunkSize); - int remaining = valueLength; - while (remaining > 0) { - int size = Math.min(remaining, chunkSize - j); - System.arraycopy(value, valueOffset, data[i], j, size); - remaining -= size; - valueOffset += size; - i += 1; - j = 0; - } - int result = length; - length += valueLength; - return result; - } - - /** - * Read the entire stream into this array. - * @param in the stream to read from - * @throws IOException - */ - public void readAll(InputStream in) throws IOException { - int currentChunk = length / chunkSize; - int currentOffset = length % chunkSize; - grow(currentChunk); - int currentLength = in.read(data[currentChunk], currentOffset, - chunkSize - currentOffset); - while (currentLength > 0) { - length += currentLength; - currentOffset = length % chunkSize; - if (currentOffset == 0) { - currentChunk = length / chunkSize; - grow(currentChunk); - } - currentLength = in.read(data[currentChunk], currentOffset, - chunkSize - currentOffset); - } - } - - /** - * Byte compare a set of bytes against the bytes in this dynamic array. 
- * @param other source of the other bytes - * @param otherOffset start offset in the other array - * @param otherLength number of bytes in the other array - * @param ourOffset the offset in our array - * @param ourLength the number of bytes in our array - * @return negative for less, 0 for equal, positive for greater - */ - public int compare(byte[] other, int otherOffset, int otherLength, - int ourOffset, int ourLength) { - int currentChunk = ourOffset / chunkSize; - int currentOffset = ourOffset % chunkSize; - int maxLength = Math.min(otherLength, ourLength); - while (maxLength > 0 && - other[otherOffset] == data[currentChunk][currentOffset]) { - otherOffset += 1; - currentOffset += 1; - if (currentOffset == chunkSize) { - currentChunk += 1; - currentOffset = 0; - } - maxLength -= 1; - } - if (maxLength == 0) { - return otherLength - ourLength; - } - int otherByte = 0xff & other[otherOffset]; - int ourByte = 0xff & data[currentChunk][currentOffset]; - return otherByte > ourByte ? 1 : -1; - } - - /** - * Get the size of the array. - * @return the number of bytes in the array - */ - public int size() { - return length; - } - - /** - * Clear the array to its original pristine state. - */ - public void clear() { - length = 0; - for(int i=0; i < data.length; ++i) { - data[i] = null; - } - initializedChunks = 0; - } - - /** - * Set a text value from the bytes in this dynamic array. - * @param result the value to set - * @param offset the start of the bytes to copy - * @param length the number of bytes to copy - */ - public void setText(Text result, int offset, int length) { - result.clear(); - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - int currentLength = Math.min(length, chunkSize - currentOffset); - while (length > 0) { - result.append(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(length, chunkSize - currentOffset); - } - } - - /** - * Write out a range of this dynamic array to an output stream. - * @param out the stream to write to - * @param offset the first offset to write - * @param length the number of bytes to write - * @throws IOException - */ - public void write(OutputStream out, int offset, - int length) throws IOException { - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - while (length > 0) { - int currentLength = Math.min(length, chunkSize - currentOffset); - out.write(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - } - } - - @Override - public String toString() { - int i; - StringBuilder sb = new StringBuilder(length * 3); - - sb.append('{'); - int l = length - 1; - for (i=0; i<l; i++) { - sb.append(Integer.toHexString(get(i))); - sb.append(','); - } - sb.append(get(i)); - sb.append('}'); - - return sb.toString(); - } - - public void setByteBuffer(ByteBuffer result, int offset, int length) { - result.clear(); - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - int currentLength = Math.min(length, chunkSize - currentOffset); - while (length > 0) { - result.put(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(length, chunkSize - currentOffset); - } - } - - /** - * Gets all the bytes of the array. 
- * - * @return Bytes of the array - */ - public byte[] get() { - byte[] result = null; - if (length > 0) { - int currentChunk = 0; - int currentOffset = 0; - int currentLength = Math.min(length, chunkSize); - int destOffset = 0; - result = new byte[length]; - int totalLength = length; - while (totalLength > 0) { - System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength); - destOffset += currentLength; - totalLength -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(totalLength, chunkSize - currentOffset); - } - } - return result; - } - - /** - * Get the size of the buffers. - */ - public long getSizeInBytes() { - return initializedChunks * chunkSize; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/DynamicIntArray.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java b/orc/src/java/org/apache/orc/impl/DynamicIntArray.java deleted file mode 100644 index 3b2884b..0000000 --- a/orc/src/java/org/apache/orc/impl/DynamicIntArray.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -/** - * Dynamic int array that uses primitive types and chunks to avoid copying - * large number of integers when it resizes. - * - * The motivation for this class is memory optimization, i.e. space efficient - * storage of potentially huge arrays without good a-priori size guesses. - * - * The API of this class is between a primitive array and a AbstractList. It's - * not a Collection implementation because it handles primitive types, but the - * API could be extended to support iterators and the like. - * - * NOTE: Like standard Collection implementations/arrays, this class is not - * synchronized. - */ -public final class DynamicIntArray { - static final int DEFAULT_CHUNKSIZE = 8 * 1024; - static final int INIT_CHUNKS = 128; - - private final int chunkSize; // our allocation size - private int[][] data; // the real data - private int length; // max set element index +1 - private int initializedChunks = 0; // the number of created chunks - - public DynamicIntArray() { - this(DEFAULT_CHUNKSIZE); - } - - public DynamicIntArray(int chunkSize) { - this.chunkSize = chunkSize; - - data = new int[INIT_CHUNKS][]; - } - - /** - * Ensure that the given index is valid. 
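The surrounding class is easiest to read with a usage sketch in mind (indices and values are arbitrary):

    // Illustrative sketch: DynamicIntArray as an auto-growing int[].
    DynamicIntArray counts = new DynamicIntArray();
    counts.increment(5, 1);   // grows to cover index 5; counts.get(5) == 1
    counts.add(42);           // appends at index 6
    int n = counts.size();    // 7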
- */ - private void grow(int chunkIndex) { - if (chunkIndex >= initializedChunks) { - if (chunkIndex >= data.length) { - int newSize = Math.max(chunkIndex + 1, 2 * data.length); - int[][] newChunk = new int[newSize][]; - System.arraycopy(data, 0, newChunk, 0, data.length); - data = newChunk; - } - for (int i=initializedChunks; i <= chunkIndex; ++i) { - data[i] = new int[chunkSize]; - } - initializedChunks = chunkIndex + 1; - } - } - - public int get(int index) { - if (index >= length) { - throw new IndexOutOfBoundsException("Index " + index + - " is outside of 0.." + - (length - 1)); - } - int i = index / chunkSize; - int j = index % chunkSize; - return data[i][j]; - } - - public void set(int index, int value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] = value; - } - - public void increment(int index, int value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] += value; - } - - public void add(int value) { - int i = length / chunkSize; - int j = length % chunkSize; - grow(i); - data[i][j] = value; - length += 1; - } - - public int size() { - return length; - } - - public void clear() { - length = 0; - for(int i=0; i < data.length; ++i) { - data[i] = null; - } - initializedChunks = 0; - } - - public String toString() { - int i; - StringBuilder sb = new StringBuilder(length * 4); - - sb.append('{'); - int l = length - 1; - for (i=0; i<l; i++) { - sb.append(get(i)); - sb.append(','); - } - sb.append(get(i)); - sb.append('}'); - - return sb.toString(); - } - - public int getSizeInBytes() { - return 4 * initializedChunks * chunkSize; - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims.java b/orc/src/java/org/apache/orc/impl/HadoopShims.java deleted file mode 100644 index ef7d70f..0000000 --- a/orc/src/java/org/apache/orc/impl/HadoopShims.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.util.VersionInfo; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -public interface HadoopShims { - - enum DirectCompressionType { - NONE, - ZLIB_NOHEADER, - ZLIB, - SNAPPY, - } - - interface DirectDecompressor { - void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException; - } - - /** - * Get a direct decompressor codec, if it is available. - * @param codec the kind of compression to decompress - * @return the direct decompressor, or null if one is not available - */ - DirectDecompressor getDirectDecompressor(DirectCompressionType codec); - - /** - * a hadoop.io ByteBufferPool shim. - */ - public interface ByteBufferPoolShim { - /** - * Get a new ByteBuffer from the pool. The pool can provide this from - * removing a buffer from its internal cache, or by allocating a - * new buffer. - * - * @param direct Whether the buffer should be direct. - * @param length The minimum length the buffer will have. - * @return A new ByteBuffer. Its capacity can be less - * than what was requested, but must be at - * least 1 byte. - */ - ByteBuffer getBuffer(boolean direct, int length); - - /** - * Release a buffer back to the pool. - * The pool may choose to put this buffer into its cache/free it. - * - * @param buffer a direct bytebuffer - */ - void putBuffer(ByteBuffer buffer); - } - - /** - * Provides an HDFS ZeroCopyReader shim. - * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to) - * @param pool ByteBufferPoolShim to allocate fallback buffers with - * - * @return returns null if not supported - */ - public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException; - - public interface ZeroCopyReaderShim extends Closeable { - /** - * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer. - * Also move the in stream by that amount. The data read can be smaller than maxLength. - * - * @return the ByteBuffer read from the stream - */ - public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException; - /** - * Release a ByteBuffer obtained from a read on the stream. - */ - public void releaseBuffer(ByteBuffer buffer); - - /** - * Close the underlying stream. - * @throws IOException - */ - public void close() throws IOException; - } - /** - * Read data into a Text object in the fastest way possible. - */ - public interface TextReaderShim { - /** - * @param txt the Text object to read into - * @param size the number of bytes to read - * @throws IOException - */ - void read(Text txt, int size) throws IOException; - } - - /** - * Wrap a TextReaderShim around an input stream. The reader shim will not - * buffer any reads from the underlying stream and will only consume bytes - * which are required for TextReaderShim.read() input.
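A sketch of this contract in use; `in` stands for an arbitrary InputStream, and Factory.get(), defined at the end of this interface, picks the implementation for the running Hadoop version:

    // Illustrative sketch; `in` is a hypothetical InputStream.
    HadoopShims shims = HadoopShims.Factory.get();
    HadoopShims.TextReaderShim reader = shims.getTextReaderShim(in);
    Text txt = new Text();
    reader.read(txt, 16);   // fills txt with exactly the next 16 bytes of in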
- */ - public TextReaderShim getTextReaderShim(InputStream input) throws IOException; - - class Factory { - private static HadoopShims SHIMS = null; - - public static synchronized HadoopShims get() { - if (SHIMS == null) { - String[] versionParts = VersionInfo.getVersion().split("[.]"); - int major = Integer.parseInt(versionParts[0]); - int minor = Integer.parseInt(versionParts[1]); - if (major < 2 || (major == 2 && minor < 3)) { - SHIMS = new HadoopShims_2_2(); - } else { - SHIMS = new HadoopShimsCurrent(); - } - } - return SHIMS; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java deleted file mode 100644 index 5c53f74..0000000 --- a/orc/src/java/org/apache/orc/impl/HadoopShimsCurrent.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; -import org.apache.hadoop.io.compress.zlib.ZlibDecompressor; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * Shims for recent versions of Hadoop - */ -public class HadoopShimsCurrent implements HadoopShims { - - private static class DirectDecompressWrapper implements DirectDecompressor { - private final org.apache.hadoop.io.compress.DirectDecompressor root; - - DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) { - this.root = root; - } - - public void decompress(ByteBuffer input, - ByteBuffer output) throws IOException { - root.decompress(input, output); - } - } - - public DirectDecompressor getDirectDecompressor( - DirectCompressionType codec) { - switch (codec) { - case ZLIB: - return new DirectDecompressWrapper - (new ZlibDecompressor.ZlibDirectDecompressor()); - case ZLIB_NOHEADER: - return new DirectDecompressWrapper - (new ZlibDecompressor.ZlibDirectDecompressor - (ZlibDecompressor.CompressionHeader.NO_HEADER, 0)); - case SNAPPY: - return new DirectDecompressWrapper - (new SnappyDecompressor.SnappyDirectDecompressor()); - default: - return null; - } - } - - @Override - public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, - ByteBufferPoolShim pool - ) throws IOException { - return ZeroCopyShims.getZeroCopyReader(in, pool); - } - - private final class FastTextReaderShim implements TextReaderShim { - private final DataInputStream din; - - public FastTextReaderShim(InputStream in) { - this.din = new DataInputStream(in); - } - - @Override - public void read(Text txt, int len) throws IOException { - txt.readWithKnownLength(din, len); - } - } - - @Override - public TextReaderShim getTextReaderShim(InputStream in) throws IOException { - return new FastTextReaderShim(in); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java deleted file mode 100644 index 3f65e74..0000000 --- a/orc/src/java/org/apache/orc/impl/HadoopShims_2_2.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Method; - -/** - * Shims for versions of Hadoop up to and including 2.2.x - */ -public class HadoopShims_2_2 implements HadoopShims { - - final boolean zeroCopy; - final boolean fastRead; - - HadoopShims_2_2() { - boolean zcr = false; - try { - Class.forName("org.apache.hadoop.fs.CacheFlag", false, - HadoopShims_2_2.class.getClassLoader()); - zcr = true; - } catch (ClassNotFoundException ce) { - } - zeroCopy = zcr; - boolean fastRead = false; - if (zcr) { - for (Method m : Text.class.getMethods()) { - if ("readWithKnownLength".equals(m.getName())) { - fastRead = true; - } - } - } - this.fastRead = fastRead; - } - - public DirectDecompressor getDirectDecompressor( - DirectCompressionType codec) { - return null; - } - - @Override - public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, - ByteBufferPoolShim pool - ) throws IOException { - if(zeroCopy) { - return ZeroCopyShims.getZeroCopyReader(in, pool); - } - /* not supported */ - return null; - } - - private final class BasicTextReaderShim implements TextReaderShim { - private final InputStream in; - - public BasicTextReaderShim(InputStream in) { - this.in = in; - } - - @Override - public void read(Text txt, int len) throws IOException { - int offset = 0; - byte[] bytes = new byte[len]; - while (len > 0) { - int written = in.read(bytes, offset, len); - if (written < 0) { - throw new EOFException("Can't finish read from " + in + " read " - + (offset) + " bytes out of " + bytes.length); - } - len -= written; - offset += written; - } - txt.set(bytes); - } - } - - @Override - public TextReaderShim getTextReaderShim(InputStream in) throws IOException { - return new BasicTextReaderShim(in); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/InStream.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/InStream.java b/orc/src/java/org/apache/orc/impl/InStream.java deleted file mode 100644 index 851f645..0000000 --- a/orc/src/java/org/apache/orc/impl/InStream.java +++ /dev/null @@ -1,498 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.ListIterator; - -import org.apache.orc.CompressionCodec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.io.DiskRange; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.CodedInputStream; - -public abstract class InStream extends InputStream { - - private static final Logger LOG = LoggerFactory.getLogger(InStream.class); - public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB - - protected final String name; - protected long length; - - public InStream(String name, long length) { - this.name = name; - this.length = length; - } - - public String getStreamName() { - return name; - } - - public long getStreamLength() { - return length; - } - - @Override - public abstract void close(); - - public static class UncompressedStream extends InStream { - private List<DiskRange> bytes; - private long length; - protected long currentOffset; - private ByteBuffer range; - private int currentRange; - - public UncompressedStream(String name, List<DiskRange> input, long length) { - super(name, length); - reset(input, length); - } - - protected void reset(List<DiskRange> input, long length) { - this.bytes = input; - this.length = length; - currentRange = 0; - currentOffset = 0; - range = null; - } - - @Override - public int read() { - if (range == null || range.remaining() == 0) { - if (currentOffset == length) { - return -1; - } - seek(currentOffset); - } - currentOffset += 1; - return 0xff & range.get(); - } - - @Override - public int read(byte[] data, int offset, int length) { - if (range == null || range.remaining() == 0) { - if (currentOffset == this.length) { - return -1; - } - seek(currentOffset); - } - int actualLength = Math.min(length, range.remaining()); - range.get(data, offset, actualLength); - currentOffset += actualLength; - return actualLength; - } - - @Override - public int available() { - if (range != null && range.remaining() > 0) { - return range.remaining(); - } - return (int) (length - currentOffset); - } - - @Override - public void close() { - currentRange = bytes.size(); - currentOffset = length; - // explicit de-ref of bytes[] - bytes.clear(); - } - - @Override - public void seek(PositionProvider index) throws IOException { - seek(index.getNext()); - } - - public void seek(long desired) { - if (desired == 0 && bytes.isEmpty()) { - logEmptySeek(name); - return; - } - int i = 0; - for (DiskRange curRange : bytes) { - if (desired == 0 && curRange.getData().remaining() == 0) { - logEmptySeek(name); - return; - } - if (curRange.getOffset() <= desired && - (desired - curRange.getOffset()) < curRange.getLength()) { - currentOffset = desired; - currentRange = i; - this.range = curRange.getData().duplicate(); - int pos = range.position(); - pos += (int)(desired - curRange.getOffset()); // this is why we duplicate - this.range.position(pos); - return; - } - ++i; - } - // if they are seeking to the precise end, go ahead and let them go there - int segments = bytes.size(); - if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { - currentOffset = desired; - currentRange = segments - 1; - DiskRange curRange = bytes.get(currentRange); - this.range = curRange.getData().duplicate(); - int pos = range.position(); - pos += (int)(desired - curRange.getOffset()); // this is why we duplicate - 
this.range.position(pos); - return; - } - throw new IllegalArgumentException("Seek in " + name + " to " + - desired + " is outside of the data"); - } - - @Override - public String toString() { - return "uncompressed stream " + name + " position: " + currentOffset + - " length: " + length + " range: " + currentRange + - " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit()); - } - } - - private static ByteBuffer allocateBuffer(int size, boolean isDirect) { - // TODO: use the same pool as the ORC readers - if (isDirect) { - return ByteBuffer.allocateDirect(size); - } else { - return ByteBuffer.allocate(size); - } - } - - private static class CompressedStream extends InStream { - private final List<DiskRange> bytes; - private final int bufferSize; - private ByteBuffer uncompressed; - private final CompressionCodec codec; - private ByteBuffer compressed; - private long currentOffset; - private int currentRange; - private boolean isUncompressedOriginal; - - public CompressedStream(String name, List<DiskRange> input, long length, - CompressionCodec codec, int bufferSize) { - super(name, length); - this.bytes = input; - this.codec = codec; - this.bufferSize = bufferSize; - currentOffset = 0; - currentRange = 0; - } - - private void allocateForUncompressed(int size, boolean isDirect) { - uncompressed = allocateBuffer(size, isDirect); - } - - private void readHeader() throws IOException { - if (compressed == null || compressed.remaining() <= 0) { - seek(currentOffset); - } - if (compressed.remaining() > OutStream.HEADER_SIZE) { - int b0 = compressed.get() & 0xff; - int b1 = compressed.get() & 0xff; - int b2 = compressed.get() & 0xff; - boolean isOriginal = (b0 & 0x01) == 1; - int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); - - if (chunkLength > bufferSize) { - throw new IllegalArgumentException("Buffer size too small. 
size = " + - bufferSize + " needed = " + chunkLength); - } - // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always - assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream"; - currentOffset += OutStream.HEADER_SIZE; - - ByteBuffer slice = this.slice(chunkLength); - - if (isOriginal) { - uncompressed = slice; - isUncompressedOriginal = true; - } else { - if (isUncompressedOriginal) { - allocateForUncompressed(bufferSize, slice.isDirect()); - isUncompressedOriginal = false; - } else if (uncompressed == null) { - allocateForUncompressed(bufferSize, slice.isDirect()); - } else { - uncompressed.clear(); - } - codec.decompress(slice, uncompressed); - } - } else { - throw new IllegalStateException("Can't read header at " + this); - } - } - - @Override - public int read() throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == length) { - return -1; - } - readHeader(); - } - return 0xff & uncompressed.get(); - } - - @Override - public int read(byte[] data, int offset, int length) throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == this.length) { - return -1; - } - readHeader(); - } - int actualLength = Math.min(length, uncompressed.remaining()); - uncompressed.get(data, offset, actualLength); - return actualLength; - } - - @Override - public int available() throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == length) { - return 0; - } - readHeader(); - } - return uncompressed.remaining(); - } - - @Override - public void close() { - uncompressed = null; - compressed = null; - currentRange = bytes.size(); - currentOffset = length; - bytes.clear(); - } - - @Override - public void seek(PositionProvider index) throws IOException { - seek(index.getNext()); - long uncompressedBytes = index.getNext(); - if (uncompressedBytes != 0) { - readHeader(); - uncompressed.position(uncompressed.position() + - (int) uncompressedBytes); - } else if (uncompressed != null) { - // mark the uncompressed buffer as done - uncompressed.position(uncompressed.limit()); - } - } - - /* slices a read only contiguous buffer of chunkLength */ - private ByteBuffer slice(int chunkLength) throws IOException { - int len = chunkLength; - final long oldOffset = currentOffset; - ByteBuffer slice; - if (compressed.remaining() >= len) { - slice = compressed.slice(); - // simple case - slice.limit(len); - currentOffset += len; - compressed.position(compressed.position() + len); - return slice; - } else if (currentRange >= (bytes.size() - 1)) { - // nothing has been modified yet - throw new IOException("EOF in " + this + " while trying to read " + - chunkLength + " bytes"); - } - - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)", - compressed.remaining(), len)); - } - - // we need to consolidate 2 or more buffers into 1 - // first copy out compressed buffers - ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); - currentOffset += compressed.remaining(); - len -= compressed.remaining(); - copy.put(compressed); - ListIterator<DiskRange> iter = bytes.listIterator(currentRange); - - while (len > 0 && iter.hasNext()) { - ++currentRange; - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString())); - } - DiskRange range = iter.next(); - compressed = 
range.getData().duplicate(); - if (compressed.remaining() >= len) { - slice = compressed.slice(); - slice.limit(len); - copy.put(slice); - currentOffset += len; - compressed.position(compressed.position() + len); - return copy; - } - currentOffset += compressed.remaining(); - len -= compressed.remaining(); - copy.put(compressed); - } - - // restore offsets for exception clarity - seek(oldOffset); - throw new IOException("EOF in " + this + " while trying to read " + - chunkLength + " bytes"); - } - - private void seek(long desired) throws IOException { - if (desired == 0 && bytes.isEmpty()) { - logEmptySeek(name); - return; - } - int i = 0; - for (DiskRange range : bytes) { - if (range.getOffset() <= desired && desired < range.getEnd()) { - currentRange = i; - compressed = range.getData().duplicate(); - int pos = compressed.position(); - pos += (int)(desired - range.getOffset()); - compressed.position(pos); - currentOffset = desired; - return; - } - ++i; - } - // if they are seeking to the precise end, go ahead and let them go there - int segments = bytes.size(); - if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { - DiskRange range = bytes.get(segments - 1); - currentRange = segments - 1; - compressed = range.getData().duplicate(); - compressed.position(compressed.limit()); - currentOffset = desired; - return; - } - throw new IOException("Seek outside of data in " + this + " to " + desired); - } - - private String rangeString() { - StringBuilder builder = new StringBuilder(); - int i = 0; - for (DiskRange range : bytes) { - if (i != 0) { - builder.append("; "); - } - builder.append(" range " + i + " = " + range.getOffset() - + " to " + (range.getEnd() - range.getOffset())); - ++i; - } - return builder.toString(); - } - - @Override - public String toString() { - return "compressed stream " + name + " position: " + currentOffset + - " length: " + length + " range: " + currentRange + - " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) + - rangeString() + - (uncompressed == null ? "" : - " uncompressed: " + uncompressed.position() + " to " + - uncompressed.limit()); - } - } - - public abstract void seek(PositionProvider index) throws IOException; - - private static void logEmptySeek(String name) { - if (LOG.isWarnEnabled()) { - LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream."); - } - } - - /** - * Create an input stream from a list of buffers. - * @param streamName the name of the stream - * @param buffers the list of ranges of bytes for the stream - * @param offsets a list of offsets (the same length as input) that must - * contain the first offset of the each set of bytes in input - * @param length the length in bytes of the stream - * @param codec the compression codec - * @param bufferSize the compression buffer size - * @return an input stream - * @throws IOException - */ - @VisibleForTesting - @Deprecated - public static InStream create(String streamName, - ByteBuffer[] buffers, - long[] offsets, - long length, - CompressionCodec codec, - int bufferSize) throws IOException { - List<DiskRange> input = new ArrayList<DiskRange>(buffers.length); - for (int i = 0; i < buffers.length; ++i) { - input.add(new BufferChunk(buffers[i], offsets[i])); - } - return create(streamName, input, length, codec, bufferSize); - } - - /** - * Create an input stream from a list of disk ranges with data. 
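As a sketch of this entry point (signature shown in full just below), a single in-memory BufferChunk with a null codec produces an UncompressedStream:

    // Illustrative sketch; the stream name and bytes are hypothetical.
    ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3});
    List<DiskRange> ranges = new ArrayList<>();
    ranges.add(new BufferChunk(buf, 0));   // offset 0 within the stream
    InStream in = InStream.create("meta", ranges, 3, null, 256 * 1024);
    int first = in.read();                 // 1; a non-null codec would
                                           // yield a CompressedStream instead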
- * @param name the name of the stream - * @param input the list of ranges of bytes for the stream; from disk or cache - * @param length the length in bytes of the stream - * @param codec the compression codec - * @param bufferSize the compression buffer size - * @return an input stream - * @throws IOException - */ - public static InStream create(String name, - List<DiskRange> input, - long length, - CompressionCodec codec, - int bufferSize) throws IOException { - if (codec == null) { - return new UncompressedStream(name, input, length); - } else { - return new CompressedStream(name, input, length, codec, bufferSize); - } - } - - /** - * Creates coded input stream (used for protobuf message parsing) with higher message size limit. - * - * @param name the name of the stream - * @param input the list of ranges of bytes for the stream; from disk or cache - * @param length the length in bytes of the stream - * @param codec the compression codec - * @param bufferSize the compression buffer size - * @return coded input stream - * @throws IOException - */ - public static CodedInputStream createCodedInputStream( - String name, - List<DiskRange> input, - long length, - CompressionCodec codec, - int bufferSize) throws IOException { - InStream inStream = create(name, input, length, codec, bufferSize); - CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); - codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); - return codedInputStream; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/IntegerReader.java b/orc/src/java/org/apache/orc/impl/IntegerReader.java deleted file mode 100644 index 3e64d54..0000000 --- a/orc/src/java/org/apache/orc/impl/IntegerReader.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import java.io.IOException; - -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; - -/** - * Interface for reading integers. - */ -public interface IntegerReader { - - /** - * Seek to the position provided by index. - * @param index - * @throws IOException - */ - void seek(PositionProvider index) throws IOException; - - /** - * Skip number of specified rows. - * @param numValues - * @throws IOException - */ - void skip(long numValues) throws IOException; - - /** - * Check if there are any more values left. - * @return - * @throws IOException - */ - boolean hasNext() throws IOException; - - /** - * Return the next available value. - * @return - * @throws IOException - */ - long next() throws IOException; - - /** - * Return the next available vector for values. 
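For orientation, the scalar half of this interface is typically driven by a simple loop (sketch; `reader` is an already-positioned IntegerReader):

    // Illustrative sketch of scalar consumption.
    while (reader.hasNext()) {
      long value = reader.next();
      // ... consume value
    }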
- * @param column the column being read - * @param data the vector to read into - * @param length the number of numbers to read - * @throws IOException - */ - void nextVector(ColumnVector column, - long[] data, - int length - ) throws IOException; - - /** - * Return the next available vector for values. Does not change the - * value of column.isRepeating. - * @param column the column being read - * @param data the vector to read into - * @param length the number of numbers to read - * @throws IOException - */ - void nextVector(ColumnVector column, - int[] data, - int length - ) throws IOException; -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/IntegerWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/IntegerWriter.java b/orc/src/java/org/apache/orc/impl/IntegerWriter.java deleted file mode 100644 index 419054f..0000000 --- a/orc/src/java/org/apache/orc/impl/IntegerWriter.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import java.io.IOException; - -/** - * Interface for writing integers. - */ -public interface IntegerWriter { - - /** - * Get position from the stream. - * @param recorder - * @throws IOException - */ - void getPosition(PositionRecorder recorder) throws IOException; - - /** - * Write the integer value - * @param value - * @throws IOException - */ - void write(long value) throws IOException; - - /** - * Flush the buffer - * @throws IOException - */ - void flush() throws IOException; -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/MemoryManager.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/MemoryManager.java b/orc/src/java/org/apache/orc/impl/MemoryManager.java deleted file mode 100644 index 757c0b4..0000000 --- a/orc/src/java/org/apache/orc/impl/MemoryManager.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import org.apache.orc.OrcConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Preconditions; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Implements a memory manager that keeps a global context of how many ORC - * writers there are and manages the memory between them. For use cases with - * dynamic partitions, it is easy to end up with many writers in the same task. - * By managing the size of each allocation, we try to cut down the size of each - * allocation and keep the task from running out of memory. - * - * This class is not thread safe, but is re-entrant - ensure creation and all - * invocations are triggered from the same thread. - */ -public class MemoryManager { - - private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class); - - /** - * How often should we check the memory sizes? Measured in rows added - * to all of the writers. - */ - private static final int ROWS_BETWEEN_CHECKS = 5000; - private final long totalMemoryPool; - private final Map<Path, WriterInfo> writerList = - new HashMap<Path, WriterInfo>(); - private long totalAllocation = 0; - private double currentScale = 1; - private int rowsAddedSinceCheck = 0; - private final OwnedLock ownerLock = new OwnedLock(); - - @SuppressWarnings("serial") - private static class OwnedLock extends ReentrantLock { - public Thread getOwner() { - return super.getOwner(); - } - } - - private static class WriterInfo { - long allocation; - Callback callback; - WriterInfo(long allocation, Callback callback) { - this.allocation = allocation; - this.callback = callback; - } - } - - public interface Callback { - /** - * The writer needs to check its memory usage - * @param newScale the current scale factor for memory allocations - * @return true if the writer was over the limit - * @throws IOException - */ - boolean checkMemory(double newScale) throws IOException; - } - - /** - * Create the memory manager. - * @param conf use the configuration to find the maximum size of the memory - * pool. - */ - public MemoryManager(Configuration conf) { - double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); - totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax() * maxLoad); - ownerLock.lock(); - } - - /** - * Light weight thread-safety check for multi-threaded access patterns - */ - private void checkOwner() { - if (!ownerLock.isHeldByCurrentThread()) { - LOG.warn("Owner thread expected {}, got {}", - ownerLock.getOwner(), Thread.currentThread()); - } - } - - /** - * Add a new writer's memory allocation to the pool. We use the path - * as a unique key to ensure that we don't get duplicates. - * @param path the file that is being written - * @param requestedAllocation the requested buffer size - */ - public void addWriter(Path path, long requestedAllocation, - Callback callback) throws IOException { - checkOwner(); - WriterInfo oldVal = writerList.get(path); - // this should always be null, but we handle the case where the memory - // manager wasn't told that a writer wasn't still in use and the task - // starts writing to the same path. 
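To make the scaling concrete, a sketch of the arithmetic that updateScale(), at the end of this class, applies; the pool and request sizes are hypothetical:

    // Illustrative sketch: a 1 GB pool oversubscribed by four 512 MB writers.
    long totalMemoryPool = 1L << 30;           // 1 GB
    long totalAllocation = 4 * (512L << 20);   // 2 GB requested in total
    double currentScale = totalAllocation <= totalMemoryPool
        ? 1.0
        : (double) totalMemoryPool / totalAllocation;   // 0.5 here
    // each writer's checkMemory(0.5) callback should shrink to half its ask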
- if (oldVal == null) { - oldVal = new WriterInfo(requestedAllocation, callback); - writerList.put(path, oldVal); - totalAllocation += requestedAllocation; - } else { - // handle a new writer that is writing to the same path - totalAllocation += requestedAllocation - oldVal.allocation; - oldVal.allocation = requestedAllocation; - oldVal.callback = callback; - } - updateScale(true); - } - - /** - * Remove the given writer from the pool. - * @param path the file that has been closed - */ - public void removeWriter(Path path) throws IOException { - checkOwner(); - WriterInfo val = writerList.get(path); - if (val != null) { - writerList.remove(path); - totalAllocation -= val.allocation; - if (writerList.isEmpty()) { - rowsAddedSinceCheck = 0; - } - updateScale(false); - } - if(writerList.isEmpty()) { - rowsAddedSinceCheck = 0; - } - } - - /** - * Get the total pool size that is available for ORC writers. - * @return the number of bytes in the pool - */ - public long getTotalMemoryPool() { - return totalMemoryPool; - } - - /** - * The scaling factor for each allocation to ensure that the pool isn't - * oversubscribed. - * @return a fraction between 0.0 and 1.0 of the requested size that is - * available for each writer. - */ - public double getAllocationScale() { - return currentScale; - } - - /** - * Give the memory manager an opportunity for doing a memory check. - * @param rows number of rows added - * @throws IOException - */ - public void addedRow(int rows) throws IOException { - rowsAddedSinceCheck += rows; - if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { - notifyWriters(); - } - } - - /** - * Notify all of the writers that they should check their memory usage. - * @throws IOException - */ - public void notifyWriters() throws IOException { - checkOwner(); - LOG.debug("Notifying writers after " + rowsAddedSinceCheck); - for(WriterInfo writer: writerList.values()) { - boolean flushed = writer.callback.checkMemory(currentScale); - if (LOG.isDebugEnabled() && flushed) { - LOG.debug("flushed " + writer.toString()); - } - } - rowsAddedSinceCheck = 0; - } - - /** - * Update the currentScale based on the current allocation and pool size. - * This also updates the notificationTrigger. - * @param isAllocate is this an allocation? - */ - private void updateScale(boolean isAllocate) throws IOException { - if (totalAllocation <= totalMemoryPool) { - currentScale = 1; - } else { - currentScale = (double) totalMemoryPool / totalAllocation; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java b/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java deleted file mode 100644 index 7ca9e1d..0000000 --- a/orc/src/java/org/apache/orc/impl/OrcAcidUtils.java +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.orc.Reader; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; - -public class OrcAcidUtils { - public static final String ACID_STATS = "hive.acid.stats"; - public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; - - /** - * Get the filename of the ORC ACID side file that contains the lengths - * of the intermediate footers. - * @param main the main ORC filename - * @return the name of the side file - */ - public static Path getSideFile(Path main) { - return new Path(main + DELTA_SIDE_FILE_SUFFIX); - } - - /** - * Read the side file to get the last flush length. - * @param fs the file system to use - * @param deltaFile the path of the delta file - * @return the maximum size of the file to use - * @throws IOException - */ - public static long getLastFlushLength(FileSystem fs, - Path deltaFile) throws IOException { - Path lengths = getSideFile(deltaFile); - long result = Long.MAX_VALUE; - if (!fs.exists(lengths)) { - return result; - } - try (FSDataInputStream stream = fs.open(lengths)) { - result = -1; - while (stream.available() > 0) { - result = stream.readLong(); - } - return result; - } catch (IOException ioe) { - return result; - } - } - - private static final Charset utf8 = Charset.forName("UTF-8"); - private static final CharsetDecoder utf8Decoder = utf8.newDecoder(); - - public static AcidStats parseAcidStats(Reader reader) { - if (reader.hasMetadataValue(ACID_STATS)) { - try { - ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate(); - return new AcidStats(utf8Decoder.decode(val).toString()); - } catch (CharacterCodingException e) { - throw new IllegalArgumentException("Bad string encoding for " + - ACID_STATS, e); - } - } else { - return null; - } - } - -}
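The side-file mechanism above is worth a concrete illustration: the side file sits next to the delta file, named with the _flush_length suffix, and holds a sequence of longs recording successive flushed lengths. A hypothetical lookup, with an invented local path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.orc.impl.OrcAcidUtils;

public class SideFileDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.getLocal(conf);   // demo only: local file system
    // Hypothetical ACID delta bucket file.
    Path delta = new Path("/tmp/delta_0000001_0000001/bucket_00000");
    // Resolves to "<delta>_flush_length".
    System.out.println("side file: " + OrcAcidUtils.getSideFile(delta));
    // Long.MAX_VALUE means no side file exists and the whole file is readable.
    long safeLength = OrcAcidUtils.getLastFlushLength(fs, delta);
    System.out.println("readable bytes: " + safeLength);
  }
}

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcIndex.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/OrcIndex.java b/orc/src/java/org/apache/orc/impl/OrcIndex.java deleted file mode 100644 index 50a15f2..0000000 --- a/orc/src/java/org/apache/orc/impl/OrcIndex.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.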
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import org.apache.orc.OrcProto; - -public final class OrcIndex { - OrcProto.RowIndex[] rowGroupIndex; - OrcProto.BloomFilterIndex[] bloomFilterIndex; - - public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) { - this.rowGroupIndex = rgIndex; - this.bloomFilterIndex = bfIndex; - } - - public OrcProto.RowIndex[] getRowGroupIndex() { - return rowGroupIndex; - } - - public OrcProto.BloomFilterIndex[] getBloomFilterIndex() { - return bloomFilterIndex; - } - - public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) { - this.rowGroupIndex = rowGroupIndex; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OrcTail.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/OrcTail.java b/orc/src/java/org/apache/orc/impl/OrcTail.java deleted file mode 100644 index f095603..0000000 --- a/orc/src/java/org/apache/orc/impl/OrcTail.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import static org.apache.orc.impl.ReaderImpl.extractMetadata; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.orc.CompressionCodec; -import org.apache.orc.CompressionKind; -import org.apache.orc.OrcFile; -import org.apache.orc.OrcProto; -import org.apache.orc.StripeInformation; -import org.apache.orc.StripeStatistics; - -// TODO: Make OrcTail implement FileMetadata or Reader interface -public final class OrcTail { - // postscript + footer - Serialized in OrcSplit - private final OrcProto.FileTail fileTail; - // serialized representation of metadata, footer and postscript - private final ByteBuffer serializedTail; - // used to invalidate cache entries - private final long fileModificationTime; - // lazily deserialized - private OrcProto.Metadata metadata; - - public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail) { - this(fileTail, serializedTail, -1); - } - - public OrcTail(OrcProto.FileTail fileTail, ByteBuffer serializedTail, long fileModificationTime) { - this.fileTail = fileTail; - this.serializedTail = serializedTail; - this.fileModificationTime = fileModificationTime; - this.metadata = null; - } - - public ByteBuffer getSerializedTail() { - return serializedTail; - } - - public long getFileModificationTime() { - return fileModificationTime; - } - - public OrcProto.Footer getFooter() { - return fileTail.getFooter(); - } - - public OrcProto.PostScript getPostScript() { - return fileTail.getPostscript(); - } - - public OrcFile.WriterVersion getWriterVersion() { - OrcProto.PostScript ps = fileTail.getPostscript(); - return (ps.hasWriterVersion() - ? OrcFile.WriterVersion.from(ps.getWriterVersion()) : OrcFile.WriterVersion.ORIGINAL); - } - - public List<StripeInformation> getStripes() { - List<StripeInformation> result = new ArrayList<>(fileTail.getFooter().getStripesCount()); - for (OrcProto.StripeInformation stripeProto : fileTail.getFooter().getStripesList()) { - result.add(new ReaderImpl.StripeInformationImpl(stripeProto)); - } - return result; - } - - public CompressionKind getCompressionKind() { - return CompressionKind.valueOf(fileTail.getPostscript().getCompression().name()); - } - - public CompressionCodec getCompressionCodec() { - return PhysicalFsWriter.createCodec(getCompressionKind()); - } - - public int getCompressionBufferSize() { - return (int) fileTail.getPostscript().getCompressionBlockSize(); - } - - public List<StripeStatistics> getStripeStatistics() throws IOException { - List<StripeStatistics> result = new ArrayList<>(); - List<OrcProto.StripeStatistics> ssProto = getStripeStatisticsProto(); - if (ssProto != null) { - for (OrcProto.StripeStatistics ss : ssProto) { - result.add(new StripeStatistics(ss.getColStatsList())); - } - } - return result; - } - - public List<OrcProto.StripeStatistics> getStripeStatisticsProto() throws IOException { - if (serializedTail == null) return null; - if (metadata == null) { - metadata = extractMetadata(serializedTail, 0, - (int) fileTail.getPostscript().getMetadataLength(), - getCompressionCodec(), getCompressionBufferSize()); - // clear does not clear the contents but sets position to 0 and limit = capacity - serializedTail.clear(); - } - return metadata.getStripeStatsList(); - } - - public int getMetadataSize() { - return (int) getPostScript().getMetadataLength(); - } - - public List<OrcProto.Type> getTypes() { - return getFooter().getTypesList(); - } - - public OrcProto.FileTail getFileTail() 
{ - return fileTail; - } - - public OrcProto.FileTail getMinimalFileTail() { - OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder(fileTail); - OrcProto.Footer.Builder footerBuilder = OrcProto.Footer.newBuilder(fileTail.getFooter()); - footerBuilder.clearStatistics(); - fileTailBuilder.setFooter(footerBuilder.build()); - OrcProto.FileTail result = fileTailBuilder.build(); - return result; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/OutStream.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/OutStream.java b/orc/src/java/org/apache/orc/impl/OutStream.java deleted file mode 100644 index 81662cc..0000000 --- a/orc/src/java/org/apache/orc/impl/OutStream.java +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import org.apache.orc.CompressionCodec; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public class OutStream extends PositionedOutputStream { - - public interface OutputReceiver { - /** - * Output the given buffer to the final destination - * @param buffer the buffer to output - * @throws IOException - */ - void output(ByteBuffer buffer) throws IOException; - } - - public static final int HEADER_SIZE = 3; - private final String name; - private final OutputReceiver receiver; - // if enabled the stream will be suppressed when writing stripe - private boolean suppress; - - /** - * Stores the uncompressed bytes that have been serialized, but not - * compressed yet. When this fills, we compress the entire buffer. - */ - private ByteBuffer current = null; - - /** - * Stores the compressed bytes until we have a full buffer and then outputs - * them to the receiver. If no compression is being done, this (and overflow) - * will always be null and the current buffer will be sent directly to the - * receiver. - */ - private ByteBuffer compressed = null; - - /** - * Since the compressed buffer may start with contents from previous - * compression blocks, we allocate an overflow buffer so that the - * output of the codec can be split between the two buffers. After the - * compressed buffer is sent to the receiver, the overflow buffer becomes - * the new compressed buffer. 
- */ - private ByteBuffer overflow = null; - private final int bufferSize; - private final CompressionCodec codec; - private long compressedBytes = 0; - private long uncompressedBytes = 0; - - public OutStream(String name, - int bufferSize, - CompressionCodec codec, - OutputReceiver receiver) throws IOException { - this.name = name; - this.bufferSize = bufferSize; - this.codec = codec; - this.receiver = receiver; - this.suppress = false; - } - - public void clear() throws IOException { - flush(); - suppress = false; - } - - /** - * Write the length of the compressed bytes. Life is much easier if the - * header is constant length, so just use 3 bytes. Considering most of the - * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should - * be plenty. We also use the low bit for whether it is the original or - * compressed bytes. - * @param buffer the buffer to write the header to - * @param position the position in the buffer to write at - * @param val the size in the file - * @param original is it uncompressed - */ - private static void writeHeader(ByteBuffer buffer, - int position, - int val, - boolean original) { - buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0))); - buffer.put(position + 1, (byte) (val >> 7)); - buffer.put(position + 2, (byte) (val >> 15)); - } - - private void getNewInputBuffer() throws IOException { - if (codec == null) { - current = ByteBuffer.allocate(bufferSize); - } else { - current = ByteBuffer.allocate(bufferSize + HEADER_SIZE); - writeHeader(current, 0, bufferSize, true); - current.position(HEADER_SIZE); - } - } - - /** - * Allocate a new output buffer if we are compressing. - */ - private ByteBuffer getNewOutputBuffer() throws IOException { - return ByteBuffer.allocate(bufferSize + HEADER_SIZE); - } - - private void flip() throws IOException { - current.limit(current.position()); - current.position(codec == null ? 0 : HEADER_SIZE); - } - - @Override - public void write(int i) throws IOException { - if (current == null) { - getNewInputBuffer(); - } - if (current.remaining() < 1) { - spill(); - } - uncompressedBytes += 1; - current.put((byte) i); - } - - @Override - public void write(byte[] bytes, int offset, int length) throws IOException { - if (current == null) { - getNewInputBuffer(); - } - int remaining = Math.min(current.remaining(), length); - current.put(bytes, offset, remaining); - uncompressedBytes += remaining; - length -= remaining; - while (length != 0) { - spill(); - offset += remaining; - remaining = Math.min(current.remaining(), length); - current.put(bytes, offset, remaining); - uncompressedBytes += remaining; - length -= remaining; - } - } - - private void spill() throws java.io.IOException { - // if there isn't anything in the current buffer, don't spill - if (current == null || - current.position() == (codec == null ? 
0 : HEADER_SIZE)) { - return; - } - flip(); - if (codec == null) { - receiver.output(current); - getNewInputBuffer(); - } else { - if (compressed == null) { - compressed = getNewOutputBuffer(); - } else if (overflow == null) { - overflow = getNewOutputBuffer(); - } - int sizePosn = compressed.position(); - compressed.position(compressed.position() + HEADER_SIZE); - if (codec.compress(current, compressed, overflow)) { - uncompressedBytes = 0; - // move position back to after the header - current.position(HEADER_SIZE); - current.limit(current.capacity()); - // find the total bytes in the chunk - int totalBytes = compressed.position() - sizePosn - HEADER_SIZE; - if (overflow != null) { - totalBytes += overflow.position(); - } - compressedBytes += totalBytes + HEADER_SIZE; - writeHeader(compressed, sizePosn, totalBytes, false); - // if we have less than the next header left, spill it. - if (compressed.remaining() < HEADER_SIZE) { - compressed.flip(); - receiver.output(compressed); - compressed = overflow; - overflow = null; - } - } else { - compressedBytes += uncompressedBytes + HEADER_SIZE; - uncompressedBytes = 0; - // we are using the original, but need to spill the current - // compressed buffer first. So back up to where we started, - // flip it and add it to done. - if (sizePosn != 0) { - compressed.position(sizePosn); - compressed.flip(); - receiver.output(compressed); - compressed = null; - // if we have an overflow, clear it and make it the new compress - // buffer - if (overflow != null) { - overflow.clear(); - compressed = overflow; - overflow = null; - } - } else { - compressed.clear(); - if (overflow != null) { - overflow.clear(); - } - } - - // now add the current buffer into the done list and get a new one. - current.position(0); - // update the header with the current length - writeHeader(current, 0, current.limit() - HEADER_SIZE, true); - receiver.output(current); - getNewInputBuffer(); - } - } - } - - @Override - public void getPosition(PositionRecorder recorder) throws IOException { - if (codec == null) { - recorder.addPosition(uncompressedBytes); - } else { - recorder.addPosition(compressedBytes); - recorder.addPosition(uncompressedBytes); - } - } - - @Override - public void flush() throws IOException { - spill(); - if (compressed != null && compressed.position() != 0) { - compressed.flip(); - receiver.output(compressed); - } - compressed = null; - uncompressedBytes = 0; - compressedBytes = 0; - overflow = null; - current = null; - } - - @Override - public String toString() { - return name; - } - - @Override - public long getBufferSize() { - long result = 0; - if (current != null) { - result += current.capacity(); - } - if (compressed != null) { - result += compressed.capacity(); - } - if (overflow != null) { - result += overflow.capacity(); - } - return result; - } - - /** - * Set suppress flag - */ - public void suppress() { - suppress = true; - } - - /** - * Returns the state of suppress flag - * @return value of suppress flag - */ - public boolean isSuppressed() { - return suppress; - } -} -
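The 3-byte chunk header that OutStream.writeHeader produces deserves a worked example: the low bit of the first byte flags an uncompressed ("original") chunk, and the remaining 23 bits hold the chunk length, least-significant byte first. The encoder below is copied from the deleted class; the decoder is our own sketch, added for illustration.

import java.nio.ByteBuffer;

public class ChunkHeaderDemo {
  // Copied from OutStream.writeHeader above.
  static void writeHeader(ByteBuffer buffer, int position, int val, boolean original) {
    buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
    buffer.put(position + 1, (byte) (val >> 7));
    buffer.put(position + 2, (byte) (val >> 15));
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(3);
    writeHeader(buf, 0, 100000, false);
    // Decode: undo the shift and reassemble the 23-bit length.
    int b0 = buf.get(0) & 0xff;
    int b1 = buf.get(1) & 0xff;
    int b2 = buf.get(2) & 0xff;
    boolean original = (b0 & 1) == 1;
    int length = (b0 >>> 1) | (b1 << 7) | (b2 << 15);
    System.out.println(length + ", original=" + original);  // 100000, original=false
  }
}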

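Looking back at OrcTail: the point of getMinimalFileTail() is to strip the per-column statistics out of the footer so that the tail stays cheap to cache or to embed in a split. A self-contained sketch; the hand-built FileTail is hypothetical and far smaller than anything a real writer produces.

import org.apache.orc.OrcProto;
import org.apache.orc.impl.OrcTail;

public class OrcTailDemo {
  public static void main(String[] args) {
    OrcProto.PostScript ps = OrcProto.PostScript.newBuilder()
        .setCompression(OrcProto.CompressionKind.NONE)
        .build();
    OrcProto.Footer footer = OrcProto.Footer.newBuilder()
        .addStatistics(OrcProto.ColumnStatistics.newBuilder()
            .setNumberOfValues(10))
        .build();
    OrcProto.FileTail fileTail = OrcProto.FileTail.newBuilder()
        .setPostscript(ps)
        .setFooter(footer)
        .build();
    OrcTail tail = new OrcTail(fileTail, null);  // no serialized tail in this toy example
    System.out.println(tail.getFooter().getStatisticsCount());                       // 1
    System.out.println(tail.getMinimalFileTail().getFooter().getStatisticsCount());  // 0
  }
}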