http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java deleted file mode 100644 index 73ce7c2..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java +++ /dev/null @@ -1,504 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static parquet.Log.DEBUG; -import static parquet.format.Util.writeFileMetaData; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import parquet.Log; -import parquet.Version; -import parquet.bytes.BytesInput; -import parquet.bytes.BytesUtils; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DictionaryPage; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ColumnPath; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.hadoop.metadata.FileMetaData; -import parquet.hadoop.metadata.GlobalMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -/** - * Internal implementation of the Parquet file writer as a block container - * - * @author Julien Le Dem - * - */ -public class ParquetFileWriter { - private static final Log LOG = Log.getLog(ParquetFileWriter.class); - - public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); - public static final int CURRENT_VERSION = 1; - - private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); - - private final MessageType schema; - private final FSDataOutputStream out; - private BlockMetaData currentBlock; - private ColumnChunkMetaData currentColumn; - private long currentRecordCount; - private List<BlockMetaData> blocks 
= new ArrayList<BlockMetaData>(); - private long uncompressedLength; - private long compressedLength; - private Set<parquet.column.Encoding> currentEncodings; - - private CompressionCodecName currentChunkCodec; - private ColumnPath currentChunkPath; - private PrimitiveTypeName currentChunkType; - private long currentChunkFirstDataPage; - private long currentChunkDictionaryPageOffset; - private long currentChunkValueCount; - - private Statistics currentStatistics; - - /** - * Captures the order in which methods should be called - * - * @author Julien Le Dem - * - */ - private enum STATE { - NOT_STARTED { - STATE start() { - return STARTED; - } - }, - STARTED { - STATE startBlock() { - return BLOCK; - } - STATE end() { - return ENDED; - } - }, - BLOCK { - STATE startColumn() { - return COLUMN; - } - STATE endBlock() { - return STARTED; - } - }, - COLUMN { - STATE endColumn() { - return BLOCK; - }; - STATE write() { - return this; - } - }, - ENDED; - - STATE start() throws IOException { return error(); } - STATE startBlock() throws IOException { return error(); } - STATE startColumn() throws IOException { return error(); } - STATE write() throws IOException { return error(); } - STATE endColumn() throws IOException { return error(); } - STATE endBlock() throws IOException { return error(); } - STATE end() throws IOException { return error(); } - - private final STATE error() throws IOException { - throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); - } - } - - private STATE state = STATE.NOT_STARTED; - - /** - * - * @param configuration the configuration used to access the file system - * @param schema the schema of the data - * @param file the file to write to - * @throws IOException if the file cannot be created - */ - public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { - super(); - this.schema = schema; - FileSystem fs = file.getFileSystem(configuration); - this.out = fs.create(file, false); - } - - /** - * start the file - * @throws IOException - */ - public void start() throws IOException { - state = state.start(); - if (DEBUG) LOG.debug(out.getPos() + ": start"); - out.write(MAGIC); - } - - /** - * start a block - * @param recordCount the record count in this block - * @throws IOException - */ - public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": start block"); -// out.write(MAGIC); // TODO: add a magic delimiter - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; - } - - /** - * start a column inside a block - * @param descriptor the column descriptor - * @param valueCount the value count in this column - * @param compressionCodecName the codec used to compress this column's pages - * @throws IOException - */ - public void startColumn(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); - currentEncodings = new HashSet<parquet.column.Encoding>(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = out.getPos(); - compressedLength = 0; - uncompressedLength = 0; - // need to know what type of stats to
initialize to - // better way to do this? - currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); - } - - /** - * writes a dictionary page - * @param dictionaryPage the dictionary page - */ - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out); - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); - currentEncodings.add(dictionaryPage.getEncoding()); - } - - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param statistics the statistics of the page - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long
headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentStatistics.mergeStatistics(statistics); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a number of pages at once - * @param bytes bytes to be written including page headers - * @param uncompressedTotalPageSize total uncompressed size (without page headers) - * @param compressedTotalPageSize total compressed size (without page headers) - * @throws IOException - */ - void writeDataPages(BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - List<parquet.column.Encoding> encodings) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); - bytes.writeAllTo(out); - currentEncodings.addAll(encodings); - currentStatistics = totalStats; - } - - /** - * end a column (once all rep, def and data have been written) - * @throws IOException - */ - public void endColumn() throws IOException { - state = state.endColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": end column"); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength)); - if (DEBUG) LOG.info("ended Column chunk: " + currentColumn); - currentColumn = null; - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - } - - /** - * ends a block once all column chunks have been written - * @throws IOException - */ - public void endBlock() throws IOException { - state = state.endBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": end block"); - currentBlock.setRowCount(currentRecordCount); - blocks.add(currentBlock); - currentBlock = null; - } - - /** - * ends a file once all blocks have been written. - * closes the file.
- * @param extraMetaData the extra meta data to write in the footer - * @throws IOException - */ - public void end(Map<String, String> extraMetaData) throws IOException { - state = state.end(); - if (DEBUG) LOG.debug(out.getPos() + ": end"); - ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out); - out.close(); - } - - private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { - long footerIndex = out.getPos(); - parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); - writeFileMetaData(parquetMetadata, out); - if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); - out.write(MAGIC); - } - - /** - * writes a _metadata file - * @param configuration the configuration to use to get the FileSystem - * @param outputPath the directory to write the _metadata file to - * @param footers the list of footers to merge - * @throws IOException - */ - public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { - Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE); - FileSystem fs = outputPath.getFileSystem(configuration); - outputPath = outputPath.makeQualified(fs); - FSDataOutputStream metadata = fs.create(metaDataPath); - metadata.write(MAGIC); - ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); - serializeFooter(metadataFooter, metadata); - metadata.close(); - } - - private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) { - String rootPath = root.toString(); - GlobalMetaData fileMetaData = null; - List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - for (Footer footer : footers) { - String path = footer.getFile().toString(); - if (!path.startsWith(rootPath)) { - throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root); - } - path = path.substring(rootPath.length()); - while (path.startsWith("/")) { - path = path.substring(1); - } - fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData); - for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) { - block.setPath(path); - blocks.add(block); - } - } - return new ParquetMetadata(fileMetaData.merge(), blocks); - } - - /** - * @return the current position in the underlying file - * @throws IOException - */ - public long getPos() throws IOException { - return out.getPos(); - } - - /** - * Will merge the metadata of all the footers together - * @param footers the list of file footers to merge - * @return the global meta data for all the footers - */ - static GlobalMetaData getGlobalMetaData(List<Footer> footers) { - GlobalMetaData fileMetaData = null; - for (Footer footer : footers) { - ParquetMetadata currentMetadata = footer.getParquetMetadata(); - fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData); - } - return fileMetaData; - } - - /** - * Will return the result of merging toMerge into mergedMetadata - * @param toMerge the metadata to merge - * @param mergedMetadata the reference metadata to merge into - * @return the result of the merge - */ - static GlobalMetaData mergeInto( - FileMetaData toMerge, - GlobalMetaData mergedMetadata) { - MessageType schema = null; - Map<String, Set<String>> newKeyValues =
new HashMap<String, Set<String>>(); - Set<String> createdBy = new HashSet<String>(); - if (mergedMetadata != null) { - schema = mergedMetadata.getSchema(); - newKeyValues.putAll(mergedMetadata.getKeyValueMetaData()); - createdBy.addAll(mergedMetadata.getCreatedBy()); - } - if ((schema == null && toMerge.getSchema() != null) - || (schema != null && !schema.equals(toMerge.getSchema()))) { - schema = mergeInto(toMerge.getSchema(), schema); - } - for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) { - Set<String> values = newKeyValues.get(entry.getKey()); - if (values == null) { - values = new HashSet<String>(); - newKeyValues.put(entry.getKey(), values); - } - values.add(entry.getValue()); - } - createdBy.add(toMerge.getCreatedBy()); - return new GlobalMetaData( - schema, - newKeyValues, - createdBy); - } - - /** - * will return the result of merging toMerge into mergedSchema - * @param toMerge the schema to merge into mergedSchema - * @param mergedSchema the schema to append the fields to - * @return the resulting schema - */ - static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) { - if (mergedSchema == null) { - return toMerge; - } - return mergedSchema.union(toMerge); - } - -}
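The STATE enum above encodes the only legal call sequence on this writer: start(), then for each row group startBlock(), one startColumn()/endColumn() pair per column chunk (with page writes in between), endBlock(), and finally end(), which serializes the footer and closes the file. A minimal sketch of that sequence against the class above; the path, values, statistics, and encodings are illustrative placeholders, and normal callers drive this through ParquetWriter/InternalParquetRecordWriter rather than by hand:

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.bytes.BytesInput;
import parquet.column.Encoding;
import parquet.column.statistics.Statistics;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

public class WriterLifecycleSketch {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; }");
    // Uses the ParquetFileWriter shown above (org.apache.tajo.storage.thirdparty.parquet).
    ParquetFileWriter writer = new ParquetFileWriter(
        new Configuration(), schema, new Path("/tmp/example.parquet"));

    // Three PLAIN-encoded little-endian int32 values: 1, 2, 3.
    byte[] plain = {1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0};
    Statistics stats = Statistics.getStatsBasedOnType(
        schema.getColumns().get(0).getType());

    writer.start();                                       // NOT_STARTED -> STARTED
    writer.startBlock(3);                                 // STARTED -> BLOCK
    writer.startColumn(schema.getColumns().get(0), 3,
        CompressionCodecName.UNCOMPRESSED);               // BLOCK -> COLUMN
    writer.writeDataPage(3, plain.length, BytesInput.from(plain), stats,
        Encoding.BIT_PACKED, Encoding.BIT_PACKED, Encoding.PLAIN); // stays in COLUMN
    writer.endColumn();                                   // COLUMN -> BLOCK
    writer.endBlock();                                    // BLOCK -> STARTED
    writer.end(Collections.<String, String>emptyMap());   // STARTED -> ENDED; footer and trailing MAGIC
  }
}

Any call made out of this order falls through to one of the default STATE methods and surfaces as the "invalid state" IOException above.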
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java deleted file mode 100644 index 0fb2c3a..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import java.io.Closeable; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import parquet.filter.UnboundRecordFilter; -import parquet.hadoop.*; -import parquet.hadoop.api.InitContext; -import parquet.hadoop.api.ReadSupport; -import parquet.hadoop.api.ReadSupport.ReadContext; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.GlobalMetaData; -import parquet.schema.MessageType; - -/** - * Read records from a Parquet file. 
- */ -public class ParquetReader<T> implements Closeable { - - private ReadSupport<T> readSupport; - private UnboundRecordFilter filter; - private Configuration conf; - private ReadContext readContext; - private Iterator<Footer> footersIterator; - private InternalParquetRecordReader<T> reader; - private GlobalMetaData globalMetaData; - - /** - * @param file the file to read - * @param readSupport to materialize records - * @throws IOException - */ - public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException { - this(file, readSupport, null); - } - - /** - * @param conf the configuration - * @param file the file to read - * @param readSupport to materialize records - * @throws IOException - */ - public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException { - this(conf, file, readSupport, null); - } - - /** - * @param file the file to read - * @param readSupport to materialize records - * @param filter the filter to use to filter records - * @throws IOException - */ - public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { - this(new Configuration(), file, readSupport, filter); - } - - /** - * @param conf the configuration - * @param file the file to read - * @param readSupport to materialize records - * @param filter the filter to use to filter records - * @throws IOException - */ - public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException { - this.readSupport = readSupport; - this.filter = filter; - this.conf = conf; - - FileSystem fs = file.getFileSystem(conf); - List<FileStatus> statuses = Arrays.asList(fs.listStatus(file)); - List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses); - this.footersIterator = footers.iterator(); - globalMetaData = ParquetFileWriter.getGlobalMetaData(footers); - - List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - for (Footer footer : footers) { - blocks.addAll(footer.getParquetMetadata().getBlocks()); - } - - MessageType schema = globalMetaData.getSchema(); - Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData(); - readContext = readSupport.init(new InitContext(conf, extraMetadata, schema)); - } - - /** - * @return the next record or null if finished - * @throws IOException - */ - public T read() throws IOException { - try { - if (reader != null && reader.nextKeyValue()) { - return reader.getCurrentValue(); - } else { - initReader(); - return reader == null ? 
null : read(); - } - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - private void initReader() throws IOException { - if (reader != null) { - reader.close(); - reader = null; - } - if (footersIterator.hasNext()) { - Footer footer = footersIterator.next(); - reader = new InternalParquetRecordReader<T>(readSupport, filter); - reader.initialize( - readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(), - readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf); - } - } - - @Override - public void close() throws IOException { - if (reader != null) { - reader.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java deleted file mode 100644 index 0447a47..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import parquet.column.ParquetProperties; -import parquet.hadoop.api.WriteSupport; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.schema.MessageType; - -import java.io.Closeable; -import java.io.IOException; - -public class ParquetWriter<T> implements Closeable { - - public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; - public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024; - public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME = - CompressionCodecName.UNCOMPRESSED; - public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; - public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; - public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION = - ParquetProperties.WriterVersion.PARQUET_1_0; - - private final InternalParquetRecordWriter<T> writer; - - /** - * Create a new ParquetWriter. 
- * (with dictionary encoding enabled and validation off) - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @throws java.io.IOException - * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean) - */ - public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, - DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold (both data and dictionary) - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @throws IOException - * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean validating) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @throws IOException - * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, - dictionaryPageSize, enableDictionary, validating, - DEFAULT_WRITER_VERSION); - } - - /** - * Create a new ParquetWriter. - * - * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads - * configuration from the classpath. 
- * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion} - * @throws IOException - * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration) - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - ParquetProperties.WriterVersion writerVersion) throws IOException { - this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration()); - } - - /** - * Create a new ParquetWriter. - * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @param compressionCodecName the compression codec to use - * @param blockSize the block size threshold - * @param pageSize the page size threshold - * @param dictionaryPageSize the page size threshold for the dictionary pages - * @param enableDictionary to turn dictionary encoding on - * @param validating to turn on validation using the schema - * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion} - * @param conf Hadoop configuration to use while accessing the filesystem - * @throws IOException - */ - public ParquetWriter( - Path file, - WriteSupport<T> writeSupport, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - ParquetProperties.WriterVersion writerVersion, - Configuration conf) throws IOException { - - WriteSupport.WriteContext writeContext = writeSupport.init(conf); - MessageType schema = writeContext.getSchema(); - - ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file); - fileWriter.start(); - - CodecFactory codecFactory = new CodecFactory(conf); - CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0); - this.writer = new InternalParquetRecordWriter<T>( - fileWriter, - writeSupport, - schema, - writeContext.getExtraMetaData(), - blockSize, - pageSize, - compressor, - dictionaryPageSize, - enableDictionary, - validating, - writerVersion); - } - - /** - * Create a new ParquetWriter. The default block size is 128 MB. The default - * page size is 1 MB. Default compression is no compression. Dictionary encoding is enabled.
- * - * @param file the file to create - * @param writeSupport the implementation to write a record to a RecordConsumer - * @throws IOException - */ - public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException { - this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); - } - - public void write(T object) throws IOException { - try { - writer.write(object); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - public long getEstimatedWrittenSize() throws IOException { - return this.writer.getEstimatedWrittenSize(); - } - - @Override - public void close() throws IOException { - try { - writer.close(); - } catch (InterruptedException e) { - throw new IOException(e); - } - }} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java deleted file mode 100644 index c1835df..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java +++ /dev/null @@ -1,112 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.tuple.offheap.HeapTuple; -import org.apache.tajo.tuple.offheap.OffHeapRowWriter; -import org.apache.tajo.tuple.offheap.ZeroCopyTuple; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable { - private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class); - - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - // buffer - private ByteBuffer buffer; - private long address; - - public BaseTupleBuilder(Schema schema) { - super(SchemaUtil.toDataTypes(schema)); - buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder()); - address = UnsafeUtil.getAddress(buffer); - } - - @Override - public long address() { - return address; - } - - public void ensureSize(int size) { - if (buffer.remaining() - size < 0) { // check the remain size - // enlarge new buffer and copy writing data - int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2); - ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); - long newAddress = ((DirectBuffer)newByteBuf).address(); - UNSAFE.copyMemory(this.address, newAddress, buffer.limit()); - LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false)); - - // release existing buffer and replace variables - UnsafeUtil.free(buffer); - buffer = newByteBuf; - address = newAddress; - } - } - - @Override - public int position() { - return 0; - } - - @Override - public void forward(int length) { - } - - @Override - public void endRow() { - super.endRow(); - buffer.position(0).limit(offset()); - } - - @Override - public Tuple build() { - return buildToHeapTuple(); - } - - public HeapTuple buildToHeapTuple() { - byte [] bytes = new byte[buffer.limit()]; - UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit()); - return new HeapTuple(bytes, dataTypes()); - } - - public ZeroCopyTuple buildToZeroCopyTuple() { - ZeroCopyTuple zcTuple = new ZeroCopyTuple(); - zcTuple.set(buffer, 0, buffer.limit(), dataTypes()); - return zcTuple; - } - - public void release() { - UnsafeUtil.free(buffer); - buffer = null; - address = 0; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java deleted file mode 100644 index be734e1..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.Tuple; - -public interface RowBlockReader<T extends Tuple> { - - /** - * Reads the next tuple into the given tuple - * - * @return True if the tuple block has remaining tuples. Otherwise, it will return false. - */ - public boolean next(T tuple); - - public void reset(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java deleted file mode 100644 index c43c018..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java +++ /dev/null @@ -1,26 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple; - -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.tuple.offheap.RowWriter; - -public interface TupleBuilder extends RowWriter { - public Tuple build(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java deleted file mode 100644 index 9662d5a..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.UnsafeUtil; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class DirectBufTuple extends UnSafeTuple implements Deallocatable { - private ByteBuffer bb; - - public DirectBufTuple(int length, DataType[] types) { - bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder()); - set(bb, 0, length, types); - } - - @Override - public void release() { - UnsafeUtil.free(bb); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java deleted file mode 100644 index a327123..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -/** - * Fixed size limit specification - */ -public class FixedSizeLimitSpec extends ResizableLimitSpec { - public FixedSizeLimitSpec(long size) { - super(size, size); - } - - public FixedSizeLimitSpec(long size, float allowedOverflowRatio) { - super(size, size, allowedOverflowRatio); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java deleted file mode 100644 index 33f9f1c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java +++ /dev/null @@ -1,272 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; - -import org.apache.tajo.datum.*; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.StringUtils; -import org.apache.tajo.util.UnsafeUtil; - -import sun.misc.Unsafe; - -import java.nio.ByteBuffer; -import java.nio.charset.Charset; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class HeapTuple implements Tuple { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET; - - private final byte [] data; - private final DataType [] types; - - public HeapTuple(final byte [] bytes, final DataType [] types) { - this.data = bytes; - this.types = types; - } - - @Override - public int size() { - return data.length; - } - - public ByteBuffer nioBuffer() { - return ByteBuffer.wrap(data); - } - - private int getFieldOffset(int fieldId) { - return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)); - } - - private int checkNullAndGetOffset(int fieldId) { - int offset = getFieldOffset(fieldId); - if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) { - throw new RuntimeException("Invalid Field Access: " + fieldId); - } - return offset; - } - - @Override - public boolean contains(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isNull(int fieldid) { - return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public boolean isNotNull(int fieldid) { - return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - @Override - public void clear() { - // nothing to do - } - - @Override - public void put(int fieldId, Datum value) { - throw new UnsupportedException("HeapTuple does not support put(int, Datum)."); - } - - @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedException("HeapTuple does not support put(int, Datum [])."); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException("HeapTuple does not support put(int, Tuple)."); - } - - @Override - public void put(Datum[] values) { - throw new UnsupportedException("HeapTuple does not support put(Datum [])."); - } - - @Override - public Datum get(int fieldId) { - if (isNull(fieldId)) { - return NullDatum.get(); - } - - switch (types[fieldId].getType()) { - case BOOLEAN: - return DatumFactory.createBool(getBool(fieldId)); - case INT1: - case INT2: - return DatumFactory.createInt2(getInt2(fieldId)); - case INT4: - return DatumFactory.createInt4(getInt4(fieldId)); - case INT8: - return DatumFactory.createInt8(getInt8(fieldId)); - case FLOAT4: - return
DatumFactory.createFloat4(getFloat4(fieldId)); - case FLOAT8: - return DatumFactory.createFloat8(getFloat8(fieldId)); - case TEXT: - return DatumFactory.createText(getText(fieldId)); - case TIMESTAMP: - return DatumFactory.createTimestamp(getInt8(fieldId)); - case DATE: - return DatumFactory.createDate(getInt4(fieldId)); - case TIME: - return DatumFactory.createTime(getInt8(fieldId)); - case INTERVAL: - return getInterval(fieldId); - case INET4: - return DatumFactory.createInet4(getInt4(fieldId)); - case PROTOBUF: - return getProtobufDatum(fieldId); - default: - throw new UnsupportedException("Unknown type: " + types[fieldId]); - } - } - - @Override - public void setOffset(long offset) { - } - - @Override - public long getOffset() { - return 0; - } - - @Override - public boolean getBool(int fieldId) { - return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01; - } - - @Override - public byte getByte(int fieldId) { - return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public char getChar(int fieldId) { - return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public byte[] getBytes(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int len = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return bytes; - } - - @Override - public short getInt2(int fieldId) { - return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public int getInt4(int fieldId) { - return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public long getInt8(int fieldId) { - return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public float getFloat4(int fieldId) { - return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public double getFloat8(int fieldId) { - return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)); - } - - @Override - public String getText(int fieldId) { - return new String(getBytes(fieldId)); - } - - public IntervalDatum getInterval(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int months = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos); - return new IntervalDatum(months, millisecs); - } - - @Override - public Datum getProtobufDatum(int fieldId) { - byte [] bytes = getBytes(fieldId); - - ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode()); - Message.Builder builder = factory.newBuilder(); - try { - builder.mergeFrom(bytes); - } catch (InvalidProtocolBufferException e) { - return NullDatum.get(); - } - - return new ProtobufDatum(builder.build()); - } - - @Override - public char[] getUnicodeChars(int fieldId) { - long pos = checkNullAndGetOffset(fieldId); - int len = UNSAFE.getInt(data, BASE_OFFSET + pos); - pos += SizeOf.SIZE_OF_INT; - - byte [] bytes = new byte[len]; - UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len); - return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8")); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - return this; - } - - @Override - public Datum[] getValues() { - Datum [] datums = new Datum[types.length]; - for (int i = 0; i < types.length;
i++) { - if (contains(i)) { - datums[i] = get(i); - } else { - datums[i] = NullDatum.get(); - } - } - return datums; - } - - @Override - public String toString() { - return VTuple.toDisplayString(getValues()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java deleted file mode 100644 index 2f8e349..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; -import sun.nio.ch.DirectBuffer; - -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -public class OffHeapMemory implements Deallocatable { - private static final Log LOG = LogFactory.getLog(OffHeapMemory.class); - - protected static final Unsafe UNSAFE = UnsafeUtil.unsafe; - - protected ByteBuffer buffer; - protected int memorySize; - protected ResizableLimitSpec limitSpec; - protected long address; - - @VisibleForTesting - protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) { - this.buffer = buffer; - this.address = ((DirectBuffer) buffer).address(); - this.memorySize = buffer.limit(); - this.limitSpec = limitSpec; - } - - public OffHeapMemory(ResizableLimitSpec limitSpec) { - this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec); - } - - public long address() { - return address; - } - - public long size() { - return memorySize; - } - - public void resize(int newSize) { - Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes"); - - if (newSize > limitSpec.limit()) { - throw new RuntimeException("Resize cannot exceed the size limit"); - } - - if (newSize < memorySize) { - LOG.warn("The size reduction is ignored."); - return; // actually ignore the reduction; copying memorySize bytes into a smaller buffer would overflow it - } - - int newBlockSize = UnsafeUtil.alignedSize(newSize); - ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize); - long newAddress = ((DirectBuffer)newByteBuf).address(); - - UNSAFE.copyMemory(this.address, newAddress, memorySize); - - UnsafeUtil.free(buffer); - this.memorySize = newSize; - this.buffer = newByteBuf; - this.address = newAddress; - } - 
- public java.nio.Buffer nioBuffer() { - return (ByteBuffer) buffer.position(0).limit(memorySize); - } - - @Override - public void release() { - UnsafeUtil.free(this.buffer); - this.buffer = null; - this.address = 0; - this.memorySize = 0; - } - - public String toString() { - return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java deleted file mode 100644 index 689efb7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java +++ /dev/null @@ -1,176 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.util.Deallocatable; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.SizeOf; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import static org.apache.tajo.common.TajoDataTypes.DataType; - -public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable { - private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class); - - public static final int NULL_FIELD_OFFSET = -1; - - DataType [] dataTypes; - - // Basic States - private int maxRowNum = Integer.MAX_VALUE; // optional - private int rowNum; - protected int position = 0; - - private OffHeapRowBlockWriter builder; - - private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) { - super(buffer, limitSpec); - initialize(schema); - } - - public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) { - super(limitSpec); - initialize(schema); - } - - private void initialize(Schema schema) { - dataTypes = SchemaUtil.toDataTypes(schema); - - this.builder = new OffHeapRowBlockWriter(this); - } - - @VisibleForTesting - public OffHeapRowBlock(Schema schema, int bytes) { - this(schema, new ResizableLimitSpec(bytes)); - } - - @VisibleForTesting - public OffHeapRowBlock(Schema schema, ByteBuffer buffer) { - this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT); - } - - public void position(int pos) { - this.position = pos; - } - - public void clear() { - this.position = 0; - 
this.rowNum = 0; - - builder.clear(); - } - - @Override - public ByteBuffer nioBuffer() { - return (ByteBuffer) buffer.position(0).limit(position); - } - - public int position() { - return position; - } - - public long usedMem() { - return position; - } - - /** - * Ensure that this buffer has enough remaining space to add the size. - * Creates and copies to a new buffer if necessary - * - * @param size Size to add - */ - public void ensureSize(int size) { - if (remain() - size < 0) { - if (!limitSpec.canIncrease(memorySize)) { - throw new RuntimeException("Cannot increase RowBlock anymore."); - } - - int newBlockSize = limitSpec.increasedSize(memorySize); - resize(newBlockSize); - LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false)); - } - } - - public long remain() { - return memorySize - position - builder.offset(); - } - - public int maxRowNum() { - return maxRowNum; - } - public int rows() { - return rowNum; - } - - public void setRows(int rowNum) { - this.rowNum = rowNum; - } - - public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException { - if (channel.position() < channel.size()) { - clear(); - - buffer.clear(); - channel.read(buffer); - memorySize = buffer.position(); - - while (position < memorySize) { - long recordPtr = address + position; - - if (remain() < SizeOf.SIZE_OF_INT) { - channel.position(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - int recordSize = UNSAFE.getInt(recordPtr); - - if (remain() < recordSize) { - channel.position(channel.position() - remain()); - memorySize = (int) (memorySize - remain()); - return true; - } - - position += recordSize; - rowNum++; - } - - return true; - } else { - return false; - } - } - - public RowWriter getWriter() { - return builder; - } - - public OffHeapRowBlockReader getReader() { - return new OffHeapRowBlockReader(this); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java deleted file mode 100644 index 4a9313f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java +++ /dev/null @@ -1,63 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
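The copyFromChannel method just above implements a small framing protocol: every record begins with a 4-byte length that counts the whole record, so the scan hops from record to record and, when the buffer ends mid-record, rewinds the channel over the partial tail so the next call re-reads it from the start. A heap-buffer sketch of the same scan under those assumptions (the class and method names are illustrative, not Tajo API; a real caller must also match the writer's byte order):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class LengthPrefixedScan {
  /** Returns the number of complete records found; rewinds ch over any partial tail. */
  static int scan(FileChannel ch, ByteBuffer buf) throws IOException {
    buf.clear();
    ch.read(buf);
    int filled = buf.position();

    int pos = 0;
    int rows = 0;
    while (pos + 4 <= filled) {
      int recordLen = buf.getInt(pos);  // the length prefix counts itself
      if (pos + recordLen > filled) {
        break;                          // partial record at the end of the buffer
      }
      pos += recordLen;
      rows++;
    }
    ch.position(ch.position() - (filled - pos)); // push unconsumed bytes back
    return rows;
  }
}
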
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.tuple.RowBlockReader; -import org.apache.tajo.util.UnsafeUtil; -import sun.misc.Unsafe; - -public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> { - private static final Unsafe UNSAFE = UnsafeUtil.unsafe; - OffHeapRowBlock rowBlock; - - // Read States - private int curRowIdxForRead; - private int curPosForRead; - - public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) { - this.rowBlock = rowBlock; - } - - public long remainForRead() { - return rowBlock.memorySize - curPosForRead; - } - - @Override - public boolean next(ZeroCopyTuple tuple) { - if (curRowIdxForRead < rowBlock.rows()) { - - long recordStartPtr = rowBlock.address() + curPosForRead; - int recordLen = UNSAFE.getInt(recordStartPtr); - tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes); - - curPosForRead += recordLen; - curRowIdxForRead++; - - return true; - } else { - return false; - } - } - - @Override - public void reset() { - curPosForRead = 0; - curRowIdxForRead = 0; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java deleted file mode 100644 index dbc3188..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
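The reader deleted above is zero-copy: next() never materializes a row, it only points the supplied ZeroCopyTuple at the current record's offset inside the block's buffer and advances by the record's length prefix. Typical use, sketched against the deleted API (the schema value and the fill step are assumed):

// rowBlock was filled through rowBlock.getWriter() beforehand.
OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, new ResizableLimitSpec(64 * 1024));
OffHeapRowBlockReader reader = rowBlock.getReader();

ZeroCopyTuple tuple = new ZeroCopyTuple(); // one instance, re-pointed on every next()
while (reader.next(tuple)) {
  // Consume the tuple here. It aliases the block's memory, so copy out any
  // value that must survive the following next() call.
}
reader.reset();     // rewinds to the first record
rowBlock.release(); // frees the off-heap memory
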
- */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.collect.Lists; -import org.apache.tajo.storage.Tuple; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -public class OffHeapRowBlockUtils { - - public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) { - List<Tuple> tupleList = Lists.newArrayList(); - ZeroCopyTuple zcTuple = new ZeroCopyTuple(); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - while(reader.next(zcTuple)) { - tupleList.add(zcTuple); - zcTuple = new ZeroCopyTuple(); - } - Collections.sort(tupleList, comparator); - return tupleList; - } - - public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) { - Tuple[] tuples = new Tuple[rowBlock.rows()]; - ZeroCopyTuple zcTuple = new ZeroCopyTuple(); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) { - tuples[i] = zcTuple; - zcTuple = new ZeroCopyTuple(); - } - Arrays.sort(tuples, comparator); - return tuples; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java deleted file mode 100644 index d177e0c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
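Note why both helpers above allocate a fresh ZeroCopyTuple for every row instead of reusing one: a zero-copy tuple is only a (buffer, offset, length) view, so a single reused instance would leave every element of the result pointing at the last record. A hedged usage sketch (the INT4 first column and the Tuple.getInt4 accessor are assumptions for illustration):

// Sort a row block by its first column, assumed here to be INT4.
Comparator<Tuple> byFirstColumn = new Comparator<Tuple>() {
  @Override
  public int compare(Tuple a, Tuple b) {
    return Integer.compare(a.getInt4(0), b.getInt4(0));
  }
};
List<Tuple> sorted = OffHeapRowBlockUtils.sort(rowBlock, byFirstColumn);
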
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; - -public class OffHeapRowBlockWriter extends OffHeapRowWriter { - OffHeapRowBlock rowBlock; - - OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) { - super(rowBlock.dataTypes); - this.rowBlock = rowBlock; - } - - public long address() { - return rowBlock.address(); - } - - public int position() { - return rowBlock.position(); - } - - @Override - public void forward(int length) { - rowBlock.position(position() + length); - } - - public void ensureSize(int size) { - rowBlock.ensureSize(size); - } - - @Override - public void endRow() { - super.endRow(); - rowBlock.setRows(rowBlock.rows() + 1); - } - - @Override - public TajoDataTypes.DataType[] dataTypes() { - return rowBlock.dataTypes; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java deleted file mode 100644 index 85c7e0b..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.util.SizeOf; -import org.apache.tajo.util.UnsafeUtil; - -/** - * - * Row Record Structure - * - * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... 
| field N | - * 4 bytes 4 bytes 4 bytes - * - */ -public abstract class OffHeapRowWriter implements RowWriter { - /** record size + offset list */ - private final int headerSize; - /** field offsets */ - private final int [] fieldOffsets; - private final TajoDataTypes.DataType [] dataTypes; - - private int curFieldIdx; - private int curOffset; - - public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) { - this.dataTypes = dataTypes; - fieldOffsets = new int[dataTypes.length]; - headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1); - } - - public void clear() { - curOffset = 0; - curFieldIdx = 0; - } - - public long recordStartAddr() { - return address() + position(); - } - - public abstract long address(); - - public abstract void ensureSize(int size); - - public int offset() { - return curOffset; - } - - /** - * Current position - * - * @return The position - */ - public abstract int position(); - - /** - * Forward the address; - * - * @param length Length to be forwarded - */ - public abstract void forward(int length); - - @Override - public TajoDataTypes.DataType[] dataTypes() { - return dataTypes; - } - - public boolean startRow() { - curOffset = headerSize; - curFieldIdx = 0; - return true; - } - - public void endRow() { - long rowHeaderPos = address() + position(); - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset); - rowHeaderPos += SizeOf.SIZE_OF_INT; - - for (int i = 0; i < curFieldIdx; i++) { - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]); - rowHeaderPos += SizeOf.SIZE_OF_INT; - } - for (int i = curFieldIdx; i < dataTypes.length; i++) { - OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET); - rowHeaderPos += SizeOf.SIZE_OF_INT; - } - - // rowOffset is equivalent to a byte length of this row. - forward(curOffset); - } - - public void skipField() { - fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET; - } - - private void forwardField() { - fieldOffsets[curFieldIdx++] = curOffset; - } - - public void putBool(boolean val) { - ensureSize(SizeOf.SIZE_OF_BOOL); - forwardField(); - - OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 
0x01 : 0x00)); - - curOffset += SizeOf.SIZE_OF_BOOL; - } - - public void putInt2(short val) { - ensureSize(SizeOf.SIZE_OF_SHORT); - forwardField(); - - OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_SHORT; - } - - public void putInt4(int val) { - ensureSize(SizeOf.SIZE_OF_INT); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_INT; - } - - public void putInt8(long val) { - ensureSize(SizeOf.SIZE_OF_LONG); - forwardField(); - - OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_LONG; - } - - public void putFloat4(float val) { - ensureSize(SizeOf.SIZE_OF_FLOAT); - forwardField(); - - OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_FLOAT; - } - - public void putFloat8(double val) { - ensureSize(SizeOf.SIZE_OF_DOUBLE); - forwardField(); - - OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val); - curOffset += SizeOf.SIZE_OF_DOUBLE; - } - - public void putText(String val) { - byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET); - putText(bytes); - } - - public void putText(byte[] val) { - int bytesLen = val.length; - - ensureSize(SizeOf.SIZE_OF_INT + bytesLen); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); - curOffset += SizeOf.SIZE_OF_INT; - - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, - recordStartAddr() + curOffset, bytesLen); - curOffset += bytesLen; - } - - public void putBlob(byte[] val) { - int bytesLen = val.length; - - ensureSize(SizeOf.SIZE_OF_INT + bytesLen); - forwardField(); - - OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen); - curOffset += SizeOf.SIZE_OF_INT; - - OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null, - recordStartAddr() + curOffset, bytesLen); - curOffset += bytesLen; - } - - public void putTimestamp(long val) { - putInt8(val); - } - - public void putDate(int val) { - putInt4(val); - } - - public void putTime(long val) { - putInt8(val); - } - - public void putInterval(IntervalDatum val) { - ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG); - forwardField(); - - long offset = recordStartAddr() + curOffset; - OffHeapMemory.UNSAFE.putInt(offset, val.getMonths()); - offset += SizeOf.SIZE_OF_INT; - OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds()); - curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG; - } - - public void putInet4(int val) { - putInt4(val); - } - - public void putProtoDatum(ProtobufDatum val) { - putBlob(val.asByteArray()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java deleted file mode 100644 index 14e67b2..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.base.Preconditions; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.util.FileUtil; - -/** - * Specifies the initial size, the maximum size, and the growth ratio of a resizable memory block. - * It also guarantees that every size stays at or below Integer.MAX_VALUE (2^31 - 1) bytes, - * because ByteBuffer capacities are int values. - */ -public class ResizableLimitSpec { - private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class); - - public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE; - public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE); - - private final long initSize; - private final long limitBytes; - private final float incRatio; - private final float allowedOverflowRatio; - private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f; - private final static float DEFAULT_INCREASE_RATIO = 1.0f; - - public ResizableLimitSpec(long initSize) { - this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO); - } - - public ResizableLimitSpec(long initSize, long limitBytes) { - this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO); - } - - public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) { - this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO); - } - - public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) { - Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes."); - Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB."); - Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes."); - Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB."); - Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0."); - - if (initSize == limitBytes) { - long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio)); - - if (overflowedSize > Integer.MAX_VALUE) { - overflowedSize = Integer.MAX_VALUE; - } - - this.initSize = overflowedSize; - this.limitBytes = overflowedSize; - } else { - this.initSize = initSize; - limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio)); - - if (limitBytes > Integer.MAX_VALUE) { - this.limitBytes = Integer.MAX_VALUE; - } else { - this.limitBytes = limitBytes; - } - } - - this.allowedOverflowRatio = allowedOverflowRatio; - this.incRatio = incRatio; - } - - public long initialSize() { - return initSize; - } - - public long limit() { - return limitBytes; - } - - public float remainRatio(long currentSize) { - Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); - if (currentSize > Integer.MAX_VALUE) { - currentSize = Integer.MAX_VALUE; - } - return (float)currentSize / (float)limitBytes; - } - - public boolean canIncrease(long currentSize) { - 
return remain(currentSize) > 0; - } - - public long remain(long currentSize) { - Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes."); - return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize; - } - - public int increasedSize(int currentSize) { - if (currentSize < initSize) { - return (int) initSize; - } - - long nextSize = (long) (currentSize + ((float) currentSize * incRatio)); - - if (nextSize > limitBytes) { - LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")"); - nextSize = limitBytes; - } - - if (nextSize > Integer.MAX_VALUE) { - LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")"); - nextSize = Integer.MAX_VALUE; - } - - return (int) nextSize; - } - - @Override - public String toString() { - return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit=" - + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOverflowRatio - + ",inc_ratio=" + incRatio; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java deleted file mode 100644 index a2b2561..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java +++ /dev/null @@ -1,73 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; - -/** - * The call sequence should be as follows: - * - * <pre> - * startRow() --> skipField() or putXXX --> endRow() - * </pre> - * - * The total number of skipField and putXXX invocations must equal the number of fields.
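ResizableLimitSpec.increasedSize, just above, grows a block geometrically: each step adds incRatio times the current size (the default ratio of 1.0 doubles it), clamped first to the overflow-adjusted limit and then to Integer.MAX_VALUE. The schedule for a 64 KB block under a hypothetical 1 MB limit, sketched with the same arithmetic:

long size = 64 * 1024;       // current block size
final long limit = 1 << 20;  // hypothetical 1 MB limit; overflow ratio ignored here
final float incRatio = 1.0f; // DEFAULT_INCREASE_RATIO

while (size < limit) {
  size = Math.min((long) (size + size * incRatio), limit);
  System.out.println(size);  // 131072, 262144, 524288, 1048576
}
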
- */ -public interface RowWriter { - - public TajoDataTypes.DataType [] dataTypes(); - - public boolean startRow(); - - public void endRow(); - - public void skipField(); - - public void putBool(boolean val); - - public void putInt2(short val); - - public void putInt4(int val); - - public void putInt8(long val); - - public void putFloat4(float val); - - public void putFloat8(double val); - - public void putText(String val); - - public void putText(byte[] val); - - public void putBlob(byte[] val); - - public void putTimestamp(long val); - - public void putTime(long val); - - public void putDate(int val); - - public void putInterval(IntervalDatum val); - - public void putInet4(int val); - - public void putProtoDatum(ProtobufDatum datum); -}
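The header diagram in OffHeapRowWriter earlier pins the record layout down: a 4-byte row length, one 4-byte offset per field (NULL_FIELD_OFFSET, -1, marks a skipped field; offsets are relative to the record start), then the variable-length field data. Worked arithmetic for a two-field row (INT4, then TEXT "ab"), following the deleted put methods:

int fields = 2;
int headerSize = 4 * (fields + 1);    // row length + one offset per field = 12 bytes
int field0Offset = headerSize;        // the INT4 value lands at offset 12
int field1Offset = field0Offset + 4;  // the TEXT field starts at offset 16
// TEXT is stored as a 4-byte length followed by the raw bytes ("ab" = 2 bytes).
int rowLength = field1Offset + 4 + 2; // 22 bytes; endRow() writes this into the first int

The RowWriter contract itself is positional: exactly one putXXX or skipField call per schema column, bracketed by startRow and endRow. A sketch of one row for a three-column schema (INT4, TEXT, FLOAT8), assuming a rowBlock built as in the files above:

RowWriter writer = rowBlock.getWriter(); // the OffHeapRowBlockWriter shown earlier
writer.startRow();
writer.putInt4(7);      // column 1: INT4
writer.skipField();     // column 2: NULL text
writer.putFloat8(3.14); // column 3: FLOAT8
writer.endRow();        // writes the row length and offset header, bumps rows()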
