http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java new file mode 100644 index 0000000..532d9a2 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import parquet.Log; +import parquet.column.ParquetProperties.WriterVersion; +import parquet.column.impl.ColumnWriteStoreImpl; +import parquet.hadoop.api.WriteSupport; +import parquet.io.ColumnIOFactory; +import parquet.io.MessageColumnIO; +import parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Map; + +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.String.format; +import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; +import static parquet.Log.DEBUG; +import static parquet.Preconditions.checkNotNull; + +class InternalParquetRecordWriter<T> { + private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); + + private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; + private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; + private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + + private final ParquetFileWriter w; + private final WriteSupport<T> writeSupport; + private final MessageType schema; + private final Map<String, String> extraMetaData; + private final int blockSize; + private final int pageSize; + private final BytesCompressor compressor; + private final int dictionaryPageSize; + private final boolean enableDictionary; + private final boolean validating; + private final WriterVersion writerVersion; + + private long recordCount = 0; + private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + + private ColumnWriteStoreImpl store; + private ColumnChunkPageWriteStore pageStore; + + /** + * @param w the file to write to + * @param writeSupport the class to convert incoming records + * @param schema the schema of the records + * @param extraMetaData extra meta data to write in the footer of the file + * @param blockSize the size of a block in the file (this will be approximate) + * @param codec the codec used to compress + */ + public InternalParquetRecordWriter( + 
ParquetFileWriter w, + WriteSupport<T> writeSupport, + MessageType schema, + Map<String, String> extraMetaData, + int blockSize, + int pageSize, + BytesCompressor compressor, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + WriterVersion writerVersion) { + this.w = w; + this.writeSupport = checkNotNull(writeSupport, "writeSupport"); + this.schema = schema; + this.extraMetaData = extraMetaData; + this.blockSize = blockSize; + this.pageSize = pageSize; + this.compressor = compressor; + this.dictionaryPageSize = dictionaryPageSize; + this.enableDictionary = enableDictionary; + this.validating = validating; + this.writerVersion = writerVersion; + initStore(); + } + + private void initStore() { + // we don't want this number to be too small + // ideally we divide the block equally across the columns + // it is unlikely all columns are going to be the same size. + int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); + // we don't want this number to be too small either + // ideally, slightly bigger than the page size, but not bigger than the block buffer + int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); + store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); + MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); + writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); + } + + public void close() throws IOException, InterruptedException { + flushStore(); + w.end(extraMetaData); + } + + public void write(T value) throws IOException, InterruptedException { + writeSupport.write(value); + ++ recordCount; + checkBlockSizeReached(); + } + + private void checkBlockSizeReached() throws IOException { + if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. + long memSize = store.memSize(); + if (memSize > blockSize) { + LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); + flushStore(); + initStore(); + recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); + } else { + float recordSize = (float) memSize / recordCount; + recordCountForNextMemCheck = min( + max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway + recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead + ); + if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); + } + } + } + + public long getEstimatedWrittenSize() throws IOException { + return w.getPos() + store.memSize(); + } + + private void flushStore() + throws IOException { + LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); + if (store.allocatedSize() > 3 * blockSize) { + LOG.warn("Too much memory used: " + store.memUsageString()); + } + w.startBlock(recordCount); + store.flush(); + pageStore.flushToFileWriter(w); + recordCount = 0; + w.endBlock(); + store = null; + pageStore = null; + } +} \ No newline at end of file
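A note on the flush heuristic above: rather than measuring buffered memory on every write, InternalParquetRecordWriter estimates an average record size and schedules the next memory check roughly halfway to the projected row-group boundary, clamped to at most 10,000 records ahead. A minimal, self-contained sketch of that arithmetic follows; the class and method names here are illustrative only, not part of the committed code.

public class MemCheckHeuristic {
  private static final long MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
  private static final long MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

  // Given the buffered size after recordCount records, pick the record
  // count at which memory should be measured next.
  static long nextCheck(long recordCount, long memSize, long blockSize) {
    float recordSize = (float) memSize / recordCount;      // average bytes per record so far
    long recordsToFill = (long) (blockSize / recordSize);  // estimated records in a full block
    return Math.min(
        Math.max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + recordsToFill) / 2), // check halfway
        recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK);     // never look more than 10,000 records ahead
  }

  public static void main(String[] args) {
    // 1,000 records buffered in 1 MB against a 128 MB block: roughly 128,000
    // records would fill the block, so the halfway point (64,500) is capped at 11,000.
    System.out.println(nextCheck(1000, 1 << 20, 128L << 20)); // prints 11000
  }
}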
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java new file mode 100644 index 0000000..ac1c421 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import parquet.Log; +import parquet.Version; +import parquet.bytes.BytesInput; +import parquet.bytes.BytesUtils; +import parquet.column.ColumnDescriptor; +import parquet.column.page.DictionaryPage; +import parquet.column.statistics.Statistics; +import parquet.format.converter.ParquetMetadataConverter; +import parquet.hadoop.Footer; +import parquet.hadoop.metadata.*; +import parquet.io.ParquetEncodingException; +import parquet.schema.MessageType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.*; +import java.util.Map.Entry; + +import static parquet.Log.DEBUG; +import static parquet.format.Util.writeFileMetaData; + +/** + * Internal implementation of the Parquet file writer as a block container + * + * @author Julien Le Dem + * + */ +public class ParquetFileWriter { + private static final Log LOG = Log.getLog(ParquetFileWriter.class); + + public static final String PARQUET_METADATA_FILE = "_metadata"; + public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); + public static final int CURRENT_VERSION = 1; + + private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); + + private final MessageType schema; + private final FSDataOutputStream out; + private BlockMetaData currentBlock; + private ColumnChunkMetaData currentColumn; + private long currentRecordCount; + private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + private long uncompressedLength; + private long compressedLength; + private Set<parquet.column.Encoding> currentEncodings; + + private CompressionCodecName currentChunkCodec; + private ColumnPath currentChunkPath; + private PrimitiveTypeName currentChunkType; + private long currentChunkFirstDataPage; + private long 
currentChunkDictionaryPageOffset; + private long currentChunkValueCount; + + private Statistics currentStatistics; + + /** + * Captures the order in which methods should be called + * + * @author Julien Le Dem + * + */ + private enum STATE { + NOT_STARTED { + STATE start() { + return STARTED; + } + }, + STARTED { + STATE startBlock() { + return BLOCK; + } + STATE end() { + return ENDED; + } + }, + BLOCK { + STATE startColumn() { + return COLUMN; + } + STATE endBlock() { + return STARTED; + } + }, + COLUMN { + STATE endColumn() { + return BLOCK; + }; + STATE write() { + return this; + } + }, + ENDED; + + STATE start() throws IOException { return error(); } + STATE startBlock() throws IOException { return error(); } + STATE startColumn() throws IOException { return error(); } + STATE write() throws IOException { return error(); } + STATE endColumn() throws IOException { return error(); } + STATE endBlock() throws IOException { return error(); } + STATE end() throws IOException { return error(); } + + private final STATE error() throws IOException { + throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); + } + } + + private STATE state = STATE.NOT_STARTED; + + /** + * + * @param configuration the Hadoop configuration used to access the filesystem + * @param schema the schema of the data + * @param file the file to write to + * @throws java.io.IOException if the file can not be created + */ + public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { + super(); + this.schema = schema; + FileSystem fs = file.getFileSystem(configuration); + this.out = fs.create(file, false); + } + + /** + * start the file + * @throws java.io.IOException + */ + public void start() throws IOException { + state = state.start(); + if (DEBUG) LOG.debug(out.getPos() + ": start"); + out.write(MAGIC); + } + + /** + * start a block + * @param recordCount the record count in this block + * @throws java.io.IOException + */ + public void startBlock(long recordCount) throws IOException { + state = state.startBlock(); + if (DEBUG) LOG.debug(out.getPos() + ": start block"); +// out.write(MAGIC); // TODO: add a magic delimiter + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; + } + + /** + * start a column inside a block + * @param descriptor the column descriptor + * @param valueCount the value count in this column + * @param compressionCodecName the codec used to compress this column chunk + * @throws java.io.IOException + */ + public void startColumn(ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName) throws IOException { + state = state.startColumn(); + if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); + currentEncodings = new HashSet<parquet.column.Encoding>(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = out.getPos(); + compressedLength = 0; + uncompressedLength = 0; + // need to know what type of stats to initialize to + // better way to do this?
+ currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); + } + + /** + * writes a dictionary page + * @param dictionaryPage the dictionary page + */ + public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { + state = state.write(); + if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out); + long headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); + dictionaryPage.getBytes().writeAllTo(out); + currentEncodings.add(dictionaryPage.getEncoding()); + } + + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + */ + @Deprecated + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + parquet.column.Encoding rlEncoding, + parquet.column.Encoding dlEncoding, + parquet.column.Encoding valuesEncoding) throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); + int compressedPageSize = (int)bytes.size(); + metadataConverter.writeDataPageHeader( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); + bytes.writeAllTo(out); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + */ + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + parquet.column.Encoding rlEncoding, + parquet.column.Encoding dlEncoding, + parquet.column.Encoding valuesEncoding) throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); + int compressedPageSize = (int)bytes.size(); + metadataConverter.writeDataPageHeader( + uncompressedPageSize, compressedPageSize, + valueCount, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + long headerSize = out.getPos() - beforeHeader; +
this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); + bytes.writeAllTo(out); + currentStatistics.mergeStatistics(statistics); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * writes a number of pages at once + * @param bytes bytes to be written including page headers + * @param uncompressedTotalPageSize total uncompressed size (without page headers) + * @param compressedTotalPageSize total compressed size (without page headers) + * @throws java.io.IOException + */ + void writeDataPages(BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + List<parquet.column.Encoding> encodings) throws IOException { + state = state.write(); + if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); + bytes.writeAllTo(out); + currentEncodings.addAll(encodings); + currentStatistics = totalStats; + } + + /** + * end a column (once all rep, def and data have been written) + * @throws java.io.IOException + */ + public void endColumn() throws IOException { + state = state.endColumn(); + if (DEBUG) LOG.debug(out.getPos() + ": end column"); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + currentChunkValueCount, + compressedLength, + uncompressedLength)); + if (DEBUG) LOG.info("ended Column chunk: " + currentColumn); + currentColumn = null; + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + } + + /** + * ends a block once all column chunks have been written + * @throws java.io.IOException + */ + public void endBlock() throws IOException { + state = state.endBlock(); + if (DEBUG) LOG.debug(out.getPos() + ": end block"); + currentBlock.setRowCount(currentRecordCount); + blocks.add(currentBlock); + currentBlock = null; + } + + /** + * ends a file once all blocks have been written. + * closes the file.
+ * @param extraMetaData the extra meta data to write in the footer + * @throws java.io.IOException + */ + public void end(Map<String, String> extraMetaData) throws IOException { + state = state.end(); + if (DEBUG) LOG.debug(out.getPos() + ": end"); + ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + serializeFooter(footer, out); + out.close(); + } + + private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { + long footerIndex = out.getPos(); + parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); + writeFileMetaData(parquetMetadata, out); + if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); + BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); + out.write(MAGIC); + } + + /** + * writes a _metadata file + * @param configuration the configuration to use to get the FileSystem + * @param outputPath the directory to write the _metadata file to + * @param footers the list of footers to merge + * @throws java.io.IOException + */ + public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { + Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE); + FileSystem fs = outputPath.getFileSystem(configuration); + outputPath = outputPath.makeQualified(fs); + FSDataOutputStream metadata = fs.create(metaDataPath); + metadata.write(MAGIC); + ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); + serializeFooter(metadataFooter, metadata); + metadata.close(); + } + + private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) { + String rootPath = root.toString(); + GlobalMetaData fileMetaData = null; + List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + for (Footer footer : footers) { + String path = footer.getFile().toString(); + if (!path.startsWith(rootPath)) { + throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root); + } + path = path.substring(rootPath.length()); + while (path.startsWith("/")) { + path = path.substring(1); + } + fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData); + for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) { + block.setPath(path); + blocks.add(block); + } + } + return new ParquetMetadata(fileMetaData.merge(), blocks); + } + + /** + * @return the current position in the underlying file + * @throws java.io.IOException + */ + public long getPos() throws IOException { + return out.getPos(); + } + + /** + * Will merge the metadata of all the footers together + * @param footers the list files footers to merge + * @return the global meta data for all the footers + */ + static GlobalMetaData getGlobalMetaData(List<Footer> footers) { + GlobalMetaData fileMetaData = null; + for (Footer footer : footers) { + ParquetMetadata currentMetadata = footer.getParquetMetadata(); + fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData); + } + return fileMetaData; + } + + /** + * Will return the result of merging toMerge into mergedMetadata + * @param toMerge the metadata toMerge + * @param mergedMetadata the reference metadata to merge into + * @return the result of the merge + */ + static GlobalMetaData mergeInto( + FileMetaData toMerge, + GlobalMetaData mergedMetadata) { + MessageType schema = null; + Map<String, 
Set<String>> newKeyValues = new HashMap<String, Set<String>>(); + Set<String> createdBy = new HashSet<String>(); + if (mergedMetadata != null) { + schema = mergedMetadata.getSchema(); + newKeyValues.putAll(mergedMetadata.getKeyValueMetaData()); + createdBy.addAll(mergedMetadata.getCreatedBy()); + } + if ((schema == null && toMerge.getSchema() != null) + || (schema != null && !schema.equals(toMerge.getSchema()))) { + schema = mergeInto(toMerge.getSchema(), schema); + } + for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) { + Set<String> values = newKeyValues.get(entry.getKey()); + if (values == null) { + values = new HashSet<String>(); + newKeyValues.put(entry.getKey(), values); + } + values.add(entry.getValue()); + } + createdBy.add(toMerge.getCreatedBy()); + return new GlobalMetaData( + schema, + newKeyValues, + createdBy); + } + + /** + * will return the result of merging toMerge into mergedSchema + * @param toMerge the schema to merge into mergedSchema + * @param mergedSchema the schema to append the fields to + * @return the resulting schema + */ + static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) { + if (mergedSchema == null) { + return toMerge; + } + return mergedSchema.union(toMerge); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java new file mode 100644 index 0000000..9c167a0 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import parquet.filter.UnboundRecordFilter; +import parquet.hadoop.Footer; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.api.InitContext; +import parquet.hadoop.api.ReadSupport; +import parquet.hadoop.api.ReadSupport.ReadContext; +import parquet.hadoop.metadata.BlockMetaData; +import parquet.hadoop.metadata.GlobalMetaData; +import parquet.schema.MessageType; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; + +/** + * Read records from a Parquet file. 
+ */ +public class ParquetReader<T> implements Closeable { + + private ReadSupport<T> readSupport; + private UnboundRecordFilter filter; + private Configuration conf; + private ReadContext readContext; + private Iterator<Footer> footersIterator; + private InternalParquetRecordReader<T> reader; + private GlobalMetaData globalMetaData; + + /** + * @param file the file to read + * @param readSupport to materialize records + * @throws java.io.IOException + */ + public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException { + this(file, readSupport, null); + } + + /** + * @param conf the configuration + * @param file the file to read + * @param readSupport to materialize records + * @throws java.io.IOException + */ + public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException { + this(conf, file, readSupport, null); + } + + /** + * @param file the file to read + * @param readSupport to materialize records + * @param filter the filter to use to filter records + * @throws java.io.IOException + */ + public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { + this(new Configuration(), file, readSupport, filter); + } + + /** + * @param conf the configuration + * @param file the file to read + * @param readSupport to materialize records + * @param filter the filter to use to filter records + * @throws java.io.IOException + */ + public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { + this.readSupport = readSupport; + this.filter = filter; + this.conf = conf; + + FileSystem fs = file.getFileSystem(conf); + List<FileStatus> statuses = Arrays.asList(fs.listStatus(file)); + List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses); + this.footersIterator = footers.iterator(); + globalMetaData = ParquetFileWriter.getGlobalMetaData(footers); + + List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + for (Footer footer : footers) { + blocks.addAll(footer.getParquetMetadata().getBlocks()); + } + + MessageType schema = globalMetaData.getSchema(); + Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData(); + readContext = readSupport.init(new InitContext(conf, extraMetadata, schema)); + } + + /** + * @return the next record or null if finished + * @throws java.io.IOException + */ + public T read() throws IOException { + try { + if (reader != null && reader.nextKeyValue()) { + return reader.getCurrentValue(); + } else { + initReader(); + return reader == null ? 
null : read(); + } + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + private void initReader() throws IOException { + if (reader != null) { + reader.close(); + reader = null; + } + if (footersIterator.hasNext()) { + Footer footer = footersIterator.next(); + reader = new InternalParquetRecordReader<T>(readSupport, filter); + reader.initialize( + readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(), + readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf); + } + } + + @Override + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java new file mode 100644 index 0000000..7527437 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import parquet.column.ParquetProperties; +import parquet.hadoop.api.WriteSupport; +import parquet.hadoop.metadata.CompressionCodecName; +import parquet.schema.MessageType; + +import java.io.Closeable; +import java.io.IOException; + +public class ParquetWriter<T> implements Closeable { + + public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; + public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024; + public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME = + CompressionCodecName.UNCOMPRESSED; + public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; + public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; + public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION = + ParquetProperties.WriterVersion.PARQUET_1_0; + + private final InternalParquetRecordWriter<T> writer; + + /** + * Create a new ParquetWriter. 
+ * (with dictionary encoding enabled and validation off) + * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold + * @throws java.io.IOException + * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean) + */ + public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { + this(file, writeSupport, compressionCodecName, blockSize, pageSize, + DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED); + } + + /** + * Create a new ParquetWriter. + * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold (both data and dictionary) + * @param enableDictionary to turn dictionary encoding on + * @param validating to turn on validation using the schema + * @throws java.io.IOException + * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean) + */ + public ParquetWriter( + Path file, + WriteSupport<T> writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + boolean enableDictionary, + boolean validating) throws IOException { + this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating); + } + + /** + * Create a new ParquetWriter. + * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold + * @param dictionaryPageSize the page size threshold for the dictionary pages + * @param enableDictionary to turn dictionary encoding on + * @param validating to turn on validation using the schema + * @throws java.io.IOException + * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion) + */ + public ParquetWriter( + Path file, + WriteSupport<T> writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating) throws IOException { + this(file, writeSupport, compressionCodecName, blockSize, pageSize, + dictionaryPageSize, enableDictionary, validating, + DEFAULT_WRITER_VERSION); + } + + /** + * Create a new ParquetWriter. + * + * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads + * configuration from the classpath. 
+ * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold + * @param dictionaryPageSize the page size threshold for the dictionary pages + * @param enableDictionary to turn dictionary encoding on + * @param validating to turn on validation using the schema + * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion} + * @throws java.io.IOException + * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration) + */ + public ParquetWriter( + Path file, + WriteSupport<T> writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + ParquetProperties.WriterVersion writerVersion) throws IOException { + this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration()); + } + + /** + * Create a new ParquetWriter. + * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @param compressionCodecName the compression codec to use + * @param blockSize the block size threshold + * @param pageSize the page size threshold + * @param dictionaryPageSize the page size threshold for the dictionary pages + * @param enableDictionary to turn dictionary encoding on + * @param validating to turn on validation using the schema + * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion} + * @param conf Hadoop configuration to use while accessing the filesystem + * @throws java.io.IOException + */ + public ParquetWriter( + Path file, + WriteSupport<T> writeSupport, + CompressionCodecName compressionCodecName, + int blockSize, + int pageSize, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + ParquetProperties.WriterVersion writerVersion, + Configuration conf) throws IOException { + + WriteSupport.WriteContext writeContext = writeSupport.init(conf); + MessageType schema = writeContext.getSchema(); + + ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file); + fileWriter.start(); + + CodecFactory codecFactory = new CodecFactory(conf); + CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); + this.writer = new InternalParquetRecordWriter<T>( + fileWriter, + writeSupport, + schema, + writeContext.getExtraMetaData(), + blockSize, + pageSize, + compressor, + dictionaryPageSize, + enableDictionary, + validating, + writerVersion); + } + + /** + * Create a new ParquetWriter. The default block size is 128 MB. The default + * page size is 1 MB. Default compression is no compression. Dictionary encoding is enabled.
+ * + * @param file the file to create + * @param writeSupport the implementation to write a record to a RecordConsumer + * @throws java.io.IOException + */ + public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException { + this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); + } + + public void write(T object) throws IOException { + try { + writer.write(object); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + public long getEstimatedWrittenSize() throws IOException { + return this.writer.getEstimatedWrittenSize(); + } + + @Override + public void close() throws IOException { + try { + writer.close(); + } catch (InterruptedException e) { + throw new IOException(e); + } + }} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto new file mode 100644 index 0000000..ce9aab6 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/proto/StorageFragmentProtos.proto @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.tajo.storage"; +option java_outer_classname = "StorageFragmentProtos"; +option optimize_for = SPEED; +option java_generic_services = false; +option java_generate_equals_and_hash = true; + +import "CatalogProtos.proto"; + +message FileFragmentProto { + required string id = 1; + required string path = 2; + required int64 startOffset = 3; + required int64 length = 4; + repeated string hosts = 5; + repeated int32 diskIds = 6; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java new file mode 100644 index 0000000..cf8a54e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.net.NetUtils; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFactory; +import org.jboss.netty.channel.group.ChannelGroup; +import org.jboss.netty.channel.group.ChannelGroupFuture; +import org.jboss.netty.channel.group.DefaultChannelGroup; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + +import java.net.InetSocketAddress; +import java.util.concurrent.Executors; + +public class HttpFileServer { + private final static Log LOG = LogFactory.getLog(HttpFileServer.class); + + private final InetSocketAddress addr; + private InetSocketAddress bindAddr; + private ServerBootstrap bootstrap = null; + private ChannelFactory factory = null; + private ChannelGroup channelGroup = null; + + public HttpFileServer(final InetSocketAddress addr) { + this.addr = addr; + this.factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), + 2); + + // Configure the server. + this.bootstrap = new ServerBootstrap(factory); + // Set up the event pipeline factory. + this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory()); + this.channelGroup = new DefaultChannelGroup(); + } + + public HttpFileServer(String bindaddr) { + this(NetUtils.createSocketAddr(bindaddr)); + } + + public void start() { + // Bind and start to accept incoming connections. + Channel channel = bootstrap.bind(addr); + channelGroup.add(channel); + this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); + LOG.info("HttpFileServer starts up (" + + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() + + ")"); + } + + public InetSocketAddress getBindAddress() { + return this.bindAddr; + } + + public void stop() { + ChannelGroupFuture future = channelGroup.close(); + future.awaitUninterruptibly(); + factory.releaseExternalResources(); + + LOG.info("HttpFileServer shutdown (" + + this.bindAddr.getAddress().getHostAddress() + ":" + + this.bindAddr.getPort() + ")"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java new file mode 100644 index 0000000..6c77317 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.frame.TooLongFrameException; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpRequest; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.ssl.SslHandler; +import org.jboss.netty.handler.stream.ChunkedFile; +import org.jboss.netty.util.CharsetUtil; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; +import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; +import static org.jboss.netty.handler.codec.http.HttpMethod.GET; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6 + */ +public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + HttpRequest request = (HttpRequest) e.getMessage(); + if (request.getMethod() != GET) { + sendError(ctx, METHOD_NOT_ALLOWED); + return; + } + + final String path = sanitizeUri(request.getUri()); + if (path == null) { + sendError(ctx, FORBIDDEN); + return; + } + + File file = new File(path); + if (file.isHidden() || !file.exists()) { + sendError(ctx, NOT_FOUND); + return; + } + if (!file.isFile()) { + sendError(ctx, FORBIDDEN); + return; + } + + RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException fnfe) { + sendError(ctx, NOT_FOUND); + return; + } + long fileLength = raf.length(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + setContentLength(response, fileLength); + setContentTypeHeader(response); + + Channel ch = e.getChannel(); + + // Write the initial line and the header. + ch.write(response); + + // Write the content. + ChannelFuture writeFuture; + if (ch.getPipeline().get(SslHandler.class) != null) { + // Cannot use zero-copy with HTTPS. + writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); + } else { + // No encryption - use zero-copy. 
+ final FileRegion region = + new DefaultFileRegion(raf.getChannel(), 0, fileLength); + writeFuture = ch.write(region); + writeFuture.addListener(new ChannelFutureProgressListener() { + public void operationComplete(ChannelFuture future) { + region.releaseExternalResources(); + } + + public void operationProgressed( + ChannelFuture future, long amount, long current, long total) { + System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); + } + }); + } + + // Decide whether to close the connection or not. + if (!isKeepAlive(request)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + Channel ch = e.getChannel(); + Throwable cause = e.getCause(); + if (cause instanceof TooLongFrameException) { + sendError(ctx, BAD_REQUEST); + return; + } + + cause.printStackTrace(); + if (ch.isConnected()) { + sendError(ctx, INTERNAL_SERVER_ERROR); + } + } + + private static String sanitizeUri(String uri) { + // Decode the path. + try { + uri = URLDecoder.decode(uri, "UTF-8"); + } catch (UnsupportedEncodingException e) { + try { + uri = URLDecoder.decode(uri, "ISO-8859-1"); + } catch (UnsupportedEncodingException e1) { + throw new Error(); + } + } + + // Convert file separators. + uri = uri.replace('/', File.separatorChar); + + // Simplistic dumb security check. + // You will have to do something serious in the production environment. + if (uri.contains(File.separator + '.') || + uri.contains('.' + File.separator) || + uri.startsWith(".") || uri.endsWith(".")) { + return null; + } + + return uri; + } + + private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent(ChannelBuffers.copiedBuffer( + "Failure: " + status.toString() + "\r\n", + CharsetUtil.UTF_8)); + + // Close the connection as soon as the error message is sent. + ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + } + + /** + * Sets the content type header for the HTTP Response + * + * @param response + * HTTP response + */ + private static void setContentTypeHeader(HttpResponse response) { + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + } + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java new file mode 100644 index 0000000..cecf93b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.handler.codec.http.HttpChunkAggregator; +import org.jboss.netty.handler.codec.http.HttpRequestDecoder; +import org.jboss.netty.handler.codec.http.HttpResponseEncoder; +import org.jboss.netty.handler.stream.ChunkedWriteHandler; + +import static org.jboss.netty.channel.Channels.pipeline; + +// Uncomment the following lines if you want HTTPS +//import javax.net.ssl.SSLEngine; +//import org.jboss.netty.example.securechat.SecureChatSslContextFactory; +//import org.jboss.netty.handler.ssl.SslHandler; + +//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6 +public class HttpFileServerPipelineFactory implements ChannelPipelineFactory { + public ChannelPipeline getPipeline() throws Exception { + // Create a default pipeline implementation. + ChannelPipeline pipeline = pipeline(); + + // Uncomment the following lines if you want HTTPS + //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); + //engine.setUseClientMode(false); + //pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + + pipeline.addLast("handler", new HttpFileServerHandler()); + return pipeline; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java new file mode 100644 index 0000000..3c78d6b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.io.compress.zlib.ZlibFactory; +import org.apache.hadoop.util.NativeCodeLoader; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.storage.text.DelimitedTextFile; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; + +import static org.junit.Assert.*; + +@RunWith(Parameterized.class) +public class TestCompressionStorages { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestCompressionStorages"; + + private StoreType storeType; + private Path testDir; + private FileSystem fs; + + public TestCompressionStorages(StoreType type) throws IOException { + this.storeType = type; + conf = new TajoConf(); + + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() { + return Arrays.asList(new Object[][]{ + {StoreType.CSV}, + {StoreType.RCFILE}, + {StoreType.SEQUENCEFILE}, + {StoreType.TEXTFILE} + }); + } + + @Test + public void testDeflateCodecCompressionData() throws IOException { + storageCompressionTest(storeType, DeflateCodec.class); + } + + @Test + public void testGzipCodecCompressionData() throws IOException { + if (storeType == StoreType.RCFILE) { + if( ZlibFactory.isNativeZlibLoaded(conf)) { + storageCompressionTest(storeType, GzipCodec.class); + } + } else if (storeType == StoreType.SEQUENCEFILE) { + if( ZlibFactory.isNativeZlibLoaded(conf)) { + storageCompressionTest(storeType, GzipCodec.class); + } + } else { + storageCompressionTest(storeType, GzipCodec.class); + } + } + + @Test + public void testSnappyCodecCompressionData() throws IOException { + if (SnappyCodec.isNativeCodeLoaded()) { + storageCompressionTest(storeType, SnappyCodec.class); + } + } + + @Test + public void testLz4CodecCompressionData() throws IOException { + if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) + storageCompressionTest(storeType, Lz4Codec.class); + } + + private void storageCompressionTest(StoreType storeType, Class<? 
+ Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.FLOAT4); + schema.addColumn("name", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.putOption("compression.codec", codec.getCanonicalName()); + meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name()); + meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName()); + meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName()); + + String fileName = "Compression_" + codec.getSimpleName(); + Path tablePath = new Path(testDir, fileName); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath); + appender.enableStats(); + + appender.init(); + + String extension = ""; + if (appender instanceof CSVFile.CSVAppender) { + extension = ((CSVFile.CSVAppender) appender).getExtension(); + } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { + extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); + } + + int tupleNum = 100000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(3); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createFloat4((float) i)); + vTuple.put(2, DatumFactory.createText(String.valueOf(i))); + appender.addTuple(vTuple); + } + appender.close(); + + TableStats stat = appender.getStats(); + assertEquals(tupleNum, stat.getNumRows().longValue()); + tablePath = tablePath.suffix(extension); + FileStatus status = fs.getFileStatus(tablePath); + long fileLen = status.getLen(); + FileFragment[] tablets = new FileFragment[1]; + tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); + + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema); + + if (StoreType.CSV == storeType) { + if (SplittableCompressionCodec.class.isAssignableFrom(codec)) { + assertTrue(scanner.isSplittable()); + } else { + assertFalse(scanner.isSplittable()); + } + } + scanner.init(); + + if (storeType == StoreType.SEQUENCEFILE) { + assertTrue(scanner instanceof SequenceFileScanner); + Writable key = ((SequenceFileScanner) scanner).getKey(); + assertEquals(LongWritable.class.getCanonicalName(), key.getClass().getCanonicalName()); + } + + int tupleCnt = 0; + while (scanner.next() != null) { + tupleCnt++; + } + scanner.close(); + assertEquals(tupleNum, tupleCnt); + // the on-disk (compressed) size should differ from the byte count the scanner reports + assertNotEquals(appender.getStats().getNumBytes().longValue(), scanner.getInputStats().getNumBytes().longValue()); + assertEquals(appender.getStats().getNumRows().longValue(), scanner.getInputStats().getNumRows().longValue()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java new file mode 100644 index 0000000..8749925 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.FileUtil; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URL; + +import static org.junit.Assert.*; + +public class TestDelimitedTextFile { + + private static Schema schema = new Schema(); + + private static Tuple baseTuple = new VTuple(10); + + static { + schema.addColumn("col1", Type.BOOLEAN); + schema.addColumn("col2", Type.CHAR, 7); + schema.addColumn("col3", Type.INT2); + schema.addColumn("col4", Type.INT4); + schema.addColumn("col5", Type.INT8); + schema.addColumn("col6", Type.FLOAT4); + schema.addColumn("col7", Type.FLOAT8); + schema.addColumn("col8", Type.TEXT); + schema.addColumn("col9", Type.BLOB); + schema.addColumn("col10", Type.INET4); + + baseTuple.put(new Datum[] { + DatumFactory.createBool(true), // 0 + DatumFactory.createChar("hyunsik"), // 1 + DatumFactory.createInt2((short) 17), // 2 + DatumFactory.createInt4(59), // 3 + DatumFactory.createInt8(23L), // 4 + DatumFactory.createFloat4(77.9f), // 5 + DatumFactory.createFloat8(271.9d), // 6 + DatumFactory.createText("hyunsik"), // 7 + DatumFactory.createBlob("hyunsik".getBytes()), // 8 + DatumFactory.createInet4("192.168.0.1"), // 9 + }); + } + + public static Path getResourcePath(String path, String suffix) { + URL resultBaseURL = ClassLoader.getSystemResource(path); + return new Path(resultBaseURL.toString(), suffix); + } + + public static Path getResultPath(Class<?> clazz, String fileName) { + return new Path(getResourcePath("results", clazz.getSimpleName()), fileName); + } + + public static String getResultText(Class<?> clazz, String fileName) throws IOException { + FileSystem localFS = FileSystem.getLocal(new Configuration()); + Path path = getResultPath(clazz, fileName); + Preconditions.checkState(localFS.exists(path) && localFS.isFile(path)); + return FileUtil.readTextFile(new File(path.toUri())); + } + + private static FileFragment getFileFragment(String fileName) throws IOException { + TajoConf conf = new TajoConf(); + Path tablePath = new Path(getResourcePath("dataset", "TestDelimitedTextFile"), fileName); + FileSystem fs = FileSystem.getLocal(conf); + FileStatus status = fs.getFileStatus(tablePath); + return new FileFragment("table", tablePath, 0, status.getLen()); + }
+ + @Test + public void testIgnoreAllErrors() throws IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); + meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1"); + FileFragment fragment = getFileFragment("testErrorTolerance1.json"); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + Tuple tuple; + int i = 0; + while ((tuple = scanner.next()) != null) { + assertEquals(baseTuple, tuple); + i++; + } + assertEquals(3, i); + scanner.close(); + } + + @Test + public void testIgnoreOneErrorTolerance() throws IOException { + TajoConf conf = new TajoConf(); + + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); + meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); + FileFragment fragment = getFileFragment("testErrorTolerance1.json"); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + assertNotNull(scanner.next()); + assertNotNull(scanner.next()); + try { + scanner.next(); + } catch (IOException ioe) { + System.out.println(ioe); + return; + } finally { + scanner.close(); + } + fail("should throw an IOException once the number of invalid rows exceeds the tolerance of 1"); + } + + @Test + public void testNoErrorTolerance() throws IOException { + TajoConf conf = new TajoConf(); + TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.JSON); + meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0"); + FileFragment fragment = getFileFragment("testErrorTolerance2.json"); + Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment); + scanner.init(); + + try { + scanner.next(); + } catch (IOException ioe) { + return; + } finally { + scanner.close(); + } + fail("should throw an IOException on any invalid row when the tolerance is 0"); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java new file mode 100644 index 0000000..19a39a2 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java @@ -0,0 +1,203 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.*; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.*; + +public class TestFileStorageManager { + private TajoConf conf; + private static String TEST_PATH = "target/test-data/TestFileStorageManager"; + StorageManager sm = null; + private Path testDir; + private FileSystem fs; + + @Before + public void setUp() throws Exception { + conf = new TajoConf(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + fs = testDir.getFileSystem(conf); + sm = StorageManager.getFileStorageManager(conf, testDir); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public final void testGetScannerAndAppender() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + Tuple[] tuples = new Tuple[4]; + for (int i = 0; i < tuples.length; i++) { + tuples[i] = new VTuple(3); + tuples[i].put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i + 32), + DatumFactory.createText("name" + i)}); + } + + Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); + fs.mkdirs(path.getParent()); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, path); + appender.init(); + for (Tuple t : tuples) { + appender.addTuple(t); + } + appender.close(); + + Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(meta, schema, path); + scanner.init(); + int i = 0; + while (scanner.next() != null) { + i++; + } + assertEquals(4, i); + } + + @Test + public void testGetSplit() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build(); + + int testCount = 10; + Path tablePath = new Path("/testGetSplit"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create test partitions + List<Path> partitions = Lists.newArrayList(); + for (int i = 0; i < testCount; i++) { + Path tmpFile = new Path(tablePath, String.valueOf(i)); + DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADL); + partitions.add(tmpFile); + } + + assertTrue(fs.exists(tablePath)); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath);
+ + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + List<Fragment> splits = Lists.newArrayList(); + // Get FileFragments in partition batch + splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()]))); + assertEquals(testCount, splits.size()); + // -1 is unknown volumeId + assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + + splits.clear(); + splits.addAll(sm.getSplits("data", meta, schema, + partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2]))); + assertEquals(testCount / 2, splits.size()); + assertEquals(1, splits.get(0).getHosts().length); + assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + fs.close(); + } finally { + cluster.shutdown(); + + // remove the MiniDFSCluster data directory recursively + FileUtil.fullyDelete(new File(testDataPath)); + } + } + + @Test + public void testGetSplitWithBlockStorageLocationsBatching() throws Exception { + final Configuration conf = new HdfsConfiguration(); + String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2).build(); + + int testCount = 10; + Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching"); + try { + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create test files + for (int i = 0; i < testCount; i++) { + Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat"); + DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADL); + } + assertTrue(fs.exists(tablePath)); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(new TajoConf(conf), tablePath); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + List<Fragment> splits = Lists.newArrayList(); + splits.addAll(sm.getSplits("data", meta, schema, tablePath)); + + assertEquals(testCount, splits.size()); + assertEquals(2, splits.get(0).getHosts().length); + assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length); + assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]); + fs.close(); + } finally { + cluster.shutdown(); + + FileUtil.fullyDelete(new File(testDataPath)); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java new file mode 100644 index 0000000..ff7fe13 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.Fragment; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class TestFileSystems { + + private static String TEST_PATH = "target/test-data/TestFileSystem"; + private TajoConf conf; + private FileStorageManager sm; + private FileSystem fs; + private Path testDir; + + public TestFileSystems(FileSystem fs) throws IOException { + this.fs = fs; + this.conf = new TajoConf(fs.getConf()); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); + testDir = getTestDir(this.fs, TEST_PATH); + } + + public Path getTestDir(FileSystem fs, String dir) throws IOException { + Path path = new Path(dir); + if (fs.exists(path)) { + fs.delete(path, true); + } + + fs.mkdirs(path); + + return fs.makeQualified(path); + } + + @Parameterized.Parameters + public static Collection<Object[]> generateParameters() throws IOException { + return Arrays.asList(new Object[][]{ + {FileSystem.getLocal(new TajoConf())}, + }); + } + + @Before + public void setup() throws IOException { + // for non-local file systems, force a tiny block size so that a small file spans multiple blocks + if (!(fs instanceof LocalFileSystem)) { + conf.set("fs.local.block.size", "10"); + fs.initialize(URI.create(fs.getScheme() + ":///"), conf); + fs.setConf(conf); + } + } + + @After + public void tearDown() throws IOException { + if (!(fs instanceof LocalFileSystem)) { + fs.setConf(new TajoConf()); + } + } + + @Test + public void testBlockSplit() throws IOException { + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT4); + schema.addColumn("name", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); + + Tuple[] tuples = new Tuple[4]; + for (int i = 0; i < tuples.length; i++) { + tuples[i] = new VTuple(3); + tuples[i] + .put(new Datum[]{DatumFactory.createInt4(i), + DatumFactory.createInt4(i + 32), + DatumFactory.createText("name" + i)}); + } + + Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", +
"table.csv"); + fs.mkdirs(path.getParent()); + + Appender appender = sm.getAppender(meta, schema, path); + appender.init(); + for (Tuple t : tuples) { + appender.addTuple(t); + } + appender.close(); + FileStatus fileStatus = fs.getFileStatus(path); + + List<Fragment> splits = sm.getSplits("table", meta, schema, path); + int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize()); + assertEquals(splitSize, splits.size()); + + for (Fragment fragment : splits) { + assertTrue(fragment.getLength() <= fileStatus.getBlockSize()); + } + } +} \ No newline at end of file

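TestCompressionStorages, in turn, drives codec selection purely through table metadata. The fragment below sketches that configuration step using only the option keys visible in the diff ("compression.codec", "compression.type", "sequencefile.serde"); the helper name openCompressedAppender is hypothetical, and whether a codec actually works still depends on the native-library guards the tests apply (ZlibFactory.isNativeZlibLoaded, SnappyCodec.isNativeCodeLoaded, Lz4Codec.isNativeCodeLoaded).

  package org.apache.tajo.storage; // assumed, to match the tests above

  import java.io.IOException;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.SequenceFile;
  import org.apache.hadoop.io.compress.CompressionCodec;
  import org.apache.tajo.catalog.CatalogUtil;
  import org.apache.tajo.catalog.Schema;
  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
  import org.apache.tajo.conf.TajoConf;

  class CompressedTableWriter { // hypothetical name
    // Opens an Appender for a block-compressed SEQUENCEFILE table, mirroring the
    // metadata options that storageCompressionTest() sets before writing.
    static Appender openCompressedAppender(TajoConf conf, Schema schema, Path tablePath,
                                           Class<? extends CompressionCodec> codec) throws IOException {
      TableMeta meta = CatalogUtil.newTableMeta(StoreType.SEQUENCEFILE);
      meta.putOption("compression.codec", codec.getCanonicalName());
      meta.putOption("compression.type", SequenceFile.CompressionType.BLOCK.name());
      meta.putOption("sequencefile.serde", TextSerializerDeserializer.class.getName());

      Appender appender = ((FileStorageManager) StorageManager.getFileStorageManager(conf))
          .getAppender(meta, schema, tablePath);
      appender.enableStats(); // lets written rows/bytes be compared with scanner stats afterwards
      appender.init();
      return appender;
    }
  }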