http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java new file mode 100644 index 0000000..bb73d53 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java @@ -0,0 +1,107 @@ +package org.apache.orc.impl; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; + +import javax.annotation.Nullable; + +public final class DataReaderProperties { + + private final FileSystem fileSystem; + private final Path path; + private final CompressionKind compression; + private final boolean zeroCopy; + private final int typeCount; + private final int bufferSize; + + private DataReaderProperties(Builder builder) { + this.fileSystem = builder.fileSystem; + this.path = builder.path; + this.compression = builder.compression; + this.zeroCopy = builder.zeroCopy; + this.typeCount = builder.typeCount; + this.bufferSize = builder.bufferSize; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + public Path getPath() { + return path; + } + + public CompressionKind getCompression() { + return compression; + } + + public boolean getZeroCopy() { + return zeroCopy; + } + + public int getTypeCount() { + return typeCount; + } + + public int getBufferSize() { + return bufferSize; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private FileSystem fileSystem; + private Path path; + private CompressionKind compression; + private boolean zeroCopy; + private int typeCount; + private int bufferSize; + + private Builder() { + + } + + public Builder withFileSystem(FileSystem fileSystem) { + this.fileSystem = fileSystem; + return this; + } + + public Builder withPath(Path path) { + this.path = path; + return this; + } + + public Builder withCompression(CompressionKind value) { + this.compression = value; + return this; + } + + public Builder withZeroCopy(boolean zeroCopy) { + this.zeroCopy = zeroCopy; + return this; + } + + public Builder withTypeCount(int value) { + this.typeCount = value; + return this; + } + + public Builder withBufferSize(int value) { + this.bufferSize = value; + return this; + } + + public DataReaderProperties build() { + Preconditions.checkNotNull(fileSystem); + Preconditions.checkNotNull(path); + + return new DataReaderProperties(this); + } + + } +}
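A minimal sketch of how the new builder chains (the configuration, path, and sizes below are hypothetical, not taken from the commit); only the file system and path are mandatory, since build() null-checks exactly those two fields:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.DataReaderProperties;

    public class DataReaderPropertiesSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        DataReaderProperties props = DataReaderProperties.builder()
            .withFileSystem(fs)
            .withPath(new Path("/tmp/example.orc"))  // hypothetical file
            .withCompression(CompressionKind.ZLIB)
            .withZeroCopy(false)
            .withTypeCount(10)           // hypothetical number of types in the schema
            .withBufferSize(256 * 1024)  // hypothetical compression buffer size
            .build();                    // throws NullPointerException without fileSystem/path
        System.out.println(props.getPath() + " zeroCopy=" + props.getZeroCopy());
      }
    }

Fields whose with* method is skipped keep their Java defaults (null, false, or 0); build() enforces nothing beyond the two Preconditions checks.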
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java new file mode 100644 index 0000000..7e0110d --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/DirectDecompressionCodec.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import org.apache.orc.CompressionCodec; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public interface DirectDecompressionCodec extends CompressionCodec { + public boolean isAvailable(); + public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException; +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java new file mode 100644 index 0000000..986c2ac --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/DynamicByteArray.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.Text; + +/** + * A class that is a growable array of bytes. Growth is managed in terms of + * chunks that are allocated when needed. 
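+ * A logical offset i lives at data[i / chunkSize][i % chunkSize], so growing + * the array copies only the small directory of chunk references, never the + * bytes themselves. A hypothetical usage sketch: + * <pre> + * DynamicByteArray buf = new DynamicByteArray(); + * int start = buf.add(new byte[]{1, 2, 3}, 0, 3); // returns offset 0 + * byte b = buf.get(start + 2); // returns 3 + * </pre>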
+ */ +public final class DynamicByteArray { + static final int DEFAULT_CHUNKSIZE = 32 * 1024; + static final int DEFAULT_NUM_CHUNKS = 128; + + private final int chunkSize; // our allocation sizes + private byte[][] data; // the real data + private int length; // max set element index +1 + private int initializedChunks = 0; // the number of chunks created + + public DynamicByteArray() { + this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE); + } + + public DynamicByteArray(int numChunks, int chunkSize) { + if (chunkSize == 0) { + throw new IllegalArgumentException("bad chunksize"); + } + this.chunkSize = chunkSize; + data = new byte[numChunks][]; + } + + /** + * Ensure that the given index is valid. + */ + private void grow(int chunkIndex) { + if (chunkIndex >= initializedChunks) { + if (chunkIndex >= data.length) { + int newSize = Math.max(chunkIndex + 1, 2 * data.length); + byte[][] newChunk = new byte[newSize][]; + System.arraycopy(data, 0, newChunk, 0, data.length); + data = newChunk; + } + for(int i=initializedChunks; i <= chunkIndex; ++i) { + data[i] = new byte[chunkSize]; + } + initializedChunks = chunkIndex + 1; + } + } + + public byte get(int index) { + if (index >= length) { + throw new IndexOutOfBoundsException("Index " + index + + " is outside of 0.." + + (length - 1)); + } + int i = index / chunkSize; + int j = index % chunkSize; + return data[i][j]; + } + + public void set(int index, byte value) { + int i = index / chunkSize; + int j = index % chunkSize; + grow(i); + if (index >= length) { + length = index + 1; + } + data[i][j] = value; + } + + public int add(byte value) { + int i = length / chunkSize; + int j = length % chunkSize; + grow(i); + data[i][j] = value; + int result = length; + length += 1; + return result; + } + + /** + * Copy a slice of a byte array into our buffer. + * @param value the array to copy from + * @param valueOffset the first location to copy from value + * @param valueLength the number of bytes to copy from value + * @return the offset of the start of the value + */ + public int add(byte[] value, int valueOffset, int valueLength) { + int i = length / chunkSize; + int j = length % chunkSize; + grow((length + valueLength) / chunkSize); + int remaining = valueLength; + while (remaining > 0) { + int size = Math.min(remaining, chunkSize - j); + System.arraycopy(value, valueOffset, data[i], j, size); + remaining -= size; + valueOffset += size; + i += 1; + j = 0; + } + int result = length; + length += valueLength; + return result; + } + + /** + * Read the entire stream into this array. + * @param in the stream to read from + * @throws IOException + */ + public void readAll(InputStream in) throws IOException { + int currentChunk = length / chunkSize; + int currentOffset = length % chunkSize; + grow(currentChunk); + int currentLength = in.read(data[currentChunk], currentOffset, + chunkSize - currentOffset); + while (currentLength > 0) { + length += currentLength; + currentOffset = length % chunkSize; + if (currentOffset == 0) { + currentChunk = length / chunkSize; + grow(currentChunk); + } + currentLength = in.read(data[currentChunk], currentOffset, + chunkSize - currentOffset); + } + } + + /** + * Byte compare a set of bytes against the bytes in this dynamic array. 
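+ * Bytes are compared as unsigned values and the scan crosses chunk + * boundaries transparently.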
+ * @param other source of the other bytes + * @param otherOffset start offset in the other array + * @param otherLength number of bytes in the other array + * @param ourOffset the offset in our array + * @param ourLength the number of bytes in our array + * @return negative for less, 0 for equal, positive for greater + */ + public int compare(byte[] other, int otherOffset, int otherLength, + int ourOffset, int ourLength) { + int currentChunk = ourOffset / chunkSize; + int currentOffset = ourOffset % chunkSize; + int maxLength = Math.min(otherLength, ourLength); + while (maxLength > 0 && + other[otherOffset] == data[currentChunk][currentOffset]) { + otherOffset += 1; + currentOffset += 1; + if (currentOffset == chunkSize) { + currentChunk += 1; + currentOffset = 0; + } + maxLength -= 1; + } + if (maxLength == 0) { + return otherLength - ourLength; + } + int otherByte = 0xff & other[otherOffset]; + int ourByte = 0xff & data[currentChunk][currentOffset]; + return otherByte > ourByte ? 1 : -1; + } + + /** + * Get the size of the array. + * @return the number of bytes in the array + */ + public int size() { + return length; + } + + /** + * Clear the array to its original pristine state. + */ + public void clear() { + length = 0; + for(int i=0; i < data.length; ++i) { + data[i] = null; + } + initializedChunks = 0; + } + + /** + * Set a text value from the bytes in this dynamic array. + * @param result the value to set + * @param offset the start of the bytes to copy + * @param length the number of bytes to copy + */ + public void setText(Text result, int offset, int length) { + result.clear(); + int currentChunk = offset / chunkSize; + int currentOffset = offset % chunkSize; + int currentLength = Math.min(length, chunkSize - currentOffset); + while (length > 0) { + result.append(data[currentChunk], currentOffset, currentLength); + length -= currentLength; + currentChunk += 1; + currentOffset = 0; + currentLength = Math.min(length, chunkSize - currentOffset); + } + } + + /** + * Write out a range of this dynamic array to an output stream. + * @param out the stream to write to + * @param offset the first offset to write + * @param length the number of bytes to write + * @throws IOException + */ + public void write(OutputStream out, int offset, + int length) throws IOException { + int currentChunk = offset / chunkSize; + int currentOffset = offset % chunkSize; + while (length > 0) { + int currentLength = Math.min(length, chunkSize - currentOffset); + out.write(data[currentChunk], currentOffset, currentLength); + length -= currentLength; + currentChunk += 1; + currentOffset = 0; + } + } + + @Override + public String toString() { + int i; + StringBuilder sb = new StringBuilder(length * 3); + + sb.append('{'); + int l = length - 1; + for (i=0; i<l; i++) { + sb.append(Integer.toHexString(get(i))); + sb.append(','); + } + sb.append(get(i)); + sb.append('}'); + + return sb.toString(); + } + + public void setByteBuffer(ByteBuffer result, int offset, int length) { + result.clear(); + int currentChunk = offset / chunkSize; + int currentOffset = offset % chunkSize; + int currentLength = Math.min(length, chunkSize - currentOffset); + while (length > 0) { + result.put(data[currentChunk], currentOffset, currentLength); + length -= currentLength; + currentChunk += 1; + currentOffset = 0; + currentLength = Math.min(length, chunkSize - currentOffset); + } + } + + /** + * Gets all the bytes of the array. 
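+ * The result is a newly allocated copy, or null when the array is empty.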
+ * + * @return Bytes of the array + */ + public byte[] get() { + byte[] result = null; + if (length > 0) { + int currentChunk = 0; + int currentOffset = 0; + int currentLength = Math.min(length, chunkSize); + int destOffset = 0; + result = new byte[length]; + int totalLength = length; + while (totalLength > 0) { + System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength); + destOffset += currentLength; + totalLength -= currentLength; + currentChunk += 1; + currentOffset = 0; + currentLength = Math.min(totalLength, chunkSize - currentOffset); + } + } + return result; + } + + /** + * Get the size of the allocated buffers, in bytes. + */ + public long getSizeInBytes() { + return initializedChunks * chunkSize; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java new file mode 100644 index 0000000..3b2884b --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/DynamicIntArray.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +/** + * Dynamic int array that uses primitive types and chunks to avoid copying + * large numbers of integers when it resizes. + * + * The motivation for this class is memory optimization, i.e. space-efficient + * storage of potentially huge arrays without good a-priori size guesses. + * + * The API of this class is between a primitive array and an AbstractList. It's + * not a Collection implementation because it handles primitive types, but the + * API could be extended to support iterators and the like. + * + * NOTE: Like standard Collection implementations/arrays, this class is not + * synchronized. + */ +public final class DynamicIntArray { + static final int DEFAULT_CHUNKSIZE = 8 * 1024; + static final int INIT_CHUNKS = 128; + + private final int chunkSize; // our allocation size + private int[][] data; // the real data + private int length; // max set element index +1 + private int initializedChunks = 0; // the number of created chunks + + public DynamicIntArray() { + this(DEFAULT_CHUNKSIZE); + } + + public DynamicIntArray(int chunkSize) { + this.chunkSize = chunkSize; + + data = new int[INIT_CHUNKS][]; + } + + /** + * Ensure that the given index is valid.
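+ * Expands the chunk directory if necessary and allocates chunks up to and + * including chunkIndex.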
+ */ + private void grow(int chunkIndex) { + if (chunkIndex >= initializedChunks) { + if (chunkIndex >= data.length) { + int newSize = Math.max(chunkIndex + 1, 2 * data.length); + int[][] newChunk = new int[newSize][]; + System.arraycopy(data, 0, newChunk, 0, data.length); + data = newChunk; + } + for (int i=initializedChunks; i <= chunkIndex; ++i) { + data[i] = new int[chunkSize]; + } + initializedChunks = chunkIndex + 1; + } + } + + public int get(int index) { + if (index >= length) { + throw new IndexOutOfBoundsException("Index " + index + + " is outside of 0.." + + (length - 1)); + } + int i = index / chunkSize; + int j = index % chunkSize; + return data[i][j]; + } + + public void set(int index, int value) { + int i = index / chunkSize; + int j = index % chunkSize; + grow(i); + if (index >= length) { + length = index + 1; + } + data[i][j] = value; + } + + public void increment(int index, int value) { + int i = index / chunkSize; + int j = index % chunkSize; + grow(i); + if (index >= length) { + length = index + 1; + } + data[i][j] += value; + } + + public void add(int value) { + int i = length / chunkSize; + int j = length % chunkSize; + grow(i); + data[i][j] = value; + length += 1; + } + + public int size() { + return length; + } + + public void clear() { + length = 0; + for(int i=0; i < data.length; ++i) { + data[i] = null; + } + initializedChunks = 0; + } + + public String toString() { + int i; + StringBuilder sb = new StringBuilder(length * 4); + + sb.append('{'); + int l = length - 1; + for (i=0; i<l; i++) { + sb.append(get(i)); + sb.append(','); + } + sb.append(get(i)); + sb.append('}'); + + return sb.toString(); + } + + public int getSizeInBytes() { + return 4 * initializedChunks * chunkSize; + } +} + http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java new file mode 100644 index 0000000..ef7d70f --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.util.VersionInfo; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public interface HadoopShims { + + enum DirectCompressionType { + NONE, + ZLIB_NOHEADER, + ZLIB, + SNAPPY, + } + + interface DirectDecompressor { + void decompress(ByteBuffer var1, ByteBuffer var2) throws IOException; + } + + /** + * Get a direct decompressor, if one is available. + * @param codec the compression kind to decompress + * @return a direct decompressor, or null if none is available + */ + DirectDecompressor getDirectDecompressor(DirectCompressionType codec); + + /** + * a hadoop.io ByteBufferPool shim. + */ + public interface ByteBufferPoolShim { + /** + * Get a new ByteBuffer from the pool. The pool can provide this by + * removing a buffer from its internal cache, or by allocating a + * new buffer. + * + * @param direct Whether the buffer should be direct. + * @param length The minimum length the buffer will have. + * @return A new ByteBuffer. Its capacity can be less + * than what was requested, but must be at + * least 1 byte. + */ + ByteBuffer getBuffer(boolean direct, int length); + + /** + * Release a buffer back to the pool. + * The pool may choose to put this buffer into its cache or free it. + * + * @param buffer a direct bytebuffer + */ + void putBuffer(ByteBuffer buffer); + } + + /** + * Provides an HDFS ZeroCopyReader shim. + * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to) + * @param pool ByteBufferPoolShim to allocate fallback buffers with + * + * @return null if not supported + */ + public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException; + + public interface ZeroCopyReaderShim extends Closeable { + /** + * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or a MappedByteBuffer. + * Also advances the stream by that amount. The data read can be smaller than maxLength. + * + * @return the ByteBuffer read from the stream + */ + public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException; + /** + * Release a ByteBuffer obtained from a prior readBuffer() call. + */ + public void releaseBuffer(ByteBuffer buffer); + + /** + * Close the underlying stream. + * @throws IOException + */ + public void close() throws IOException; + } + /** + * Read data into a Text object in the fastest way possible. + */ + public interface TextReaderShim { + /** + * @param txt the Text object to read into + * @param size the number of bytes to read + * @throws IOException + */ + void read(Text txt, int size) throws IOException; + } + + /** + * Wrap a TextReaderShim around an input stream. The reader shim will not + * buffer any reads from the underlying stream and will only consume bytes + * which are required for TextReaderShim.read() input.
+ */ + public TextReaderShim getTextReaderShim(InputStream input) throws IOException; + + class Factory { + private static HadoopShims SHIMS = null; + + public static synchronized HadoopShims get() { + if (SHIMS == null) { + String[] versionParts = VersionInfo.getVersion().split("[.]"); + int major = Integer.parseInt(versionParts[0]); + int minor = Integer.parseInt(versionParts[1]); + if (major < 2 || (major == 2 && minor < 3)) { + SHIMS = new HadoopShims_2_2(); + } else { + SHIMS = new HadoopShimsCurrent(); + } + } + return SHIMS; + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java new file mode 100644 index 0000000..5c53f74 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; +import org.apache.hadoop.io.compress.zlib.ZlibDecompressor; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Shims for recent versions of Hadoop + */ +public class HadoopShimsCurrent implements HadoopShims { + + private static class DirectDecompressWrapper implements DirectDecompressor { + private final org.apache.hadoop.io.compress.DirectDecompressor root; + + DirectDecompressWrapper(org.apache.hadoop.io.compress.DirectDecompressor root) { + this.root = root; + } + + public void decompress(ByteBuffer input, + ByteBuffer output) throws IOException { + root.decompress(input, output); + } + } + + public DirectDecompressor getDirectDecompressor( + DirectCompressionType codec) { + switch (codec) { + case ZLIB: + return new DirectDecompressWrapper + (new ZlibDecompressor.ZlibDirectDecompressor()); + case ZLIB_NOHEADER: + return new DirectDecompressWrapper + (new ZlibDecompressor.ZlibDirectDecompressor + (ZlibDecompressor.CompressionHeader.NO_HEADER, 0)); + case SNAPPY: + return new DirectDecompressWrapper + (new SnappyDecompressor.SnappyDirectDecompressor()); + default: + return null; + } + } + + @Override + public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, + ByteBufferPoolShim pool + ) throws IOException { + return ZeroCopyShims.getZeroCopyReader(in, pool); + } + + private final class FastTextReaderShim implements TextReaderShim { + private final DataInputStream din; + + public FastTextReaderShim(InputStream in) { + this.din = new DataInputStream(in); + } + + @Override + public void read(Text txt, int len) throws IOException { + txt.readWithKnownLength(din, len); + } + } + + @Override + public TextReaderShim getTextReaderShim(InputStream in) throws IOException { + return new FastTextReaderShim(in); + } + +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java new file mode 100644 index 0000000..3f65e74 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.Text; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Method; + +/** + * Shims for versions of Hadoop up to and including 2.2.x + */ +public class HadoopShims_2_2 implements HadoopShims { + + final boolean zeroCopy; + final boolean fastRead; + + HadoopShims_2_2() { + boolean zcr = false; + try { + Class.forName("org.apache.hadoop.fs.CacheFlag", false, + HadoopShims_2_2.class.getClassLoader()); + zcr = true; + } catch (ClassNotFoundException ce) { + } + zeroCopy = zcr; + boolean fastRead = false; + if (zcr) { + for (Method m : Text.class.getMethods()) { + if ("readWithKnownLength".equals(m.getName())) { + fastRead = true; + } + } + } + this.fastRead = fastRead; + } + + public DirectDecompressor getDirectDecompressor( + DirectCompressionType codec) { + return null; + } + + @Override + public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, + ByteBufferPoolShim pool + ) throws IOException { + if(zeroCopy) { + return ZeroCopyShims.getZeroCopyReader(in, pool); + } + /* not supported */ + return null; + } + + private final class BasicTextReaderShim implements TextReaderShim { + private final InputStream in; + + public BasicTextReaderShim(InputStream in) { + this.in = in; + } + + @Override + public void read(Text txt, int len) throws IOException { + int offset = 0; + byte[] bytes = new byte[len]; + while (len > 0) { + int written = in.read(bytes, offset, len); + if (written < 0) { + throw new EOFException("Can't finish read from " + in + " read " + + (offset) + " bytes out of " + bytes.length); + } + len -= written; + offset += written; + } + txt.set(bytes); + } + } + + @Override + public TextReaderShim getTextReaderShim(InputStream in) throws IOException { + return new BasicTextReaderShim(in); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/InStream.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java new file mode 100644 index 0000000..851f645 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/InStream.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; + +import org.apache.orc.CompressionCodec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.common.io.DiskRange; + +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedInputStream; + +public abstract class InStream extends InputStream { + + private static final Logger LOG = LoggerFactory.getLogger(InStream.class); + public static final int PROTOBUF_MESSAGE_MAX_LIMIT = 1024 << 20; // 1GB + + protected final String name; + protected long length; + + public InStream(String name, long length) { + this.name = name; + this.length = length; + } + + public String getStreamName() { + return name; + } + + public long getStreamLength() { + return length; + } + + @Override + public abstract void close(); + + public static class UncompressedStream extends InStream { + private List<DiskRange> bytes; + private long length; + protected long currentOffset; + private ByteBuffer range; + private int currentRange; + + public UncompressedStream(String name, List<DiskRange> input, long length) { + super(name, length); + reset(input, length); + } + + protected void reset(List<DiskRange> input, long length) { + this.bytes = input; + this.length = length; + currentRange = 0; + currentOffset = 0; + range = null; + } + + @Override + public int read() { + if (range == null || range.remaining() == 0) { + if (currentOffset == length) { + return -1; + } + seek(currentOffset); + } + currentOffset += 1; + return 0xff & range.get(); + } + + @Override + public int read(byte[] data, int offset, int length) { + if (range == null || range.remaining() == 0) { + if (currentOffset == this.length) { + return -1; + } + seek(currentOffset); + } + int actualLength = Math.min(length, range.remaining()); + range.get(data, offset, actualLength); + currentOffset += actualLength; + return actualLength; + } + + @Override + public int available() { + if (range != null && range.remaining() > 0) { + return range.remaining(); + } + return (int) (length - currentOffset); + } + + @Override + public void close() { + currentRange = bytes.size(); + currentOffset = length; + // explicit de-ref of bytes[] + bytes.clear(); + } + + @Override + public void seek(PositionProvider index) throws IOException { + seek(index.getNext()); + } + + public void seek(long desired) { + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } + int i = 0; + for (DiskRange curRange : bytes) { + if (desired == 0 && curRange.getData().remaining() == 0) { + logEmptySeek(name); + return; + } + if (curRange.getOffset() <= desired && + (desired - curRange.getOffset()) < curRange.getLength()) { + currentOffset = desired; + currentRange = i; + this.range = curRange.getData().duplicate(); + int pos = range.position(); + pos += (int)(desired - curRange.getOffset()); // this is why we duplicate + this.range.position(pos); + return; + } + ++i; + } + // if they are seeking to the precise end, go ahead and let them go there + int segments = bytes.size(); + if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { + currentOffset = desired; + currentRange = segments - 1; + DiskRange curRange = bytes.get(currentRange); + this.range = curRange.getData().duplicate(); + int pos = range.position(); + pos += (int)(desired - curRange.getOffset()); // this is why we duplicate + 
this.range.position(pos); + return; + } + throw new IllegalArgumentException("Seek in " + name + " to " + + desired + " is outside of the data"); + } + + @Override + public String toString() { + return "uncompressed stream " + name + " position: " + currentOffset + + " length: " + length + " range: " + currentRange + + " offset: " + (range == null ? 0 : range.position()) + " limit: " + (range == null ? 0 : range.limit()); + } + } + + private static ByteBuffer allocateBuffer(int size, boolean isDirect) { + // TODO: use the same pool as the ORC readers + if (isDirect) { + return ByteBuffer.allocateDirect(size); + } else { + return ByteBuffer.allocate(size); + } + } + + private static class CompressedStream extends InStream { + private final List<DiskRange> bytes; + private final int bufferSize; + private ByteBuffer uncompressed; + private final CompressionCodec codec; + private ByteBuffer compressed; + private long currentOffset; + private int currentRange; + private boolean isUncompressedOriginal; + + public CompressedStream(String name, List<DiskRange> input, long length, + CompressionCodec codec, int bufferSize) { + super(name, length); + this.bytes = input; + this.codec = codec; + this.bufferSize = bufferSize; + currentOffset = 0; + currentRange = 0; + } + + private void allocateForUncompressed(int size, boolean isDirect) { + uncompressed = allocateBuffer(size, isDirect); + } + + private void readHeader() throws IOException { + if (compressed == null || compressed.remaining() <= 0) { + seek(currentOffset); + } + if (compressed.remaining() > OutStream.HEADER_SIZE) { + int b0 = compressed.get() & 0xff; + int b1 = compressed.get() & 0xff; + int b2 = compressed.get() & 0xff; + boolean isOriginal = (b0 & 0x01) == 1; + int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1); + + if (chunkLength > bufferSize) { + throw new IllegalArgumentException("Buffer size too small. 
size = " + + bufferSize + " needed = " + chunkLength); + } + // read 3 bytes, which should be equal to OutStream.HEADER_SIZE always + assert OutStream.HEADER_SIZE == 3 : "The Orc HEADER_SIZE must be the same in OutStream and InStream"; + currentOffset += OutStream.HEADER_SIZE; + + ByteBuffer slice = this.slice(chunkLength); + + if (isOriginal) { + uncompressed = slice; + isUncompressedOriginal = true; + } else { + if (isUncompressedOriginal) { + allocateForUncompressed(bufferSize, slice.isDirect()); + isUncompressedOriginal = false; + } else if (uncompressed == null) { + allocateForUncompressed(bufferSize, slice.isDirect()); + } else { + uncompressed.clear(); + } + codec.decompress(slice, uncompressed); + } + } else { + throw new IllegalStateException("Can't read header at " + this); + } + } + + @Override + public int read() throws IOException { + if (uncompressed == null || uncompressed.remaining() == 0) { + if (currentOffset == length) { + return -1; + } + readHeader(); + } + return 0xff & uncompressed.get(); + } + + @Override + public int read(byte[] data, int offset, int length) throws IOException { + if (uncompressed == null || uncompressed.remaining() == 0) { + if (currentOffset == this.length) { + return -1; + } + readHeader(); + } + int actualLength = Math.min(length, uncompressed.remaining()); + uncompressed.get(data, offset, actualLength); + return actualLength; + } + + @Override + public int available() throws IOException { + if (uncompressed == null || uncompressed.remaining() == 0) { + if (currentOffset == length) { + return 0; + } + readHeader(); + } + return uncompressed.remaining(); + } + + @Override + public void close() { + uncompressed = null; + compressed = null; + currentRange = bytes.size(); + currentOffset = length; + bytes.clear(); + } + + @Override + public void seek(PositionProvider index) throws IOException { + seek(index.getNext()); + long uncompressedBytes = index.getNext(); + if (uncompressedBytes != 0) { + readHeader(); + uncompressed.position(uncompressed.position() + + (int) uncompressedBytes); + } else if (uncompressed != null) { + // mark the uncompressed buffer as done + uncompressed.position(uncompressed.limit()); + } + } + + /* slices a read only contiguous buffer of chunkLength */ + private ByteBuffer slice(int chunkLength) throws IOException { + int len = chunkLength; + final long oldOffset = currentOffset; + ByteBuffer slice; + if (compressed.remaining() >= len) { + slice = compressed.slice(); + // simple case + slice.limit(len); + currentOffset += len; + compressed.position(compressed.position() + len); + return slice; + } else if (currentRange >= (bytes.size() - 1)) { + // nothing has been modified yet + throw new IOException("EOF in " + this + " while trying to read " + + chunkLength + " bytes"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Crossing into next BufferChunk because compressed only has %d bytes (needs %d)", + compressed.remaining(), len)); + } + + // we need to consolidate 2 or more buffers into 1 + // first copy out compressed buffers + ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect()); + currentOffset += compressed.remaining(); + len -= compressed.remaining(); + copy.put(compressed); + ListIterator<DiskRange> iter = bytes.listIterator(currentRange); + + while (len > 0 && iter.hasNext()) { + ++currentRange; + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("Read slow-path, >1 cross block reads with %s", this.toString())); + } + DiskRange range = iter.next(); + compressed = 
range.getData().duplicate(); + if (compressed.remaining() >= len) { + slice = compressed.slice(); + slice.limit(len); + copy.put(slice); + currentOffset += len; + compressed.position(compressed.position() + len); + return copy; + } + currentOffset += compressed.remaining(); + len -= compressed.remaining(); + copy.put(compressed); + } + + // restore offsets for exception clarity + seek(oldOffset); + throw new IOException("EOF in " + this + " while trying to read " + + chunkLength + " bytes"); + } + + private void seek(long desired) throws IOException { + if (desired == 0 && bytes.isEmpty()) { + logEmptySeek(name); + return; + } + int i = 0; + for (DiskRange range : bytes) { + if (range.getOffset() <= desired && desired < range.getEnd()) { + currentRange = i; + compressed = range.getData().duplicate(); + int pos = compressed.position(); + pos += (int)(desired - range.getOffset()); + compressed.position(pos); + currentOffset = desired; + return; + } + ++i; + } + // if they are seeking to the precise end, go ahead and let them go there + int segments = bytes.size(); + if (segments != 0 && desired == bytes.get(segments - 1).getEnd()) { + DiskRange range = bytes.get(segments - 1); + currentRange = segments - 1; + compressed = range.getData().duplicate(); + compressed.position(compressed.limit()); + currentOffset = desired; + return; + } + throw new IOException("Seek outside of data in " + this + " to " + desired); + } + + private String rangeString() { + StringBuilder builder = new StringBuilder(); + int i = 0; + for (DiskRange range : bytes) { + if (i != 0) { + builder.append("; "); + } + builder.append(" range " + i + " = " + range.getOffset() + + " to " + (range.getEnd() - range.getOffset())); + ++i; + } + return builder.toString(); + } + + @Override + public String toString() { + return "compressed stream " + name + " position: " + currentOffset + + " length: " + length + " range: " + currentRange + + " offset: " + (compressed == null ? 0 : compressed.position()) + " limit: " + (compressed == null ? 0 : compressed.limit()) + + rangeString() + + (uncompressed == null ? "" : + " uncompressed: " + uncompressed.position() + " to " + + uncompressed.limit()); + } + } + + public abstract void seek(PositionProvider index) throws IOException; + + private static void logEmptySeek(String name) { + if (LOG.isWarnEnabled()) { + LOG.warn("Attempting seek into empty stream (" + name + ") Skipping stream."); + } + } + + /** + * Create an input stream from a list of buffers. + * @param streamName the name of the stream + * @param buffers the list of ranges of bytes for the stream + * @param offsets a list of offsets (the same length as input) that must + * contain the first offset of the each set of bytes in input + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return an input stream + * @throws IOException + */ + @VisibleForTesting + @Deprecated + public static InStream create(String streamName, + ByteBuffer[] buffers, + long[] offsets, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + List<DiskRange> input = new ArrayList<DiskRange>(buffers.length); + for (int i = 0; i < buffers.length; ++i) { + input.add(new BufferChunk(buffers[i], offsets[i])); + } + return create(streamName, input, length, codec, bufferSize); + } + + /** + * Create an input stream from a list of disk ranges with data. 
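+ * Returns an UncompressedStream when codec is null, otherwise a + * CompressedStream that decodes the 3-byte chunk headers written by OutStream.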
+ * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return an input stream + * @throws IOException + */ + public static InStream create(String name, + List<DiskRange> input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + if (codec == null) { + return new UncompressedStream(name, input, length); + } else { + return new CompressedStream(name, input, length, codec, bufferSize); + } + } + + /** + * Creates coded input stream (used for protobuf message parsing) with higher message size limit. + * + * @param name the name of the stream + * @param input the list of ranges of bytes for the stream; from disk or cache + * @param length the length in bytes of the stream + * @param codec the compression codec + * @param bufferSize the compression buffer size + * @return coded input stream + * @throws IOException + */ + public static CodedInputStream createCodedInputStream( + String name, + List<DiskRange> input, + long length, + CompressionCodec codec, + int bufferSize) throws IOException { + InStream inStream = create(name, input, length, codec, bufferSize); + CodedInputStream codedInputStream = CodedInputStream.newInstance(inStream); + codedInputStream.setSizeLimit(PROTOBUF_MESSAGE_MAX_LIMIT); + return codedInputStream; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerReader.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/IntegerReader.java b/java/core/src/java/org/apache/orc/impl/IntegerReader.java new file mode 100644 index 0000000..3e64d54 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/IntegerReader.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +/** + * Interface for reading integers. + */ +public interface IntegerReader { + + /** + * Seek to the position provided by index. + * @param index + * @throws IOException + */ + void seek(PositionProvider index) throws IOException; + + /** + * Skip number of specified rows. + * @param numValues + * @throws IOException + */ + void skip(long numValues) throws IOException; + + /** + * Check if there are any more values left. + * @return + * @throws IOException + */ + boolean hasNext() throws IOException; + + /** + * Return the next available value. 
+ * @return + * @throws IOException + */ + long next() throws IOException; + + /** + * Return the next available vector for values. + * @param column the column being read + * @param data the vector to read into + * @param length the number of numbers to read + * @throws IOException + */ + void nextVector(ColumnVector column, + long[] data, + int length + ) throws IOException; + + /** + * Return the next available vector for values. Does not change the + * value of column.isRepeating. + * @param column the column being read + * @param data the vector to read into + * @param length the number of numbers to read + * @throws IOException + */ + void nextVector(ColumnVector column, + int[] data, + int length + ) throws IOException; +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/IntegerWriter.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java new file mode 100644 index 0000000..419054f --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.IOException; + +/** + * Interface for writing integers. + */ +public interface IntegerWriter { + + /** + * Get position from the stream. + * @param recorder + * @throws IOException + */ + void getPosition(PositionRecorder recorder) throws IOException; + + /** + * Write the integer value + * @param value + * @throws IOException + */ + void write(long value) throws IOException; + + /** + * Flush the buffer + * @throws IOException + */ + void flush() throws IOException; +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/MemoryManager.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/MemoryManager.java b/java/core/src/java/org/apache/orc/impl/MemoryManager.java new file mode 100644 index 0000000..757c0b4 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/MemoryManager.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.orc.OrcConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Implements a memory manager that keeps a global context of how many ORC + * writers there are and manages the memory between them. For use cases with + * dynamic partitions, it is easy to end up with many writers in the same task. + * By managing the size of each allocation, we try to cut down the size of each + * allocation and keep the task from running out of memory. + * + * This class is not thread safe, but is re-entrant - ensure creation and all + * invocations are triggered from the same thread. + */ +public class MemoryManager { + + private static final Logger LOG = LoggerFactory.getLogger(MemoryManager.class); + + /** + * How often should we check the memory sizes? Measured in rows added + * to all of the writers. + */ + private static final int ROWS_BETWEEN_CHECKS = 5000; + private final long totalMemoryPool; + private final Map<Path, WriterInfo> writerList = + new HashMap<Path, WriterInfo>(); + private long totalAllocation = 0; + private double currentScale = 1; + private int rowsAddedSinceCheck = 0; + private final OwnedLock ownerLock = new OwnedLock(); + + @SuppressWarnings("serial") + private static class OwnedLock extends ReentrantLock { + public Thread getOwner() { + return super.getOwner(); + } + } + + private static class WriterInfo { + long allocation; + Callback callback; + WriterInfo(long allocation, Callback callback) { + this.allocation = allocation; + this.callback = callback; + } + } + + public interface Callback { + /** + * The writer needs to check its memory usage + * @param newScale the current scale factor for memory allocations + * @return true if the writer was over the limit + * @throws IOException + */ + boolean checkMemory(double newScale) throws IOException; + } + + /** + * Create the memory manager. + * @param conf use the configuration to find the maximum size of the memory + * pool. + */ + public MemoryManager(Configuration conf) { + double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); + totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). + getHeapMemoryUsage().getMax() * maxLoad); + ownerLock.lock(); + } + + /** + * Light weight thread-safety check for multi-threaded access patterns + */ + private void checkOwner() { + if (!ownerLock.isHeldByCurrentThread()) { + LOG.warn("Owner thread expected {}, got {}", + ownerLock.getOwner(), Thread.currentThread()); + } + } + + /** + * Add a new writer's memory allocation to the pool. We use the path + * as a unique key to ensure that we don't get duplicates. 
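+ * If the path is already registered, its allocation is replaced rather than + * double-counted.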
+ * @param path the file that is being written + * @param requestedAllocation the requested buffer size + */ + public void addWriter(Path path, long requestedAllocation, + Callback callback) throws IOException { + checkOwner(); + WriterInfo oldVal = writerList.get(path); + // this should always be null, but we handle the case where the memory + // manager wasn't told that a writer wasn't still in use and the task + // starts writing to the same path. + if (oldVal == null) { + oldVal = new WriterInfo(requestedAllocation, callback); + writerList.put(path, oldVal); + totalAllocation += requestedAllocation; + } else { + // handle a new writer that is writing to the same path + totalAllocation += requestedAllocation - oldVal.allocation; + oldVal.allocation = requestedAllocation; + oldVal.callback = callback; + } + updateScale(true); + } + + /** + * Remove the given writer from the pool. + * @param path the file that has been closed + */ + public void removeWriter(Path path) throws IOException { + checkOwner(); + WriterInfo val = writerList.get(path); + if (val != null) { + writerList.remove(path); + totalAllocation -= val.allocation; + if (writerList.isEmpty()) { + rowsAddedSinceCheck = 0; + } + updateScale(false); + } + if(writerList.isEmpty()) { + rowsAddedSinceCheck = 0; + } + } + + /** + * Get the total pool size that is available for ORC writers. + * @return the number of bytes in the pool + */ + public long getTotalMemoryPool() { + return totalMemoryPool; + } + + /** + * The scaling factor for each allocation to ensure that the pool isn't + * oversubscribed. + * @return a fraction between 0.0 and 1.0 of the requested size that is + * available for each writer. + */ + public double getAllocationScale() { + return currentScale; + } + + /** + * Give the memory manager an opportunity for doing a memory check. + * @param rows number of rows added + * @throws IOException + */ + public void addedRow(int rows) throws IOException { + rowsAddedSinceCheck += rows; + if (rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { + notifyWriters(); + } + } + + /** + * Notify all of the writers that they should check their memory usage. + * @throws IOException + */ + public void notifyWriters() throws IOException { + checkOwner(); + LOG.debug("Notifying writers after " + rowsAddedSinceCheck); + for(WriterInfo writer: writerList.values()) { + boolean flushed = writer.callback.checkMemory(currentScale); + if (LOG.isDebugEnabled() && flushed) { + LOG.debug("flushed " + writer.toString()); + } + } + rowsAddedSinceCheck = 0; + } + + /** + * Update the currentScale based on the current allocation and pool size. + * This also updates the notificationTrigger. + * @param isAllocate is this an allocation? + */ + private void updateScale(boolean isAllocate) throws IOException { + if (totalAllocation <= totalMemoryPool) { + currentScale = 1; + } else { + currentScale = (double) totalMemoryPool / totalAllocation; + } + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java new file mode 100644 index 0000000..72c7f54 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/OrcAcidUtils.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.Reader; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.Charset; +import java.nio.charset.CharsetDecoder; + +public class OrcAcidUtils { + public static final String ACID_STATS = "hive.acid.stats"; + public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length"; + + /** + * Get the filename of the ORC ACID side file that contains the lengths + * of the intermediate footers. + * @param main the main ORC filename + * @return the name of the side file + */ + public static Path getSideFile(Path main) { + return new Path(main + DELTA_SIDE_FILE_SUFFIX); + } + + /** + * Read the side file to get the last flush length. + * @param fs the file system to use + * @param deltaFile the path of the delta file + * @return the maximum size of the file to use + * @throws IOException + */ + public static long getLastFlushLength(FileSystem fs, + Path deltaFile) throws IOException { + Path lengths = getSideFile(deltaFile); + long result = Long.MAX_VALUE; + try (FSDataInputStream stream = fs.open(lengths)) { + result = -1; + while (stream.available() > 0) { + result = stream.readLong(); + } + return result; + } catch (IOException ioe) { + return result; + } + } + + private static final Charset utf8 = Charset.forName("UTF-8"); + private static final CharsetDecoder utf8Decoder = utf8.newDecoder(); + + public static AcidStats parseAcidStats(Reader reader) { + if (reader.hasMetadataValue(ACID_STATS)) { + try { + ByteBuffer val = reader.getMetadataValue(ACID_STATS).duplicate(); + return new AcidStats(utf8Decoder.decode(val).toString()); + } catch (CharacterCodingException e) { + throw new IllegalArgumentException("Bad string encoding for " + + ACID_STATS, e); + } + } else { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OrcIndex.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OrcIndex.java b/java/core/src/java/org/apache/orc/impl/OrcIndex.java new file mode 100644 index 0000000..50a15f2 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/OrcIndex.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.orc.OrcProto; + +public final class OrcIndex { + OrcProto.RowIndex[] rowGroupIndex; + OrcProto.BloomFilterIndex[] bloomFilterIndex; + + public OrcIndex(OrcProto.RowIndex[] rgIndex, OrcProto.BloomFilterIndex[] bfIndex) { + this.rowGroupIndex = rgIndex; + this.bloomFilterIndex = bfIndex; + } + + public OrcProto.RowIndex[] getRowGroupIndex() { + return rowGroupIndex; + } + + public OrcProto.BloomFilterIndex[] getBloomFilterIndex() { + return bloomFilterIndex; + } + + public void setRowGroupIndex(OrcProto.RowIndex[] rowGroupIndex) { + this.rowGroupIndex = rowGroupIndex; + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/OutStream.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java new file mode 100644 index 0000000..81662cc --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/OutStream.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import org.apache.orc.CompressionCodec; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class OutStream extends PositionedOutputStream { + + public interface OutputReceiver { + /** + * Output the given buffer to the final destination. + * @param buffer the buffer to output + * @throws IOException + */ + void output(ByteBuffer buffer) throws IOException; + } + + public static final int HEADER_SIZE = 3; + private final String name; + private final OutputReceiver receiver; + // if set, the stream will be suppressed when writing the stripe + private boolean suppress; + + /** + * Stores the uncompressed bytes that have been serialized, but not + * compressed yet. When this fills, we compress the entire buffer. + */ + private ByteBuffer current = null; + + /** + * Stores the compressed bytes until we have a full buffer and then outputs + * them to the receiver. If no compression is being done, this (and overflow) + * will always be null and the current buffer will be sent directly to the + * receiver. 
+ */ + private ByteBuffer compressed = null; + + /** + * Since the compressed buffer may start with contents from previous + * compression blocks, we allocate an overflow buffer so that the + * output of the codec can be split between the two buffers. After the + * compressed buffer is sent to the receiver, the overflow buffer becomes + * the new compressed buffer. + */ + private ByteBuffer overflow = null; + private final int bufferSize; + private final CompressionCodec codec; + private long compressedBytes = 0; + private long uncompressedBytes = 0; + + public OutStream(String name, + int bufferSize, + CompressionCodec codec, + OutputReceiver receiver) throws IOException { + this.name = name; + this.bufferSize = bufferSize; + this.codec = codec; + this.receiver = receiver; + this.suppress = false; + } + + public void clear() throws IOException { + flush(); + suppress = false; + } + + /** + * Write the length of the compressed bytes. Life is much easier if the + * header is constant length, so just use 3 bytes. Considering most of the + * codecs want between 32k (snappy) and 256k (lzo, zlib), 3 bytes should + * be plenty. We also use the low bit for whether it is the original or + * compressed bytes. + * @param buffer the buffer to write the header to + * @param position the position in the buffer to write at + * @param val the chunk length to record in the header + * @param original whether the chunk is uncompressed + */ + private static void writeHeader(ByteBuffer buffer, + int position, + int val, + boolean original) { + buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0))); + buffer.put(position + 1, (byte) (val >> 7)); + buffer.put(position + 2, (byte) (val >> 15)); + } + + private void getNewInputBuffer() throws IOException { + if (codec == null) { + current = ByteBuffer.allocate(bufferSize); + } else { + current = ByteBuffer.allocate(bufferSize + HEADER_SIZE); + writeHeader(current, 0, bufferSize, true); + current.position(HEADER_SIZE); + } + } + + /** + * Allocate a new output buffer if we are compressing. + */ + private ByteBuffer getNewOutputBuffer() throws IOException { + return ByteBuffer.allocate(bufferSize + HEADER_SIZE); + } + + private void flip() throws IOException { + current.limit(current.position()); + current.position(codec == null ? 0 : HEADER_SIZE); + } + + @Override + public void write(int i) throws IOException { + if (current == null) { + getNewInputBuffer(); + } + if (current.remaining() < 1) { + spill(); + } + uncompressedBytes += 1; + current.put((byte) i); + } + + @Override + public void write(byte[] bytes, int offset, int length) throws IOException { + if (current == null) { + getNewInputBuffer(); + } + int remaining = Math.min(current.remaining(), length); + current.put(bytes, offset, remaining); + uncompressedBytes += remaining; + length -= remaining; + while (length != 0) { + spill(); + offset += remaining; + remaining = Math.min(current.remaining(), length); + current.put(bytes, offset, remaining); + uncompressedBytes += remaining; + length -= remaining; + } + } + + private void spill() throws IOException { + // if there isn't anything in the current buffer, don't spill + if (current == null || + current.position() == (codec == null ? 
0 : HEADER_SIZE)) { + return; + } + flip(); + if (codec == null) { + receiver.output(current); + getNewInputBuffer(); + } else { + if (compressed == null) { + compressed = getNewOutputBuffer(); + } else if (overflow == null) { + overflow = getNewOutputBuffer(); + } + int sizePosn = compressed.position(); + compressed.position(compressed.position() + HEADER_SIZE); + if (codec.compress(current, compressed, overflow)) { + uncompressedBytes = 0; + // move position back to after the header + current.position(HEADER_SIZE); + current.limit(current.capacity()); + // find the total bytes in the chunk + int totalBytes = compressed.position() - sizePosn - HEADER_SIZE; + if (overflow != null) { + totalBytes += overflow.position(); + } + compressedBytes += totalBytes + HEADER_SIZE; + writeHeader(compressed, sizePosn, totalBytes, false); + // if we have less than the next header left, spill it. + if (compressed.remaining() < HEADER_SIZE) { + compressed.flip(); + receiver.output(compressed); + compressed = overflow; + overflow = null; + } + } else { + compressedBytes += uncompressedBytes + HEADER_SIZE; + uncompressedBytes = 0; + // we are using the original, but need to spill the current + // compressed buffer first. So back up to where we started, + // flip it and send it to the receiver. + if (sizePosn != 0) { + compressed.position(sizePosn); + compressed.flip(); + receiver.output(compressed); + compressed = null; + // if we have an overflow, clear it and make it the new compressed + // buffer + if (overflow != null) { + overflow.clear(); + compressed = overflow; + overflow = null; + } + } else { + compressed.clear(); + if (overflow != null) { + overflow.clear(); + } + } + + // now send the current buffer to the receiver and get a new one. + current.position(0); + // update the header with the current length + writeHeader(current, 0, current.limit() - HEADER_SIZE, true); + receiver.output(current); + getNewInputBuffer(); + } + } + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + if (codec == null) { + recorder.addPosition(uncompressedBytes); + } else { + recorder.addPosition(compressedBytes); + recorder.addPosition(uncompressedBytes); + } + } + + @Override + public void flush() throws IOException { + spill(); + if (compressed != null && compressed.position() != 0) { + compressed.flip(); + receiver.output(compressed); + } + compressed = null; + uncompressedBytes = 0; + compressedBytes = 0; + overflow = null; + current = null; + } + + @Override + public String toString() { + return name; + } + + @Override + public long getBufferSize() { + long result = 0; + if (current != null) { + result += current.capacity(); + } + if (compressed != null) { + result += compressed.capacity(); + } + if (overflow != null) { + result += overflow.capacity(); + } + return result; + } + + /** + * Set the suppress flag. + */ + public void suppress() { + suppress = true; + } + + /** + * Returns the state of the suppress flag. + * @return value of suppress flag + */ + public boolean isSuppressed() { + return suppress; + } +} + http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionProvider.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PositionProvider.java b/java/core/src/java/org/apache/orc/impl/PositionProvider.java new file mode 100644 index 0000000..47cf481 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/PositionProvider.java @@ -0,0 +1,26 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +/** + * An interface used for seeking to a row index. + */ +public interface PositionProvider { + long getNext(); +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionRecorder.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PositionRecorder.java b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java new file mode 100644 index 0000000..1fff760 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/PositionRecorder.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +/** + * An interface for recording positions in a stream. + */ +public interface PositionRecorder { + void addPosition(long offset); +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java new file mode 100644 index 0000000..d412939 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/PositionedOutputStream.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.io.OutputStream; + +public abstract class PositionedOutputStream extends OutputStream { + + /** + * Record the current position to the recorder. + * @param recorder the object that receives the position + * @throws IOException + */ + public abstract void getPosition(PositionRecorder recorder + ) throws IOException; + + /** + * Get the memory size currently allocated for buffers associated with this + * stream. + * @return the number of bytes used by buffers. + */ + public abstract long getBufferSize(); +}
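A note on the chunk header used by OutStream above: writeHeader packs the chunk length into 23 bits, shifted left by one, and uses the low bit of the first byte to flag an uncompressed ("original") chunk. The following is a minimal, self-contained sketch of how such a header round-trips. It is an illustration written for this note, not code from the patch; the class name ChunkHeaderDemo and the readLength/isOriginal helpers are invented, only the writeHeader layout is copied from above.

    import java.nio.ByteBuffer;

    public class ChunkHeaderDemo {
      static final int HEADER_SIZE = 3;

      // Same layout as OutStream.writeHeader: the low bit of byte 0 flags an
      // uncompressed chunk, the remaining 23 bits carry the chunk length.
      static void writeHeader(ByteBuffer buffer, int position, int val,
                              boolean original) {
        buffer.put(position, (byte) ((val << 1) + (original ? 1 : 0)));
        buffer.put(position + 1, (byte) (val >> 7));
        buffer.put(position + 2, (byte) (val >> 15));
      }

      // Inverse of writeHeader: reassemble the 23-bit length from the 3 bytes.
      static int readLength(ByteBuffer buffer, int position) {
        return ((buffer.get(position) & 0xff) >>> 1)
            | ((buffer.get(position + 1) & 0xff) << 7)
            | ((buffer.get(position + 2) & 0xff) << 15);
      }

      static boolean isOriginal(ByteBuffer buffer, int position) {
        return (buffer.get(position) & 1) == 1;
      }

      public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_SIZE);
        writeHeader(buf, 0, 262143, false);      // largest chunk for a 256k buffer
        System.out.println(readLength(buf, 0));  // prints 262143
        System.out.println(isOriginal(buf, 0));  // prints false
      }
    }

Any consumer of these streams would need the equivalent decoding step; the demo only illustrates the bit layout described in the writeHeader javadoc, and explains why 3 bytes suffice for buffer sizes up to 2^23 - 1.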

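Similarly, for the MemoryManager change earlier in the patch: when the pool is oversubscribed, each registered writer is asked to check its memory against currentScale = totalMemoryPool / totalAllocation via Callback.checkMemory(double), which returns true once the writer has flushed. Below is a hedged sketch of what a cooperating writer callback might look like. ToyWriterCallback and its fields are invented for illustration; only the checkMemory(double) contract and the notification cadence come from the code above.

    import java.io.IOException;

    // Invented example type; stands in for a real writer implementing
    // the memory manager's Callback interface.
    class ToyWriterCallback {
      private final long requestedBytes; // allocation requested from the pool
      private long bufferedBytes = 0;    // bytes currently held in memory

      ToyWriterCallback(long requestedBytes) {
        this.requestedBytes = requestedBytes;
      }

      void buffer(long bytes) {
        bufferedBytes += bytes;
      }

      // Called from MemoryManager.notifyWriters() every ROWS_BETWEEN_CHECKS
      // rows. newScale is 1.0 while the pool has room, and
      // totalMemoryPool / totalAllocation once it is oversubscribed, so each
      // writer's effective budget shrinks as more writers register.
      public boolean checkMemory(double newScale) throws IOException {
        long limit = (long) (requestedBytes * newScale);
        if (bufferedBytes > limit) {
          flushStripe();
          return true; // tells the manager this writer flushed
        }
        return false;
      }

      private void flushStripe() throws IOException {
        bufferedBytes = 0; // stand-in for actually writing the stripe out
      }
    }

Under this scheme, a writer that requested 128MB and shares a 512MB pool with nine identical writers sees newScale = 512 / 1280 = 0.4, so it flushes once it buffers more than about 51MB.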