http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java deleted file mode 100644 index 27cdac2..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DecimalColumnStatistics.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import org.apache.hadoop.hive.common.type.HiveDecimal; - -/** - * Statistics for decimal columns. - */ -public interface DecimalColumnStatistics extends ColumnStatistics { - - /** - * Get the minimum value for the column. - * @return the minimum value - */ - HiveDecimal getMinimum(); - - /** - * Get the maximum value for the column. - * @return the maximum value - */ - HiveDecimal getMaximum(); - - /** - * Get the sum of the values of the column. - * @return the sum - */ - HiveDecimal getSum(); - -}
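The statistics interfaces removed in this patch (DecimalColumnStatistics above, DoubleColumnStatistics and IntegerColumnStatistics further below) all extend ColumnStatistics and only add typed minimum/maximum/sum accessors. A minimal illustrative sketch of how a caller would consume one of them; the helper name is hypothetical and the statistics object is assumed to come from a reader elsewhere, only the interface methods are taken from the deleted files:

    // Hypothetical helper; ColumnStatistics and DecimalColumnStatistics are the
    // org.apache.tajo.storage.thirdparty.orc interfaces shown in this diff.
    static String describeDecimalColumn(ColumnStatistics stats) {
      // getNumberOfValues() is inherited from ColumnStatistics; min/max/sum are
      // only meaningful when the column holds at least one value.
      if (stats instanceof DecimalColumnStatistics && stats.getNumberOfValues() > 0) {
        DecimalColumnStatistics decimalStats = (DecimalColumnStatistics) stats;
        return "min=" + decimalStats.getMinimum()
            + " max=" + decimalStats.getMaximum()
            + " sum=" + decimalStats.getSum();
      }
      return "no decimal statistics";  // wrong column type or empty column
    }
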
http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java deleted file mode 100644 index 5333052..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DirectDecompressionCodec.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; -import java.nio.ByteBuffer; - -public interface DirectDecompressionCodec extends CompressionCodec { - public boolean isAvailable(); - public void directDecompress(ByteBuffer in, ByteBuffer out) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java deleted file mode 100644 index ddce8f7..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DoubleColumnStatistics.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -/** - * Statistics for float and double columns. - */ -public interface DoubleColumnStatistics extends ColumnStatistics { - - /** - * Get the smallest value in the column. 
Only defined if getNumberOfValues - * is non-zero. - * @return the minimum - */ - double getMinimum(); - - /** - * Get the largest value in the column. Only defined if getNumberOfValues - * is non-zero. - * @return the maximum - */ - double getMaximum(); - - /** - * Get the sum of the values in the column. - * @return the sum - */ - double getSum(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java deleted file mode 100644 index 1d44f77..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicByteArray.java +++ /dev/null @@ -1,303 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import org.apache.hadoop.io.Text; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; - -/** - * A class that is a growable array of bytes. Growth is managed in terms of - * chunks that are allocated when needed. - */ -final class DynamicByteArray { - static final int DEFAULT_CHUNKSIZE = 32 * 1024; - static final int DEFAULT_NUM_CHUNKS = 128; - - private final int chunkSize; // our allocation sizes - private byte[][] data; // the real data - private int length; // max set element index +1 - private int initializedChunks = 0; // the number of chunks created - - public DynamicByteArray() { - this(DEFAULT_NUM_CHUNKS, DEFAULT_CHUNKSIZE); - } - - public DynamicByteArray(int numChunks, int chunkSize) { - if (chunkSize == 0) { - throw new IllegalArgumentException("bad chunksize"); - } - this.chunkSize = chunkSize; - data = new byte[numChunks][]; - } - - /** - * Ensure that the given index is valid. - */ - private void grow(int chunkIndex) { - if (chunkIndex >= initializedChunks) { - if (chunkIndex >= data.length) { - int newSize = Math.max(chunkIndex + 1, 2 * data.length); - byte[][] newChunk = new byte[newSize][]; - System.arraycopy(data, 0, newChunk, 0, data.length); - data = newChunk; - } - for(int i=initializedChunks; i <= chunkIndex; ++i) { - data[i] = new byte[chunkSize]; - } - initializedChunks = chunkIndex + 1; - } - } - - public byte get(int index) { - if (index >= length) { - throw new IndexOutOfBoundsException("Index " + index + - " is outside of 0.." 
+ - (length - 1)); - } - int i = index / chunkSize; - int j = index % chunkSize; - return data[i][j]; - } - - public void set(int index, byte value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] = value; - } - - public int add(byte value) { - int i = length / chunkSize; - int j = length % chunkSize; - grow(i); - data[i][j] = value; - int result = length; - length += 1; - return result; - } - - /** - * Copy a slice of a byte array into our buffer. - * @param value the array to copy from - * @param valueOffset the first location to copy from value - * @param valueLength the number of bytes to copy from value - * @return the offset of the start of the value - */ - public int add(byte[] value, int valueOffset, int valueLength) { - int i = length / chunkSize; - int j = length % chunkSize; - grow((length + valueLength) / chunkSize); - int remaining = valueLength; - while (remaining > 0) { - int size = Math.min(remaining, chunkSize - j); - System.arraycopy(value, valueOffset, data[i], j, size); - remaining -= size; - valueOffset += size; - i += 1; - j = 0; - } - int result = length; - length += valueLength; - return result; - } - - /** - * Read the entire stream into this array. - * @param in the stream to read from - * @throws IOException - */ - public void readAll(InputStream in) throws IOException { - int currentChunk = length / chunkSize; - int currentOffset = length % chunkSize; - grow(currentChunk); - int currentLength = in.read(data[currentChunk], currentOffset, - chunkSize - currentOffset); - while (currentLength > 0) { - length += currentLength; - currentOffset = length % chunkSize; - if (currentOffset == 0) { - currentChunk = length / chunkSize; - grow(currentChunk); - } - currentLength = in.read(data[currentChunk], currentOffset, - chunkSize - currentOffset); - } - } - - /** - * Byte compare a set of bytes against the bytes in this dynamic array. - * @param other source of the other bytes - * @param otherOffset start offset in the other array - * @param otherLength number of bytes in the other array - * @param ourOffset the offset in our array - * @param ourLength the number of bytes in our array - * @return negative for less, 0 for equal, positive for greater - */ - public int compare(byte[] other, int otherOffset, int otherLength, - int ourOffset, int ourLength) { - int currentChunk = ourOffset / chunkSize; - int currentOffset = ourOffset % chunkSize; - int maxLength = Math.min(otherLength, ourLength); - while (maxLength > 0 && - other[otherOffset] == data[currentChunk][currentOffset]) { - otherOffset += 1; - currentOffset += 1; - if (currentOffset == chunkSize) { - currentChunk += 1; - currentOffset = 0; - } - maxLength -= 1; - } - if (maxLength == 0) { - return otherLength - ourLength; - } - int otherByte = 0xff & other[otherOffset]; - int ourByte = 0xff & data[currentChunk][currentOffset]; - return otherByte > ourByte ? 1 : -1; - } - - /** - * Get the size of the array. - * @return the number of bytes in the array - */ - public int size() { - return length; - } - - /** - * Clear the array to its original pristine state. - */ - public void clear() { - length = 0; - for(int i=0; i < data.length; ++i) { - data[i] = null; - } - initializedChunks = 0; - } - - /** - * Set a text value from the bytes in this dynamic array. 
- * @param result the value to set - * @param offset the start of the bytes to copy - * @param length the number of bytes to copy - */ - public void setText(Text result, int offset, int length) { - result.clear(); - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - int currentLength = Math.min(length, chunkSize - currentOffset); - while (length > 0) { - result.append(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(length, chunkSize - currentOffset); - } - } - - /** - * Write out a range of this dynamic array to an output stream. - * @param out the stream to write to - * @param offset the first offset to write - * @param length the number of bytes to write - * @throws IOException - */ - public void write(OutputStream out, int offset, - int length) throws IOException { - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - while (length > 0) { - int currentLength = Math.min(length, chunkSize - currentOffset); - out.write(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - } - } - - @Override - public String toString() { - int i; - StringBuilder sb = new StringBuilder(length * 3); - - sb.append('{'); - int l = length - 1; - for (i=0; i<l; i++) { - sb.append(Integer.toHexString(get(i))); - sb.append(','); - } - sb.append(get(i)); - sb.append('}'); - - return sb.toString(); - } - - public void setByteBuffer(ByteBuffer result, int offset, int length) { - result.clear(); - int currentChunk = offset / chunkSize; - int currentOffset = offset % chunkSize; - int currentLength = Math.min(length, chunkSize - currentOffset); - while (length > 0) { - result.put(data[currentChunk], currentOffset, currentLength); - length -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(length, chunkSize - currentOffset); - } - } - - /** - * Gets all the bytes of the array. - * - * @return Bytes of the array - */ - public byte[] get() { - byte[] result = null; - if (length > 0) { - int currentChunk = 0; - int currentOffset = 0; - int currentLength = Math.min(length, chunkSize); - int destOffset = 0; - result = new byte[length]; - int totalLength = length; - while (totalLength > 0) { - System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength); - destOffset += currentLength; - totalLength -= currentLength; - currentChunk += 1; - currentOffset = 0; - currentLength = Math.min(totalLength, chunkSize - currentOffset); - } - } - return result; - } - - /** - * Get the size of the buffers. - */ - public long getSizeInBytes() { - return initializedChunks * chunkSize; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java deleted file mode 100644 index a347706..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/DynamicIntArray.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -/** - * Dynamic int array that uses primitive types and chunks to avoid copying - * large number of integers when it resizes. - * - * The motivation for this class is memory optimization, i.e. space efficient - * storage of potentially huge arrays without good a-priori size guesses. - * - * The API of this class is between a primitive array and a AbstractList. It's - * not a Collection implementation because it handles primitive types, but the - * API could be extended to support iterators and the like. - * - * NOTE: Like standard Collection implementations/arrays, this class is not - * synchronized. - */ -final class DynamicIntArray { - static final int DEFAULT_CHUNKSIZE = 8 * 1024; - static final int INIT_CHUNKS = 128; - - private final int chunkSize; // our allocation size - private int[][] data; // the real data - private int length; // max set element index +1 - private int initializedChunks = 0; // the number of created chunks - - public DynamicIntArray() { - this(DEFAULT_CHUNKSIZE); - } - - public DynamicIntArray(int chunkSize) { - this.chunkSize = chunkSize; - - data = new int[INIT_CHUNKS][]; - } - - /** - * Ensure that the given index is valid. - */ - private void grow(int chunkIndex) { - if (chunkIndex >= initializedChunks) { - if (chunkIndex >= data.length) { - int newSize = Math.max(chunkIndex + 1, 2 * data.length); - int[][] newChunk = new int[newSize][]; - System.arraycopy(data, 0, newChunk, 0, data.length); - data = newChunk; - } - for (int i=initializedChunks; i <= chunkIndex; ++i) { - data[i] = new int[chunkSize]; - } - initializedChunks = chunkIndex + 1; - } - } - - public int get(int index) { - if (index >= length) { - throw new IndexOutOfBoundsException("Index " + index + - " is outside of 0.." 
+ - (length - 1)); - } - int i = index / chunkSize; - int j = index % chunkSize; - return data[i][j]; - } - - public void set(int index, int value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] = value; - } - - public void increment(int index, int value) { - int i = index / chunkSize; - int j = index % chunkSize; - grow(i); - if (index >= length) { - length = index + 1; - } - data[i][j] += value; - } - - public void add(int value) { - int i = length / chunkSize; - int j = length % chunkSize; - grow(i); - data[i][j] = value; - length += 1; - } - - public int size() { - return length; - } - - public void clear() { - length = 0; - for(int i=0; i < data.length; ++i) { - data[i] = null; - } - initializedChunks = 0; - } - - public String toString() { - int i; - StringBuilder sb = new StringBuilder(length * 4); - - sb.append('{'); - int l = length - 1; - for (i=0; i<l; i++) { - sb.append(get(i)); - sb.append(','); - } - sb.append(get(i)); - sb.append('}'); - - return sb.toString(); - } - - public int getSizeInBytes() { - return 4 * initializedChunks * chunkSize; - } -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java deleted file mode 100644 index dcc1347..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/FileOrcDataSource.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.tajo.storage.thirdparty.orc; - -import com.facebook.presto.orc.DiskRange; -import com.facebook.presto.orc.OrcDataSource; -import com.google.common.collect.ImmutableMap; -import io.airlift.slice.Slice; -import io.airlift.units.DataSize; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice; -import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges; - -/** - * File data source class for Orc Reader - * - * Most of code is from Presto - */ -public class FileOrcDataSource - implements OrcDataSource -{ - private final File path; - private final long size; - private final RandomAccessFile input; - private final DataSize maxMergeDistance; - private long readTimeNanos; - - public FileOrcDataSource(File path, double mergeDistance) - throws IOException - { - this.path = checkNotNull(path, "path is null"); - this.size = path.length(); - this.input = new RandomAccessFile(path, "r"); - - maxMergeDistance = new DataSize(mergeDistance, DataSize.Unit.BYTE); - } - - @Override - public void close() - throws IOException - { - input.close(); - } - - @Override - public long getReadTimeNanos() - { - return readTimeNanos; - } - - @Override - public long getSize() - { - return size; - } - - @Override - public void readFully(long position, byte[] buffer) - throws IOException - { - readFully(position, buffer, 0, buffer.length); - } - - @Override - public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) - throws IOException - { - long start = System.nanoTime(); - - input.seek(position); - input.readFully(buffer, bufferOffset, bufferLength); - - readTimeNanos += System.nanoTime() - start; - } - - @Override - public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges) - throws IOException - { - checkNotNull(diskRanges, "diskRanges is null"); - - if (diskRanges.isEmpty()) { - return ImmutableMap.of(); - } - - // TODO: benchmark alternatively strategies: - // 1) sort ranges and perform one read per range - // 2) single read with transferTo() using custom WritableByteChannel - - Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); - - // read ranges - Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>(); - for (DiskRange mergedRange : mergedRanges) { - // read full range in one request - byte[] buffer = new byte[mergedRange.getLength()]; - readFully(mergedRange.getOffset(), buffer); - buffers.put(mergedRange, buffer); - } - - ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder(); - for (Entry<K, DiskRange> entry : diskRanges.entrySet()) { - slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); - } - return slices.build(); - } - - @Override - public String toString() - { - return path.getPath(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java deleted file mode 100644 index 
73ea475..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/HdfsOrcDataSource.java +++ /dev/null @@ -1,131 +0,0 @@ - -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -import com.facebook.presto.orc.DiskRange; -import com.facebook.presto.orc.OrcDataSource; -import com.google.common.collect.ImmutableMap; -import io.airlift.slice.Slice; -import io.airlift.units.DataSize; -import org.apache.hadoop.fs.FSDataInputStream; - -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Map.Entry; - -import static com.facebook.presto.orc.OrcDataSourceUtils.getDiskRangeSlice; -import static com.facebook.presto.orc.OrcDataSourceUtils.mergeAdjacentDiskRanges; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * HDFS File data source class for Orc Reader - * - * Most of code is from Presto - */ -public class HdfsOrcDataSource - implements OrcDataSource -{ - private final FSDataInputStream inputStream; - private final String path; - private final long size; - private final DataSize maxMergeDistance; - private long readTimeNanos; - - public HdfsOrcDataSource(String path, FSDataInputStream inputStream, long size, double maxMergeDistance) - { - this.path = checkNotNull(path, "path is null"); - this.inputStream = checkNotNull(inputStream, "inputStream is null"); - this.size = size; - checkArgument(size >= 0, "size is negative"); - - DataSize mergeDistance = new DataSize(maxMergeDistance, DataSize.Unit.BYTE); - this.maxMergeDistance = checkNotNull(mergeDistance, "maxMergeDistance is null"); - } - - @Override - public void close() - throws IOException - { - inputStream.close(); - } - - @Override - public long getReadTimeNanos() - { - return readTimeNanos; - } - - @Override - public long getSize() - { - return size; - } - - @Override - public void readFully(long position, byte[] buffer) - throws IOException - { - readFully(position, buffer, 0, buffer.length); - } - - @Override - public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) - throws IOException - { - long start = System.nanoTime(); - - inputStream.readFully(position, buffer, bufferOffset, bufferLength); - - readTimeNanos += System.nanoTime() - start; - } - - @Override - public <K> Map<K, Slice> readFully(Map<K, DiskRange> diskRanges) - throws IOException - { - checkNotNull(diskRanges, "diskRanges is null"); - - if (diskRanges.isEmpty()) { - return ImmutableMap.of(); - } - - Iterable<DiskRange> mergedRanges = mergeAdjacentDiskRanges(diskRanges.values(), maxMergeDistance); - - // read ranges - Map<DiskRange, byte[]> buffers = new LinkedHashMap<DiskRange, byte[]>(); - for (DiskRange mergedRange : mergedRanges) { - // read full range in one request - byte[] buffer = new byte[mergedRange.getLength()]; - readFully(mergedRange.getOffset(), buffer); - buffers.put(mergedRange, buffer); - } 
- - ImmutableMap.Builder<K, Slice> slices = ImmutableMap.builder(); - for (Entry<K, DiskRange> entry : diskRanges.entrySet()) { - slices.put(entry.getKey(), getDiskRangeSlice(entry.getValue(), buffers)); - } - return slices.build(); - } - - @Override - public String toString() - { - return path; - } -} - - http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java deleted file mode 100644 index 208454f..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerColumnStatistics.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.tajo.storage.thirdparty.orc; - -/** - * Statistics for all of the integer columns, such as byte, short, int, and - * long. - */ -public interface IntegerColumnStatistics extends ColumnStatistics { - /** - * Get the smallest value in the column. Only defined if getNumberOfValues - * is non-zero. - * @return the minimum - */ - long getMinimum(); - - /** - * Get the largest value in the column. Only defined if getNumberOfValues - * is non-zero. - * @return the maximum - */ - long getMaximum(); - - /** - * Is the sum defined? If the sum overflowed the counter this will be false. - * @return is the sum available - */ - boolean isSumDefined(); - - /** - * Get the sum of the column. Only valid if isSumDefined returns true. - * @return the sum of the column - */ - long getSum(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java deleted file mode 100644 index 6872882..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/IntegerWriter.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.orc; - -import java.io.IOException; - -/** - * Interface for writing integers. - */ -interface IntegerWriter { - - /** - * Get position from the stream. - * @param recorder - * @throws IOException - */ - void getPosition(PositionRecorder recorder) throws IOException; - - /** - * Write the integer value - * @param value - * @throws IOException - */ - void write(long value) throws IOException; - - /** - * Flush the buffer - * @throws IOException - */ - void flush() throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java deleted file mode 100644 index 8cd40f7..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/MemoryManager.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.orc; - -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; - -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Implements a memory manager that keeps a global context of how many ORC - * writers there are and manages the memory between them. For use cases with - * dynamic partitions, it is easy to end up with many writers in the same task. - * By managing the size of each allocation, we try to cut down the size of each - * allocation and keep the task from running out of memory. 
- * - * This class is not thread safe, but is re-entrant - ensure creation and all - * invocations are triggered from the same thread. - */ -class MemoryManager { - - private static final Log LOG = LogFactory.getLog(MemoryManager.class); - - /** - * How often should we check the memory sizes? Measured in rows added - * to all of the writers. - */ - private static final int ROWS_BETWEEN_CHECKS = 5000; - private final long totalMemoryPool; - private final Map<Path, WriterInfo> writerList = - new HashMap<Path, WriterInfo>(); - private long totalAllocation = 0; - private double currentScale = 1; - private int rowsAddedSinceCheck = 0; - private final OwnedLock ownerLock = new OwnedLock(); - - @SuppressWarnings("serial") - private static class OwnedLock extends ReentrantLock { - public Thread getOwner() { - return super.getOwner(); - } - } - - private static class WriterInfo { - long allocation; - Callback callback; - WriterInfo(long allocation, Callback callback) { - this.allocation = allocation; - this.callback = callback; - } - } - - public interface Callback { - /** - * The writer needs to check its memory usage - * @param newScale the current scale factor for memory allocations - * @return true if the writer was over the limit - * @throws IOException - */ - boolean checkMemory(double newScale) throws IOException; - } - - /** - * Create the memory manager. - * @param conf use the configuration to find the maximum size of the memory - * pool. - */ - MemoryManager(Configuration conf) { - HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL; - double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); - totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). - getHeapMemoryUsage().getMax() * maxLoad); - ownerLock.lock(); - } - - /** - * Light weight thread-safety check for multi-threaded access patterns - */ - private void checkOwner() { - Preconditions.checkArgument(ownerLock.isHeldByCurrentThread(), - "Owner thread expected %s, got %s", - ownerLock.getOwner(), - Thread.currentThread()); - } - - /** - * Add a new writer's memory allocation to the pool. We use the path - * as a unique key to ensure that we don't get duplicates. - * @param path the file that is being written - * @param requestedAllocation the requested buffer size - */ - void addWriter(Path path, long requestedAllocation, - Callback callback) throws IOException { - checkOwner(); - WriterInfo oldVal = writerList.get(path); - // this should always be null, but we handle the case where the memory - // manager wasn't told that a writer wasn't still in use and the task - // starts writing to the same path. - if (oldVal == null) { - oldVal = new WriterInfo(requestedAllocation, callback); - writerList.put(path, oldVal); - totalAllocation += requestedAllocation; - } else { - // handle a new writer that is writing to the same path - totalAllocation += requestedAllocation - oldVal.allocation; - oldVal.allocation = requestedAllocation; - oldVal.callback = callback; - } - updateScale(true); - } - - /** - * Remove the given writer from the pool. 
- * @param path the file that has been closed - */ - void removeWriter(Path path) throws IOException { - checkOwner(); - WriterInfo val = writerList.get(path); - if (val != null) { - writerList.remove(path); - totalAllocation -= val.allocation; - if (writerList.isEmpty()) { - rowsAddedSinceCheck = 0; - } - updateScale(false); - } - if(writerList.isEmpty()) { - rowsAddedSinceCheck = 0; - } - } - - /** - * Get the total pool size that is available for ORC writers. - * @return the number of bytes in the pool - */ - long getTotalMemoryPool() { - return totalMemoryPool; - } - - /** - * The scaling factor for each allocation to ensure that the pool isn't - * oversubscribed. - * @return a fraction between 0.0 and 1.0 of the requested size that is - * available for each writer. - */ - double getAllocationScale() { - return currentScale; - } - - /** - * Give the memory manager an opportunity for doing a memory check. - * @throws IOException - */ - void addedRow() throws IOException { - if (++rowsAddedSinceCheck >= ROWS_BETWEEN_CHECKS) { - notifyWriters(); - } - } - - /** - * Notify all of the writers that they should check their memory usage. - * @throws IOException - */ - void notifyWriters() throws IOException { - checkOwner(); - LOG.debug("Notifying writers after " + rowsAddedSinceCheck); - for(WriterInfo writer: writerList.values()) { - boolean flushed = writer.callback.checkMemory(currentScale); - if (LOG.isDebugEnabled() && flushed) { - LOG.debug("flushed " + writer.toString()); - } - } - rowsAddedSinceCheck = 0; - } - - /** - * Update the currentScale based on the current allocation and pool size. - * This also updates the notificationTrigger. - * @param isAllocate is this an allocation? - */ - private void updateScale(boolean isAllocate) throws IOException { - if (totalAllocation <= totalMemoryPool) { - currentScale = 1; - } else { - currentScale = (double) totalMemoryPool / totalAllocation; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java deleted file mode 100644 index dfa4c36..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Metadata.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.orc; - -import com.google.common.collect.Lists; - -import java.util.List; - -public class Metadata { - - private final OrcProto.Metadata metadata; - - Metadata(OrcProto.Metadata m) { - this.metadata = m; - } - - /** - * Return list of stripe level column statistics - * - * @return list of stripe statistics - */ - public List<StripeStatistics> getStripeStatistics() { - List<StripeStatistics> result = Lists.newArrayList(); - for (OrcProto.StripeStatistics ss : metadata.getStripeStatsList()) { - result.add(new StripeStatistics(ss.getColStatsList())); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java index a291953..8f26d21 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcFile.java @@ -21,11 +21,15 @@ package org.apache.tajo.storage.thirdparty.orc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; - -import static org.apache.tajo.storage.thirdparty.orc.OrcConf.ConfVars.*; +import org.apache.orc.CompressionKind; +import org.apache.orc.FileMetaInfo; +import org.apache.orc.FileMetadata; +import org.apache.orc.TypeDescription; +import org.apache.orc.impl.MemoryManager; +import org.apache.tajo.storage.orc.ORCAppender; import java.io.IOException; +import java.util.Properties; import java.util.TimeZone; /** @@ -50,9 +54,9 @@ public final class OrcFile { * prevent the new reader from reading ORC files generated by any released * version of Hive. */ - public static enum Version { + public enum Version { V_0_11("0.11", 0, 11), - V_0_12("0.12", 0, 12); + V_0_12("0.12", 0, 12); public static final Version CURRENT = V_0_12; @@ -102,9 +106,14 @@ public final class OrcFile { * For bugs in the writer, but the old readers already read the new data * correctly, bump this version instead of the Version. */ - public static enum WriterVersion { + public enum WriterVersion { ORIGINAL(0), - HIVE_8732(1); // corrupted stripe/file maximum column statistics + HIVE_8732(1), // corrupted stripe/file maximum column statistics + HIVE_4243(2), // use real column names from Hive tables + HIVE_12055(3), // vectorized writer + + // Don't use any magic numbers here except for the below: + FUTURE(Integer.MAX_VALUE); // a version from a future writer private final int id; @@ -112,67 +121,111 @@ public final class OrcFile { return id; } - private WriterVersion(int id) { + WriterVersion(int id) { this.id = id; } + + private static final WriterVersion[] values; + static { + // Assumes few non-negative values close to zero. 
+ int max = Integer.MIN_VALUE; + for (WriterVersion v : WriterVersion.values()) { + if (v.id < 0) throw new AssertionError(); + if (v.id > max && FUTURE.id != v.id) { + max = v.id; + } + } + values = new WriterVersion[max + 1]; + for (WriterVersion v : WriterVersion.values()) { + if (v.id < values.length) { + values[v.id] = v; + } + } + } + + public static WriterVersion from(int val) { + if (val == FUTURE.id) return FUTURE; // Special handling for the magic value. + return values[val]; + } } + public static final WriterVersion CURRENT_WRITER = WriterVersion.HIVE_12055; - public static enum EncodingStrategy { + public enum EncodingStrategy { SPEED, COMPRESSION; } - public static enum CompressionStrategy { + public enum CompressionStrategy { SPEED, COMPRESSION; } - // Note : these string definitions for table properties are deprecated, - // and retained only for backward compatibility, please do not add to - // them, add to OrcTableProperties below instead - @Deprecated public static final String COMPRESSION = "orc.compress"; - @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size"; - @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size"; - @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride"; - @Deprecated public static final String ENABLE_INDEXES = "orc.create.index"; - @Deprecated public static final String BLOCK_PADDING = "orc.block.padding"; + // unused + private OrcFile() {} - /** - * Enum container for all orc table properties. - * If introducing a new orc-specific table property, - * add it here. - */ - public static enum OrcTableProperties { - COMPRESSION("orc.compress"), - COMPRESSION_BLOCK_SIZE("orc.compress.size"), - STRIPE_SIZE("orc.stripe.size"), - BLOCK_SIZE("orc.block.size"), - ROW_INDEX_STRIDE("orc.row.index.stride"), - ENABLE_INDEXES("orc.create.index"), - BLOCK_PADDING("orc.block.padding"), - ENCODING_STRATEGY("orc.encoding.strategy"), - BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"), - BLOOM_FILTER_FPP("orc.bloom.filter.fpp"); + public static class ReaderOptions { + private final Configuration conf; + private FileSystem filesystem; + private FileMetaInfo fileMetaInfo; // TODO: this comes from some place. + private long maxLength = Long.MAX_VALUE; + private FileMetadata fullFileMetadata; // Propagate from LLAP cache. 
+ + public ReaderOptions(Configuration conf) { + this.conf = conf; + } + + public ReaderOptions fileMetaInfo(FileMetaInfo info) { + fileMetaInfo = info; + return this; + } + + public ReaderOptions filesystem(FileSystem fs) { + this.filesystem = fs; + return this; + } + + public ReaderOptions maxLength(long val) { + maxLength = val; + return this; + } - private final String propName; + public ReaderOptions fileMetadata(FileMetadata metadata) { + this.fullFileMetadata = metadata; + return this; + } + + public Configuration getConfiguration() { + return conf; + } - OrcTableProperties(String propName) { - this.propName = propName; + public FileSystem getFilesystem() { + return filesystem; } - public String getPropName(){ - return this.propName; + public FileMetaInfo getFileMetaInfo() { + return fileMetaInfo; + } + + public long getMaxLength() { + return maxLength; + } + + public FileMetadata getFileMetadata() { + return fullFileMetadata; } } - // unused - private OrcFile() {} + public static ReaderOptions readerOptions(Configuration conf) { + return new ReaderOptions(conf); + } + + - public static interface WriterContext { + public interface WriterContext { Writer getWriter(); } - public static interface WriterCallback { - public void preStripeWrite(WriterContext context) throws IOException; - public void preFooterWrite(WriterContext context) throws IOException; + public interface WriterCallback { + void preStripeWrite(WriterContext context) throws IOException; + void preFooterWrite(WriterContext context) throws IOException; } /** @@ -181,7 +234,7 @@ public final class OrcFile { public static class WriterOptions { private final Configuration configuration; private FileSystem fileSystemValue = null; - private ObjectInspector inspectorValue = null; + private TypeDescription schema = null; private long stripeSizeValue; private long blockSizeValue; private int rowIndexStrideValue; @@ -193,45 +246,42 @@ public final class OrcFile { private WriterCallback callback; private EncodingStrategy encodingStrategy; private CompressionStrategy compressionStrategy; - private float paddingTolerance; + private double paddingTolerance; private String bloomFilterColumns; private double bloomFilterFpp; - private TimeZone timezone; - WriterOptions(Configuration conf) { + protected WriterOptions(Properties tableProperties, Configuration conf) { configuration = conf; - memoryManagerValue = getMemoryManager(conf); - stripeSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE); - blockSizeValue = OrcConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE); - rowIndexStrideValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE); - bufferSizeValue = OrcConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE); - blockPaddingValue = OrcConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING); - compressValue = CompressionKind.valueOf(OrcConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS)); - String versionName = OrcConf.getVar(conf, HIVE_ORC_WRITE_FORMAT); - if (versionName == null) { - versionValue = Version.CURRENT; - } else { - versionValue = Version.byName(versionName); - } - String enString = - conf.get(OrcConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); - if (enString == null) { - encodingStrategy = EncodingStrategy.SPEED; - } else { - encodingStrategy = EncodingStrategy.valueOf(enString); - } - - String compString = conf - .get(OrcConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname); - if (compString == null) { - compressionStrategy = CompressionStrategy.SPEED; - } else { - compressionStrategy = 
CompressionStrategy.valueOf(compString); - } - - paddingTolerance = conf.getFloat(OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, - OrcConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); - bloomFilterFpp = BloomFilterIO.DEFAULT_FPP; + memoryManagerValue = getStaticMemoryManager(conf); + stripeSizeValue = org.apache.orc.OrcConf.STRIPE_SIZE.getLong(tableProperties, conf); + blockSizeValue = org.apache.orc.OrcConf.BLOCK_SIZE.getLong(tableProperties, conf); + rowIndexStrideValue = + (int) org.apache.orc.OrcConf.ROW_INDEX_STRIDE.getLong(tableProperties, conf); + bufferSizeValue = (int) org.apache.orc.OrcConf.BUFFER_SIZE.getLong(tableProperties, + conf); + blockPaddingValue = + org.apache.orc.OrcConf.BLOCK_PADDING.getBoolean(tableProperties, conf); + compressValue = + CompressionKind.valueOf(org.apache.orc.OrcConf.COMPRESS.getString(tableProperties, + conf)); + String versionName = org.apache.orc.OrcConf.WRITE_FORMAT.getString(tableProperties, + conf); + versionValue = Version.byName(versionName); + String enString = org.apache.orc.OrcConf.ENCODING_STRATEGY.getString(tableProperties, + conf); + encodingStrategy = EncodingStrategy.valueOf(enString); + + String compString = + org.apache.orc.OrcConf.COMPRESSION_STRATEGY.getString(tableProperties, conf); + compressionStrategy = CompressionStrategy.valueOf(compString); + + paddingTolerance = + org.apache.orc.OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(tableProperties, conf); + + bloomFilterColumns = org.apache.orc.OrcConf.BLOOM_FILTER_COLUMNS.getString(tableProperties, + conf); + bloomFilterFpp = org.apache.orc.OrcConf.BLOOM_FILTER_FPP.getDouble(tableProperties, + conf); } /** @@ -302,7 +352,7 @@ public final class OrcFile { /** * Sets the tolerance for block padding as a percentage of stripe size. */ - public WriterOptions paddingTolerance(float value) { + public WriterOptions paddingTolerance(double value) { paddingTolerance = value; return this; } @@ -318,7 +368,7 @@ public final class OrcFile { /** * Specify the false positive probability for bloom filter. * @param fpp - false positive probability - * @return + * @return this */ public WriterOptions bloomFilterFpp(double fpp) { bloomFilterFpp = fpp; @@ -334,11 +384,12 @@ public final class OrcFile { } /** - * A required option that sets the object inspector for the rows. Used - * to determine the schema for the file. + * Set the schema for the file. This is a required parameter. + * @param schema the schema for the file. + * @return this */ - public WriterOptions inspector(ObjectInspector value) { - inspectorValue = value; + public WriterOptions setSchema(TypeDescription schema) { + this.schema = schema; return this; } @@ -353,7 +404,7 @@ public final class OrcFile { /** * Add a listener for when the stripe and file are about to be closed. * @param callback the object to be called when the stripe is closed - * @return + * @return this */ public WriterOptions callback(WriterCallback callback) { this.callback = callback; @@ -363,25 +414,112 @@ public final class OrcFile { /** * A package local option to set the memory manager. 
*/ - WriterOptions memory(MemoryManager value) { + protected WriterOptions memory(MemoryManager value) { memoryManagerValue = value; return this; } - /** - * Tajo-specific - */ - WriterOptions timezone(TimeZone value) { - timezone = value; - return this; + public boolean getBlockPadding() { + return blockPaddingValue; + } + + public long getBlockSize() { + return blockSizeValue; + } + + public String getBloomFilterColumns() { + return bloomFilterColumns; } + + public FileSystem getFileSystem() { + return fileSystemValue; + } + + public Configuration getConfiguration() { + return configuration; + } + + public TypeDescription getSchema() { + return schema; + } + + public long getStripeSize() { + return stripeSizeValue; + } + + public CompressionKind getCompress() { + return compressValue; + } + + public WriterCallback getCallback() { + return callback; + } + + public Version getVersion() { + return versionValue; + } + + public MemoryManager getMemoryManager() { + return memoryManagerValue; + } + + public int getBufferSize() { + return bufferSizeValue; + } + + public int getRowIndexStride() { + return rowIndexStrideValue; + } + + public CompressionStrategy getCompressionStrategy() { + return compressionStrategy; + } + + public EncodingStrategy getEncodingStrategy() { + return encodingStrategy; + } + + public double getPaddingTolerance() { + return paddingTolerance; + } + + public double getBloomFilterFpp() { + return bloomFilterFpp; + } + } + + /** + * Create a set of writer options based on a configuration. + * @param conf the configuration to use for values + * @return A WriterOptions object that can be modified + */ + public static ORCAppender.WriterOptions writerOptions(Configuration conf) { + return new ORCAppender.WriterOptions(null, conf); } /** - * Create a default set of write options that can be modified. + * Create a set of write options based on a set of table properties and + * configuration. + * @param tableProperties the properties of the table + * @param conf the configuration of the query + * @return a WriterOptions object that can be modified */ - public static WriterOptions writerOptions(Configuration conf) { - return new WriterOptions(conf); + public static WriterOptions writerOptions(Properties tableProperties, + Configuration conf) { + return new WriterOptions(tableProperties, conf); + } + + private static synchronized MemoryManager getStaticMemoryManager( + final Configuration conf) { + if (memoryManager == null) { + memoryManager = new ThreadLocal<MemoryManager>() { + @Override + protected MemoryManager initialValue() { + return new MemoryManager(conf); + } + }; + } + return memoryManager.get(); } /** @@ -393,54 +531,13 @@ public final class OrcFile { * @throws IOException */ public static Writer createWriter(Path path, - WriterOptions opts - ) throws IOException { - FileSystem fs = opts.fileSystemValue == null ? - path.getFileSystem(opts.configuration) : opts.fileSystemValue; - - return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue, - opts.stripeSizeValue, opts.compressValue, - opts.bufferSizeValue, opts.rowIndexStrideValue, - opts.memoryManagerValue, opts.blockPaddingValue, - opts.versionValue, opts.callback, - opts.encodingStrategy, opts.compressionStrategy, - opts.paddingTolerance, opts.blockSizeValue, - opts.bloomFilterColumns, opts.bloomFilterFpp, - opts.timezone); - } + WriterOptions opts, + TimeZone timeZone + ) throws IOException { + FileSystem fs = opts.getFileSystem() == null ? 
+        path.getFileSystem(opts.getConfiguration()) : opts.getFileSystem();
-  /**
-   * Create an ORC file writer. This method is provided for API backward
-   * compatability with Hive 0.11.
-   * @param fs file system
-   * @param path filename to write to
-   * @param inspector the ObjectInspector that inspects the rows
-   * @param stripeSize the number of bytes in a stripe
-   * @param compress how to compress the file
-   * @param bufferSize the number of bytes to compress at once
-   * @param rowIndexStride the number of rows between row index entries or
-   *                       0 to suppress all indexes
-   * @return a new ORC file writer
-   * @throws IOException
-   */
-  public static Writer createWriter(FileSystem fs,
-                                    Path path,
-                                    Configuration conf,
-                                    ObjectInspector inspector,
-                                    long stripeSize,
-                                    CompressionKind compress,
-                                    int bufferSize,
-                                    int rowIndexStride,
-                                    TimeZone timeZone) throws IOException {
-    return createWriter(path,
-                        writerOptions(conf)
-                            .fileSystem(fs)
-                            .inspector(inspector)
-                            .stripeSize(stripeSize)
-                            .compress(compress)
-                            .bufferSize(bufferSize)
-                            .rowIndexStride(rowIndexStride)
-                            .timezone(timeZone));
+    return new WriterImpl(fs, path, opts, timeZone);
   }
   private static ThreadLocal<MemoryManager> memoryManager = null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/a9e8cbc4/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
new file mode 100644
index 0000000..7194bf4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/OrcRecordReader.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRange;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.*;
+import org.apache.orc.OrcProto;
+import org.apache.orc.impl.*;
+import org.apache.orc.impl.StreamName;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.thirdparty.orc.TreeReaderFactory.DatumTreeReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+public class OrcRecordReader implements Closeable {
+
+  private final Log LOG = LogFactory.getLog(OrcRecordReader.class);
+
+  private final Path path;
+  private final long firstRow;
+  private final List<StripeInformation> stripes = new ArrayList<>();
+  private OrcProto.StripeFooter stripeFooter;
+  private final long totalRowCount;
+  private final CompressionCodec codec;
+  private final List<OrcProto.Type> types;
+  private final int bufferSize;
+  private final boolean[] included;
+  private final long rowIndexStride;
+  private long rowInStripe = 0;
+  private int currentStripe = -1;
+  private long rowBaseInStripe = 0;
+  private long rowCountInStripe = 0;
+  private final Map<org.apache.orc.impl.StreamName, InStream> streams = new HashMap<>();
+  DiskRangeList bufferChunks = null;
+  private final TreeReaderFactory.DatumTreeReader[] reader;
+  private final OrcProto.RowIndex[] indexes;
+  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
+  private final Configuration conf;
+  private final MetadataReader metadata;
+  private final DataReader dataReader;
+  private final Tuple result;
+
+  public OrcRecordReader(List<StripeInformation> stripes,
+                         FileSystem fileSystem,
+                         Schema schema,
+                         Column[] target,
+                         FileFragment fragment,
+                         List<OrcProto.Type> types,
+                         CompressionCodec codec,
+                         int bufferSize,
+                         long strideRate,
+                         Reader.Options options,
+                         Configuration conf,
+                         TimeZone timeZone) throws IOException {
+
+    result = new VTuple(target.length);
+
+    this.conf = conf;
+    this.path = fragment.getPath();
+    this.codec = codec;
+    this.types = types;
+    this.bufferSize = bufferSize;
+    this.included = new boolean[schema.size() + 1];
+    included[0] = target.length > 0; // always include root column except when target schema size is 0
+    Schema targetSchema = new Schema(target);
+    for (int i = 1; i < included.length; i++) {
+      included[i] = targetSchema.contains(schema.getColumn(i - 1));
+    }
+    this.rowIndexStride = strideRate;
+    this.metadata = new MetadataReaderImpl(fileSystem, path, codec, bufferSize, types.size());
+
+    long rows = 0;
+    long skippedRows = 0;
+    long offset = fragment.getStartKey();
+    long maxOffset = fragment.getStartKey() + fragment.getLength();
+    for(StripeInformation stripe: stripes) {
+      long stripeStart = stripe.getOffset();
+      if (offset > stripeStart) {
+        skippedRows += stripe.getNumberOfRows();
+      } else if (stripeStart < maxOffset) {
+        this.stripes.add(stripe);
+        rows += stripe.getNumberOfRows();
+      }
+    }
+
+    // TODO: we could change the ctor to pass this externally
+    this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, options.getUseZeroCopy(),
+        codec);
+    this.dataReader.open();
+
+    firstRow = skippedRows;
+    totalRowCount = rows;
+
+    reader = new DatumTreeReader[target.length];
+    for (int i = 0; i < reader.length; i++) {
+      reader[i] = TreeReaderFactory.createTreeReader(timeZone, schema.getColumnId(target[i].getQualifiedName()), target[i],
+          options.getSkipCorruptRecords());
+    }
+
+    indexes = new OrcProto.RowIndex[types.size()];
+    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
+    advanceToNextRow(reader, 0L, true);
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   *
+   * @param streamList the list of streams available
+   * @param includedColumns which columns are needed
+   * @param doMergeBuffers
+   * @return the list of disk ranges that will be loaded
+   */
+  static DiskRangeList planReadPartialDataStreams
+      (List<OrcProto.Stream> streamList,
+       boolean[] includedColumns,
+       boolean doMergeBuffers) {
+    long offset = 0;
+    // figure out which columns have a present stream
+    DiskRangeList.CreateHelper list = new DiskRangeList.CreateHelper();
+    for (OrcProto.Stream stream : streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      // since stream kind is optional, first check if it exists
+      if (stream.hasKind() &&
+          (org.apache.orc.impl.StreamName.getArea(streamKind) == org.apache.orc.impl.StreamName.Area.DATA) &&
+          includedColumns[column]) {
+        RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
+      }
+      offset += length;
+    }
+    return list.extract();
+  }
+
+  void createStreams(List<OrcProto.Stream> streamDescriptions,
+                     DiskRangeList ranges,
+                     boolean[] includeColumn,
+                     CompressionCodec codec,
+                     int bufferSize,
+                     Map<org.apache.orc.impl.StreamName, InStream> streams) throws IOException {
+    long streamOffset = 0;
+    for (OrcProto.Stream streamDesc : streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if ((includeColumn != null && !includeColumn[column]) ||
+          streamDesc.hasKind() &&
+          (org.apache.orc.impl.StreamName.getArea(streamDesc.getKind()) != org.apache.orc.impl.StreamName.Area.DATA)) {
+        streamOffset += streamDesc.getLength();
+        continue;
+      }
+      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
+          ranges, streamOffset, streamDesc.getLength());
+      org.apache.orc.impl.StreamName name = new StreamName(column, streamDesc.getKind());
+      streams.put(name, InStream.create(name.toString(), buffers,
+          streamDesc.getLength(), codec, bufferSize));
+      streamOffset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    DiskRangeList toRead = planReadPartialDataStreams(streamList, included, true);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+    }
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+    }
+
+    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   *
+   * @param nextRow the row we want to go to
+   * @throws IOException
+   */
+  private boolean advanceToNextRow(
+      TreeReaderFactory.TreeReader[] reader, long nextRow, boolean canAdvanceStripe)
+      throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
+      }
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        for (TreeReaderFactory.TreeReader eachReader : reader) {
+          eachReader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+        }
+      } else {
+        for (TreeReaderFactory.TreeReader eachReader : reader) {
+          eachReader.skipRows(nextRowInStripe - rowInStripe);
+        }
+      }
+      rowInStripe = nextRowInStripe;
+    }
+    return true;
+  }
+
+  public boolean hasNext() throws IOException {
+    return rowInStripe < rowCountInStripe;
+  }
+
+  public Tuple next() throws IOException {
+    if (hasNext()) {
+      try {
+        for (int i = 0; i < reader.length; i++) {
+          result.put(i, reader[i].next());
+        }
+        // find the next row
+        rowInStripe += 1;
+        advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+        return result;
+      } catch (IOException e) {
+        // Rethrow exception with file name in log message
+        throw new IOException("Error reading file: " + path, e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Read the next stripe until we find a row that we don't skip.
+   *
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
+      currentStripe += 1;
+      readStripe();
+    }
+  }
+
+  /**
+   * Read the current stripe into memory.
+   *
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = beginReadStripe();
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+
+      for (TreeReaderFactory.TreeReader eachReader : reader) {
+        eachReader.startStripe(streams, stripeFooter);
+      }
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private void clearStreams() throws IOException {
+    // explicit close of all streams to de-ref ByteBuffers
+    for (InStream is : streams.values()) {
+      is.close();
+    }
+    if (bufferChunks != null) {
+      if (dataReader.isTrackingDiskRanges()) {
+        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
+          if (!(range instanceof BufferChunk)) {
+            continue;
+          }
+          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
+        }
+      }
+    }
+    bufferChunks = null;
+    streams.clear();
+  }
+
+  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
+    return metadata.readStripeFooter(stripe);
+  }
+
+  private StripeInformation beginReadStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    clearStreams();
+    // setup the position in the stripe
+    rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
+    rowBaseInStripe = 0;
+    for (int i = 0; i < currentStripe; ++i) {
+      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+    }
+    // reset all of the indexes
+    for (int i = 0; i < indexes.length; ++i) {
+      indexes[i] = null;
+    }
+    return stripe;
+  }
+
+  private void readAllDataStreams(StripeInformation stripe) throws IOException {
+    long start = stripe.getIndexLength();
+    long end = start + stripe.getDataLength();
+    // explicitly trigger 1 big read
+    DiskRangeList toRead = new DiskRangeList(start, end);
+    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
+    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
+    createStreams(streamDescriptions, bufferChunks, included, codec, bufferSize, streams);
+  }
+
+  public long getRowNumber() {
+    return rowInStripe + rowBaseInStripe + firstRow;
+  }
+
+  public float getProgress() {
+    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
+  }
+
+  private int findStripe(long rowNumber) {
+    for (int i = 0; i < stripes.size(); i++) {
+      StripeInformation stripe = stripes.get(i);
+      if (stripe.getNumberOfRows() > rowNumber) {
+        return i;
+      }
+      rowNumber -= stripe.getNumberOfRows();
+    }
+    throw new IllegalArgumentException("Seek after the end of reader range");
+  }
+
+  OrcIndex readRowIndex(
+      int stripeIndex, boolean[] included) throws IOException {
+    return readRowIndex(stripeIndex, included, null, null);
+  }
+
+  OrcIndex readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
+                        OrcProto.BloomFilterIndex[] bloomFilterIndex) throws IOException {
+    StripeInformation stripe = stripes.get(stripeIndex);
+    OrcProto.StripeFooter stripeFooter = null;
+    // if this is the current stripe, use the cached objects.
+    if (stripeIndex == currentStripe) {
+      stripeFooter = this.stripeFooter;
+      indexes = indexes == null ? this.indexes : indexes;
+      bloomFilterIndex = bloomFilterIndex == null ?
+          this.bloomFilterIndices : bloomFilterIndex;
+    }
+    return metadata.readRowIndex(stripe, stripeFooter, included, indexes, null,
+        bloomFilterIndex);
+  }
+
+  private void seekToRowEntry(TreeReaderFactory.TreeReader []reader, int rowEntry)
+      throws IOException {
+    PositionProvider[] index = new PositionProvider[indexes.length];
+    for (int i = 0; i < indexes.length; ++i) {
+      if (indexes[i] != null) {
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+      }
+    }
+    for (TreeReaderFactory.TreeReader eachReader : reader) {
+      eachReader.seek(index);
+    }
+  }
+
+  public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+          rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+          rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
+    int rightStripe = findStripe(rowNumber);
+    if (rightStripe != currentStripe) {
+      currentStripe = rightStripe;
+      readStripe();
+    }
+    readRowIndex(currentStripe, included);
+
+    // if we aren't to the right row yet, advance in the stripe.
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  public long getNumBytes() {
+    return ((RecordReaderUtils.DefaultDataReader)dataReader).getReadBytes();
+  }
+
+  @Override
+  public void close() throws IOException {
+    clearStreams();
+    dataReader.close();
+  }
+
+  public static final class PositionProviderImpl implements PositionProvider {
+    private final OrcProto.RowIndexEntry entry;
+    private int index;
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
+      this(entry, 0);
+    }
+
+    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
+      this.entry = entry;
+      this.index = startPos;
+    }
+
+    @Override
+    public long getNext() {
+      return entry.getPositions(index++);
+    }
+  }
+}
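
Editor's note: the new OrcRecordReader above is driven through a plain hasNext()/next() loop rather than Hive's RecordReader interface. The following sketch is not part of the commit; it only illustrates how a caller might consume the reader, assuming an OrcRecordReader instance has already been constructed with the stripe list, schema, target columns, fragment, and options shown in the constructor. The wrapper class OrcScanSketch and the method countRows are hypothetical names.

import java.io.IOException;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.thirdparty.orc.OrcRecordReader;

public class OrcScanSketch {
  // Counts the rows the reader will deliver for its fragment and target columns.
  public static long countRows(OrcRecordReader reader) throws IOException {
    long rows = 0;
    try {
      while (reader.hasNext()) {
        // next() refills and returns the same VTuple instance, so values must be
        // copied out if they need to outlive the current iteration.
        Tuple tuple = reader.next();
        if (tuple != null) {
          rows++;
        }
      }
    } finally {
      // close() releases the per-stripe InStreams and the underlying DataReader.
      reader.close();
    }
    return rows;
  }
}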

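Editor's note: on the writer side of this patch, options are now resolved from table properties plus the Hadoop configuration, the schema is supplied as an org.apache.orc.TypeDescription via setSchema() instead of an ObjectInspector, and the time zone moves out of WriterOptions into the createWriter call. The sketch below is a rough illustration under those assumptions; the schema string, the target path, and the OrcWriteSketch wrapper are placeholders, and the class that finally owns WriterOptions (OrcFile versus ORCAppender) may differ from what is written here.

import java.io.IOException;
import java.util.Properties;
import java.util.TimeZone;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.TypeDescription;
import org.apache.tajo.storage.thirdparty.orc.OrcFile;
import org.apache.tajo.storage.thirdparty.orc.Writer;

public class OrcWriteSketch {
  public static void writeEmptyFile(Configuration conf, Properties tableProperties, Path target)
      throws IOException {
    // Options are taken from the table properties first, then from the configuration.
    OrcFile.WriterOptions opts = OrcFile.writerOptions(tableProperties, conf)
        .setSchema(TypeDescription.fromString("struct<id:int,name:string>"));
    // The session time zone is now passed explicitly to createWriter.
    Writer writer = OrcFile.createWriter(target, opts, TimeZone.getTimeZone("UTC"));
    writer.close();
  }
}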