Repository: tajo
Updated Branches:
  refs/heads/master 22d2ba4db -> 18b898ffb

TAJO-1718: Refine code for Parquet 1.8.1.

Closes #663

Signed-off-by: Hyunsik Choi <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/18b898ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/18b898ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/18b898ff

Branch: refs/heads/master
Commit: 18b898ffbab0462ae26c0cf374b119dcee5f1c6f
Parents: 22d2ba4
Author: Jongyoung Park <[email protected]>
Authored: Mon Jul 27 18:13:24 2015 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Wed Jul 29 20:15:11 2015 +0900

----------------------------------------------------------------------
 tajo-storage/tajo-storage-hdfs/pom.xml          |  16 +-
 .../tajo/storage/parquet/ParquetAppender.java   |   6 +-
 .../tajo/storage/parquet/TajoParquetReader.java |   4 +-
 .../tajo/storage/parquet/TajoParquetWriter.java |   4 +-
 .../tajo/storage/parquet/TajoReadSupport.java   |  10 +-
 .../storage/parquet/TajoRecordConverter.java    |  12 +-
 .../storage/parquet/TajoRecordMaterializer.java |   6 +-
 .../storage/parquet/TajoSchemaConverter.java    |  10 +-
 .../tajo/storage/parquet/TajoWriteSupport.java  |  12 +-
 .../thirdparty/parquet/CodecFactory.java        | 190 -------
 .../parquet/ColumnChunkPageWriteStore.java      | 206 --------
 .../parquet/InternalParquetRecordReader.java    | 190 -------
 .../parquet/InternalParquetRecordWriter.java    | 160 ------
 .../thirdparty/parquet/ParquetFileWriter.java   | 492 -------------------
 .../thirdparty/parquet/ParquetReader.java       | 146 ------
 .../thirdparty/parquet/ParquetWriter.java       | 224 ---------
 .../storage/parquet/TestSchemaConverter.java    |   4 +-
 17 files changed, 36 insertions(+), 1656 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index bfa5707..1784ab3 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -34,8 +34,6 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <parquet.version>1.5.0</parquet.version>
-    <parquet.format.version>2.1.0</parquet.format.version>
   </properties>

   <repositories>
@@ -334,19 +332,9 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-column</artifactId>
-      <version>${parquet.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
+      <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>${parquet.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>parquet-format</artifactId>
-      <version>${parquet.format.version}</version>
+      <version>1.8.1</version>
     </dependency>
     <dependency>
       <groupId>io.netty</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 45960aa..4a8b256 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -21,8 +21,8 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.storage.StorageConstants;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -133,7 +133,7 @@ public class ParquetAppender extends FileAppender {
   }
 
   public long getEstimatedOutputSize() throws IOException {
-    return writer.getEstimatedWrittenSize();
+    return writer.getDataSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
index a765f48..1a6545f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.storage.parquet;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
-import parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter.UnboundRecordFilter;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
index 5f220c5..8e6ae3e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
@@ -19,10 +19,10 @@
 package org.apache.tajo.storage.parquet;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
index a64e987..4a3300c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
@@ -21,11 +21,11 @@ package org.apache.tajo.storage.parquet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
 
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7f236b6..43c55e1 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -27,12 +27,12 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.Type;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.Type;
 
 import java.nio.ByteBuffer;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index 436159c..f762820 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -21,9 +21,9 @@ package org.apache.tajo.storage.parquet;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.storage.Tuple;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
 
 /**
  * Materializes a Tajo Tuple from a stream of Parquet data.
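
[Editor's note: the hunks above replace Tajo's bundled thirdparty Parquet reader with the upstream org.apache.parquet classes. A minimal, hypothetical sketch of reading records through the relocated 1.8.x reader follows; the builder-style construction is assumed from the upstream Parquet 1.8 API, and a generic ReadSupport<T> stands in for TajoReadSupport, whose constructor is not shown in this diff.]

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;

public class ParquetReadSketch {
  // Counts records in a Parquet file using the upstream reader; purely illustrative.
  public static <T> long countRecords(Configuration conf, Path file,
                                      ReadSupport<T> readSupport) throws IOException {
    long count = 0;
    ParquetReader<T> reader = ParquetReader.builder(readSupport, file)
        .withConf(conf)
        .build();
    try {
      // read() returns null once the file is exhausted
      while (reader.read() != null) {
        count++;
      }
    } finally {
      reader.close();
    }
    return count;
  }
}
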
http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index 555b623..e0cf64b 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -21,11 +21,11 @@ package org.apache.tajo.storage.parquet;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index de2a1e3..9613a25 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -24,12 +24,12 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 
 import java.util.HashMap;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
deleted file mode 100644
index 4ba47c1..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.*; -import org.apache.hadoop.util.ReflectionUtils; -import parquet.bytes.BytesInput; -import parquet.hadoop.BadConfigurationException; -import parquet.hadoop.metadata.CompressionCodecName; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - -class CodecFactory { - - public static class BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - public BytesDecompressor(CompressionCodec codec) { - this.codec = codec; - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } else { - decompressed = bytes; - } - return decompressed; - } - - private void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - } - - /** - * Encapsulates the logic around hadoop compression - * - * @author Julien Le Dem - * - */ - public static class BytesCompressor { - - private final CompressionCodec codec; - private final Compressor compressor; - private final ByteArrayOutputStream compressedOutBuffer; - private final CompressionCodecName codecName; - - public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { - this.codecName = codecName; - this.codec = codec; - if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); - this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); - } else { - this.compressor = null; - this.compressedOutBuffer = null; - } - } - - public BytesInput compress(BytesInput bytes) throws IOException { - final BytesInput compressedBytes; - if (codec == null) { - compressedBytes = bytes; - } else { - compressedOutBuffer.reset(); - if (compressor != null) { - // null compressor for non-native gzip - compressor.reset(); - } - CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); - bytes.writeAllTo(cos); - cos.finish(); - cos.close(); - compressedBytes = BytesInput.from(compressedOutBuffer); - } - return compressedBytes; - } - - private void release() { - if (compressor != null) { - CodecPool.returnCompressor(compressor); - } - } - - public CompressionCodecName getCodecName() { - return codecName; - } - - } - - private final 
Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); - private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); - private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>(); - private final Configuration configuration; - - public CodecFactory(Configuration configuration) { - this.configuration = configuration; - } - - /** - * - * @param codecName the requested codec - * @return the corresponding hadoop codec. null if UNCOMPRESSED - */ - private CompressionCodec getCodec(CompressionCodecName codecName) { - String codecClassName = codecName.getHadoopCompressionCodecClassName(); - if (codecClassName == null) { - return null; - } - CompressionCodec codec = codecByName.get(codecClassName); - if (codec != null) { - return codec; - } - - try { - Class<?> codecClass = Class.forName(codecClassName); - codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); - return codec; - } catch (ClassNotFoundException e) { - throw new BadConfigurationException("Class " + codecClassName + " was not found", e); - } - } - - public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { - BytesCompressor comp = compressors.get(codecName); - if (comp == null) { - CompressionCodec codec = getCodec(codecName); - comp = new BytesCompressor(codecName, codec, pageSize); - compressors.put(codecName, comp); - } - return comp; - } - - public BytesDecompressor getDecompressor(CompressionCodecName codecName) { - BytesDecompressor decomp = decompressors.get(codecName); - if (decomp == null) { - CompressionCodec codec = getCodec(codecName); - decomp = new BytesDecompressor(codec); - decompressors.put(codecName, decomp); - } - return decomp; - } - - public void release() { - for (BytesCompressor compressor : compressors.values()) { - compressor.release(); - } - compressors.clear(); - for (BytesDecompressor decompressor : decompressors.values()) { - decompressor.release(); - } - decompressors.clear(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java deleted file mode 100644 index 91d4748..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import parquet.Log; -import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageWriteStore; -import parquet.column.page.PageWriter; -import parquet.column.statistics.BooleanStatistics; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.util.*; - -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; - -class ColumnChunkPageWriteStore implements PageWriteStore { - private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private static final class ColumnChunkPageWriter implements PageWriter { - - private final ColumnDescriptor path; - private final BytesCompressor compressor; - - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - private Set<Encoding> encodings = new HashSet<Encoding>(); - - private Statistics totalStatistics; - - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { - this.path = path; - this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); - } - - @Deprecated - @Override - public void writePage(BytesInput bytes, - int valueCount, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, - int valueCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, 
- statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public long getMemSize() { - return buf.size(); - } - - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.startColumn(path, totalValueCount, compressor.getCodecName()); - if (dictionaryPage != null) { - writer.writeDictionaryPage(dictionaryPage); - encodings.add(dictionaryPage.getEncoding()); - } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); - writer.endColumn(); - if (DEBUG) { - LOG.debug( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); - } - encodings.clear(); - pageCount = 0; - } - - @Override - public long allocatedSize() { - return buf.getCapacity(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - } - - private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); - private final MessageType schema; - private final BytesCompressor compressor; - private final int initialSize; - - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { - this.compressor = compressor; - this.schema = schema; - this.initialSize = initialSize; - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - if (!writers.containsKey(path)) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); - } - return writers.get(path); - } - - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - List<ColumnDescriptor> columns = schema.getColumns(); - for (ColumnDescriptor columnDescriptor : columns) { - ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor); - pageWriter.writeToFileWriter(writer); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java deleted file mode 100644 index 10ac6de..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import parquet.Log; -import parquet.column.ColumnDescriptor; -import parquet.column.page.PageReadStore; -import parquet.filter.UnboundRecordFilter; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.api.ReadSupport; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.util.counters.BenchmarkCounter; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.io.ParquetDecodingException; -import parquet.io.api.RecordMaterializer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static java.lang.String.format; -import static parquet.Log.DEBUG; - -class InternalParquetRecordReader<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordReader.class); - - private final ColumnIOFactory columnIOFactory = new ColumnIOFactory(); - - private MessageType requestedSchema; - private MessageType fileSchema; - private int columnCount; - private final ReadSupport<T> readSupport; - - private RecordMaterializer<T> recordConverter; - - private T currentValue; - private long total; - private int current = 0; - private int currentBlock = -1; - private ParquetFileReader reader; - private parquet.io.RecordReader<T> recordReader; - private UnboundRecordFilter recordFilter; - - private long totalTimeSpentReadingBytes; - private long totalTimeSpentProcessingRecords; - private long startedAssemblingCurrentBlockAt; - - private long totalCountLoadedSoFar = 0; - - private Path file; - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - */ - public InternalParquetRecordReader(ReadSupport<T> readSupport) { - this(readSupport, null); - } - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - * @param filter Optional filter for only returning matching records. 
- */ - public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter - filter) { - this.readSupport = readSupport; - this.recordFilter = filter; - } - - private void checkRead() throws IOException { - if (current == totalCountLoadedSoFar) { - if (current != 0) { - long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt; - totalTimeSpentProcessingRecords += timeAssembling; - if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); - long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; - long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; - long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; - if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); - } - - if (DEBUG) LOG.debug("at row " + current + ". reading next block"); - long t0 = System.currentTimeMillis(); - PageReadStore pages = reader.readNextRowGroup(); - if (pages == null) { - throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); - } - long timeSpentReading = System.currentTimeMillis() - t0; - totalTimeSpentReadingBytes += timeSpentReading; - BenchmarkCounter.incrementTime(timeSpentReading); - if (DEBUG) { - LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); - LOG.debug("initializing Record assembly with requested schema " + requestedSchema); - } - MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); - startedAssemblingCurrentBlockAt = System.currentTimeMillis(); - totalCountLoadedSoFar += pages.getRowCount(); - ++ currentBlock; - } - } - - public void close() throws IOException { - reader.close(); - } - - public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - public T getCurrentValue() throws IOException, - InterruptedException { - return currentValue; - } - - public float getProgress() throws IOException, InterruptedException { - return (float) current / total; - } - - public void initialize(MessageType requestedSchema, MessageType fileSchema, - Map<String, String> extraMetadata, Map<String, String> readSupportMetadata, - Path file, List<BlockMetaData> blocks, Configuration configuration) - throws IOException { - this.requestedSchema = requestedSchema; - this.fileSchema = fileSchema; - this.file = file; - this.columnCount = this.requestedSchema.getPaths().size(); - this.recordConverter = readSupport.prepareForRead( - configuration, extraMetadata, fileSchema, - new ReadSupport.ReadContext(requestedSchema, readSupportMetadata)); - - List<ColumnDescriptor> columns = requestedSchema.getColumns(); - reader = new ParquetFileReader(configuration, file, blocks, columns); - for (BlockMetaData block : blocks) { - total += block.getRowCount(); - } - if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records."); - } - - private boolean contains(GroupType group, String[] path, int index) { - if (index == path.length) { - return 
false; - } - if (group.containsField(path[index])) { - Type type = group.getType(path[index]); - if (type.isPrimitive()) { - return index + 1 == path.length; - } else { - return contains(type.asGroupType(), path, index + 1); - } - } - return false; - } - - public boolean nextKeyValue() throws IOException, InterruptedException { - if (current < total) { - try { - checkRead(); - currentValue = recordReader.read(); - if (DEBUG) LOG.debug("read value: " + currentValue); - current ++; - } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); - } - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java deleted file mode 100644 index da57745..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import parquet.Log; -import parquet.column.ParquetProperties.WriterVersion; -import parquet.column.impl.ColumnWriteStoreImpl; -import parquet.hadoop.api.WriteSupport; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.util.Map; - -import static java.lang.Math.max; -import static java.lang.Math.min; -import static java.lang.String.format; -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; -import static parquet.Preconditions.checkNotNull; - -class InternalParquetRecordWriter<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - - private final ParquetFileWriter w; - private final WriteSupport<T> writeSupport; - private final MessageType schema; - private final Map<String, String> extraMetaData; - private final int blockSize; - private final int pageSize; - private final BytesCompressor compressor; - private final int dictionaryPageSize; - private final boolean enableDictionary; - private final boolean validating; - private final WriterVersion writerVersion; - - private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; - - private ColumnWriteStoreImpl store; - private ColumnChunkPageWriteStore pageStore; - - /** - * @param w the file to write to - * @param writeSupport the class to convert incoming records - * @param schema the schema of the records - * @param extraMetaData extra meta data to write in the footer of the file - * @param blockSize the size of a block in the file (this will be approximate) - * @param codec the codec used to compress - */ - public InternalParquetRecordWriter( - ParquetFileWriter w, - WriteSupport<T> writeSupport, - MessageType schema, - Map<String, String> extraMetaData, - int blockSize, - int pageSize, - BytesCompressor compressor, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - WriterVersion writerVersion) { - this.w = w; - this.writeSupport = checkNotNull(writeSupport, "writeSupport"); - this.schema = schema; - this.extraMetaData = extraMetaData; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.compressor = compressor; - this.dictionaryPageSize = dictionaryPageSize; - this.enableDictionary = enableDictionary; - this.validating = validating; - this.writerVersion = writerVersion; - initStore(); - } - - private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be the same size. 
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); - MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); - writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); - } - - public void close() throws IOException, InterruptedException { - flushStore(); - w.end(extraMetaData); - } - - public void write(T value) throws IOException, InterruptedException { - writeSupport.write(value); - ++ recordCount; - checkBlockSizeReached(); - } - - private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. - long memSize = store.memSize(); - if (memSize > blockSize) { - if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); - flushStore(); - initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); - } else { - float recordSize = (float) memSize / recordCount; - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); - if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); - } - } - } - - public long getEstimatedWrittenSize() throws IOException { - return w.getPos() + store.memSize(); - } - - private void flushStore() - throws IOException { - if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); - if (store.allocatedSize() > 3 * blockSize) { - LOG.warn("Too much memory used: " + store.memUsageString()); - } - w.startBlock(recordCount); - store.flush(); - pageStore.flushToFileWriter(w); - recordCount = 0; - w.endBlock(); - store = null; - pageStore = null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java deleted file mode 100644 index ac1c421..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java +++ /dev/null @@ -1,492 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import parquet.Log; -import parquet.Version; -import parquet.bytes.BytesInput; -import parquet.bytes.BytesUtils; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DictionaryPage; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.metadata.*; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.*; -import java.util.Map.Entry; - -import static parquet.Log.DEBUG; -import static parquet.format.Util.writeFileMetaData; - -/** - * Internal implementation of the Parquet file writer as a block container - * - * @author Julien Le Dem - * - */ -public class ParquetFileWriter { - private static final Log LOG = Log.getLog(ParquetFileWriter.class); - - public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); - public static final int CURRENT_VERSION = 1; - - private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); - - private final MessageType schema; - private final FSDataOutputStream out; - private BlockMetaData currentBlock; - private ColumnChunkMetaData currentColumn; - private long currentRecordCount; - private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - private long uncompressedLength; - private long compressedLength; - private Set<parquet.column.Encoding> currentEncodings; - - private CompressionCodecName currentChunkCodec; - private ColumnPath currentChunkPath; - private PrimitiveTypeName currentChunkType; - private long currentChunkFirstDataPage; - private long currentChunkDictionaryPageOffset; - private long currentChunkValueCount; - - private Statistics currentStatistics; - - /** - * Captures the order in which methods should be called - * - * @author Julien Le Dem - * - */ - private enum STATE { - NOT_STARTED { - STATE start() { - return STARTED; - } - }, - STARTED { - STATE startBlock() { - return BLOCK; - } - STATE end() { - return ENDED; - } - }, - BLOCK { - STATE startColumn() { - return COLUMN; - } - STATE endBlock() { - return STARTED; - } - }, - COLUMN { - STATE endColumn() { - return BLOCK; - }; - STATE write() { - return this; - } - }, - ENDED; - - STATE start() throws IOException { return error(); } - STATE startBlock() throws IOException { return error(); } - STATE startColumn() throws IOException { return error(); } - STATE write() throws IOException { return error(); } - STATE endColumn() throws IOException { return error(); } - STATE 
endBlock() throws IOException { return error(); } - STATE end() throws IOException { return error(); } - - private final STATE error() throws IOException { - throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); - } - } - - private STATE state = STATE.NOT_STARTED; - - /** - * - * @param configuration Configuration - * @param schema the schema of the data - * @param file the file to write to - * @throws java.io.IOException if the file can not be created - */ - public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { - super(); - this.schema = schema; - FileSystem fs = file.getFileSystem(configuration); - this.out = fs.create(file, false); - } - - /** - * start the file - * @throws java.io.IOException - */ - public void start() throws IOException { - state = state.start(); - if (DEBUG) LOG.debug(out.getPos() + ": start"); - out.write(MAGIC); - } - - /** - * start a block - * @param recordCount the record count in this block - * @throws java.io.IOException - */ - public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": start block"); -// out.write(MAGIC); // TODO: add a magic delimiter - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; - } - - /** - * start a column inside a block - * @param descriptor the column descriptor - * @param valueCount the value count in this column - * @param statistics the statistics in this column - * @param compressionCodecName - * @throws java.io.IOException - */ - public void startColumn(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); - currentEncodings = new HashSet<parquet.column.Encoding>(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = out.getPos(); - compressedLength = 0; - uncompressedLength = 0; - // need to know what type of stats to initialize to - // better way to do this? 
- currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); - } - - /** - * writes a dictionary page page - * @param dictionaryPage the dictionary page - */ - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out); - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); - currentEncodings.add(dictionaryPage.getEncoding()); - } - - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - 
this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentStatistics.mergeStatistics(statistics); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a number of pages at once - * @param bytes bytes to be written including page headers - * @param uncompressedTotalPageSize total uncompressed size (without page headers) - * @param compressedTotalPageSize total compressed size (without page headers) - * @throws java.io.IOException - */ - void writeDataPages(BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - List<parquet.column.Encoding> encodings) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); - bytes.writeAllTo(out); - currentEncodings.addAll(encodings); - currentStatistics = totalStats; - } - - /** - * end a column (once all rep, def and data have been written) - * @throws java.io.IOException - */ - public void endColumn() throws IOException { - state = state.endColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": end column"); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength)); - if (DEBUG) LOG.info("ended Column chumk: " + currentColumn); - currentColumn = null; - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - } - - /** - * ends a block once all column chunks have been written - * @throws java.io.IOException - */ - public void endBlock() throws IOException { - state = state.endBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": end block"); - currentBlock.setRowCount(currentRecordCount); - blocks.add(currentBlock); - currentBlock = null; - } - - /** - * ends a file once all blocks have been written. - * closes the file. 
- * @param extraMetaData the extra meta data to write in the footer - * @throws java.io.IOException - */ - public void end(Map<String, String> extraMetaData) throws IOException { - state = state.end(); - if (DEBUG) LOG.debug(out.getPos() + ": end"); - ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out); - out.close(); - } - - private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { - long footerIndex = out.getPos(); - parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); - writeFileMetaData(parquetMetadata, out); - if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); - out.write(MAGIC); - } - - /** - * writes a _metadata file - * @param configuration the configuration to use to get the FileSystem - * @param outputPath the directory to write the _metadata file to - * @param footers the list of footers to merge - * @throws java.io.IOException - */ - public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { - Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE); - FileSystem fs = outputPath.getFileSystem(configuration); - outputPath = outputPath.makeQualified(fs); - FSDataOutputStream metadata = fs.create(metaDataPath); - metadata.write(MAGIC); - ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); - serializeFooter(metadataFooter, metadata); - metadata.close(); - } - - private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) { - String rootPath = root.toString(); - GlobalMetaData fileMetaData = null; - List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - for (Footer footer : footers) { - String path = footer.getFile().toString(); - if (!path.startsWith(rootPath)) { - throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root); - } - path = path.substring(rootPath.length()); - while (path.startsWith("/")) { - path = path.substring(1); - } - fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData); - for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) { - block.setPath(path); - blocks.add(block); - } - } - return new ParquetMetadata(fileMetaData.merge(), blocks); - } - - /** - * @return the current position in the underlying file - * @throws java.io.IOException - */ - public long getPos() throws IOException { - return out.getPos(); - } - - /** - * Will merge the metadata of all the footers together - * @param footers the list files footers to merge - * @return the global meta data for all the footers - */ - static GlobalMetaData getGlobalMetaData(List<Footer> footers) { - GlobalMetaData fileMetaData = null; - for (Footer footer : footers) { - ParquetMetadata currentMetadata = footer.getParquetMetadata(); - fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData); - } - return fileMetaData; - } - - /** - * Will return the result of merging toMerge into mergedMetadata - * @param toMerge the metadata toMerge - * @param mergedMetadata the reference metadata to merge into - * @return the result of the merge - */ - static GlobalMetaData mergeInto( - FileMetaData toMerge, - GlobalMetaData mergedMetadata) { - MessageType schema = null; - Map<String, 
Set<String>> newKeyValues = new HashMap<String, Set<String>>(); - Set<String> createdBy = new HashSet<String>(); - if (mergedMetadata != null) { - schema = mergedMetadata.getSchema(); - newKeyValues.putAll(mergedMetadata.getKeyValueMetaData()); - createdBy.addAll(mergedMetadata.getCreatedBy()); - } - if ((schema == null && toMerge.getSchema() != null) - || (schema != null && !schema.equals(toMerge.getSchema()))) { - schema = mergeInto(toMerge.getSchema(), schema); - } - for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) { - Set<String> values = newKeyValues.get(entry.getKey()); - if (values == null) { - values = new HashSet<String>(); - newKeyValues.put(entry.getKey(), values); - } - values.add(entry.getValue()); - } - createdBy.add(toMerge.getCreatedBy()); - return new GlobalMetaData( - schema, - newKeyValues, - createdBy); - } - - /** - * will return the result of merging toMerge into mergedSchema - * @param toMerge the schema to merge into mergedSchema - * @param mergedSchema the schema to append the fields to - * @return the resulting schema - */ - static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) { - if (mergedSchema == null) { - return toMerge; - } - return mergedSchema.union(toMerge); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java deleted file mode 100644 index 9c167a0..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import parquet.filter.UnboundRecordFilter; -import parquet.hadoop.Footer; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.api.InitContext; -import parquet.hadoop.api.ReadSupport; -import parquet.hadoop.api.ReadSupport.ReadContext; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.GlobalMetaData; -import parquet.schema.MessageType; - -import java.io.Closeable; -import java.io.IOException; -import java.util.*; - -/** - * Read records from a Parquet file. 
- */ -public class ParquetReader<T> implements Closeable { - - private ReadSupport<T> readSupport; - private UnboundRecordFilter filter; - private Configuration conf; - private ReadContext readContext; - private Iterator<Footer> footersIterator; - private InternalParquetRecordReader<T> reader; - private GlobalMetaData globalMetaData; - - /** - * @param file the file to read - * @param readSupport to materialize records - * @throws java.io.IOException - */ - public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException { - this(file, readSupport, null); - } - - /** - * @param conf the configuration - * @param file the file to read - * @param readSupport to materialize records - * @throws java.io.IOException - */ - public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException { - this(conf, file, readSupport, null); - } - - /** - * @param file the file to read - * @param readSupport to materialize records - * @param filter the filter to use to filter records - * @throws java.io.IOException - */ - public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { - this(new Configuration(), file, readSupport, filter); - } - - /** - * @param conf the configuration - * @param file the file to read - * @param readSupport to materialize records - * @param filter the filter to use to filter records - * @throws java.io.IOException - */ - public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { - this.readSupport = readSupport; - this.filter = filter; - this.conf = conf; - - FileSystem fs = file.getFileSystem(conf); - List<FileStatus> statuses = Arrays.asList(fs.listStatus(file)); - List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses); - this.footersIterator = footers.iterator(); - globalMetaData = ParquetFileWriter.getGlobalMetaData(footers); - - List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - for (Footer footer : footers) { - blocks.addAll(footer.getParquetMetadata().getBlocks()); - } - - MessageType schema = globalMetaData.getSchema(); - Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData(); - readContext = readSupport.init(new InitContext(conf, extraMetadata, schema)); - } - - /** - * @return the next record or null if finished - * @throws java.io.IOException - */ - public T read() throws IOException { - try { - if (reader != null && reader.nextKeyValue()) { - return reader.getCurrentValue(); - } else { - initReader(); - return reader == null ? 
null : read(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - private void initReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - if (footersIterator.hasNext()) { - Footer footer = footersIterator.next(); - reader = new InternalParquetRecordReader<T>(readSupport, filter); - reader.initialize( - readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(), - readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf); - } - } - - @Override - public void close() throws IOException { - if (reader != null) { - reader.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java deleted file mode 100644 index 7527437..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import parquet.column.ParquetProperties; -import parquet.hadoop.api.WriteSupport; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.schema.MessageType; - -import java.io.Closeable; -import java.io.IOException; - -public class ParquetWriter<T> implements Closeable { - - public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; - public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024; - public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME = - CompressionCodecName.UNCOMPRESSED; - public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; - public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; - public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION = - ParquetProperties.WriterVersion.PARQUET_1_0; - - private final InternalParquetRecordWriter<T> writer; - - /** - * Create a new ParquetWriter. 
- * (with dictionary encoding enabled and validation off) - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @throws java.io.IOException - * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean) - */ - public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, - DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold (both data and dictionary) - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @throws java.io.IOException - * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean validating) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @throws java.io.IOException - * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, - dictionaryPageSize, enableDictionary, validating, - DEFAULT_WRITER_VERSION); - } - - /** - * Create a new ParquetWriter. - * - * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads - * configuration from the classpath. 
- * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion} - * @throws java.io.IOException - * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - ParquetProperties.WriterVersion writerVersion) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration()); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion} - * @param conf Hadoop configuration to use while accessing the filesystem - * @throws java.io.IOException - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - ParquetProperties.WriterVersion writerVersion, - Configuration conf) throws IOException { - - WriteSupport.WriteContext writeContext = writeSupport.init(conf); - MessageType schema = writeContext.getSchema(); - - ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file); - fileWriter.start(); - - CodecFactory codecFactory = new CodecFactory(conf); - CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); - this.writer = new InternalParquetRecordWriter<T>( - fileWriter, - writeSupport, - schema, - writeContext.getExtraMetaData(), - blockSize, - pageSize, - compressor, - dictionaryPageSize, - enableDictionary, - validating, - writerVersion); - } - - /** - * Create a new ParquetWriter. The default block size is 50 MB.The default - * page size is 1 MB. Default compression is no compression. Dictionary encoding is disabled. 
- * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @throws java.io.IOException - */ - public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException { - this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); - } - - public void write(T object) throws IOException { - try { - writer.write(object); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - public long getEstimatedWrittenSize() throws IOException { - return this.writer.getEstimatedWrittenSize(); - } - - @Override - public void close() throws IOException { - try { - writer.close(); - } catch (InterruptedException e) { - throw new IOException(e); - } - }} http://git-wip-us.apache.org/repos/asf/tajo/blob/18b898ff/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java index 517e00e..ad0a92a 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java @@ -22,8 +22,8 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; import org.junit.Test; -import parquet.schema.MessageType; -import parquet.schema.MessageTypeParser; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; import java.util.ArrayList; import java.util.List;
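----------------------------------------------------------------------
For reference, the thirdparty ParquetReader and ParquetWriter copies removed above are covered by the upstream org.apache.parquet 1.8.1 classes that the module now depends on. The sketch below (not part of this commit) shows a round trip through those upstream APIs using the example Group read/write support bundled with parquet-hadoop; the schema, file path and record values are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetRoundTripSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet");   // illustrative path

    // The write support reads the target schema from the configuration.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; required binary name (UTF8); }");
    GroupWriteSupport.setSchema(schema, conf);

    // Writing: this upstream constructor mirrors the argument list of the
    // removed thirdparty ParquetWriter (codec, block size, page sizes, flags).
    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        file,
        new GroupWriteSupport(),
        CompressionCodecName.UNCOMPRESSED,
        ParquetWriter.DEFAULT_BLOCK_SIZE,            // 128 MB row group target
        ParquetWriter.DEFAULT_PAGE_SIZE,             // 1 MB data pages
        ParquetWriter.DEFAULT_PAGE_SIZE,             // dictionary page size
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetProperties.WriterVersion.PARQUET_1_0,
        conf);
    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
    writer.write(factory.newGroup().append("id", 1).append("name", "tajo"));
    writer.close();

    // Reading: the upstream builder replaces the removed thirdparty
    // ParquetReader constructors; the schema is taken from the file footer.
    ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), file).build();
    Group g;
    while ((g = reader.read()) != null) {
      System.out.println(g.getInteger("id", 0) + " " + g.getString("name", 0));
    }
    reader.close();
  }
}

The long ParquetWriter constructor above is kept here because it matches the deleted thirdparty writer argument for argument; 1.8.1 also offers builder-style writers, which later releases favour.
----------------------------------------------------------------------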
