http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java deleted file mode 100644 index 1448885..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.ByteBufInputChannel; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -public class ByteBufLineReader implements Closeable { - private static int DEFAULT_BUFFER = 64 * 1024; - - private int bufferSize; - private long readBytes; - private ByteBuf buffer; - private final ByteBufInputChannel channel; - private final AtomicInteger tempReadBytes = new AtomicInteger(); - private final LineSplitProcessor processor = new LineSplitProcessor(); - - public ByteBufLineReader(ByteBufInputChannel channel) { - this(channel, BufferPool.directBuffer(DEFAULT_BUFFER)); - } - - public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) { - this.readBytes = 0; - this.channel = channel; - this.buffer = buf; - this.bufferSize = buf.capacity(); - } - - public long readBytes() { - return readBytes - buffer.readableBytes(); - } - - public long available() throws IOException { - return channel.available() + buffer.readableBytes(); - } - - @Override - public void close() throws IOException { - if (this.buffer.refCnt() > 0) { - this.buffer.release(); - } - this.channel.close(); - } - - public String readLine() throws IOException { - ByteBuf buf = readLineBuf(tempReadBytes); - if (buf != null) { - return buf.toString(CharsetUtil.UTF_8); - } - return null; - } - - private void fillBuffer() throws IOException { - - int tailBytes = 0; - if (this.readBytes > 0) { - this.buffer.markReaderIndex(); - this.buffer.discardSomeReadBytes(); // compact the buffer - tailBytes = this.buffer.writerIndex(); - if (!this.buffer.isWritable()) { - // a line bytes is large than the buffer - BufferPool.ensureWritable(buffer, bufferSize); - this.bufferSize = buffer.capacity(); - } - } - - boolean release = true; - try { - int readBytes = tailBytes; - for (; ; ) { - int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes); - if (localReadBytes < 0) { - break; - } - readBytes += localReadBytes; - if (readBytes == bufferSize) { - break; - } - 
} - this.readBytes += (readBytes - tailBytes); - release = false; - this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) - } finally { - if (release) { - buffer.release(); - } - } - } - - /** - * Read a line terminated by one of CR, LF, or CRLF. - */ - public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { - int startIndex = buffer.readerIndex(); - int readBytes; - int readable; - int newlineLength; //length of terminating newline - - loop: - while (true) { - readable = buffer.readableBytes(); - if (readable <= 0) { - buffer.readerIndex(startIndex); - fillBuffer(); //compact and fill buffer - if (!buffer.isReadable()) { - return null; - } else { - startIndex = 0; // reset the line start position - } - readable = buffer.readableBytes(); - } - - int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor); - if (endIndex < 0) { - buffer.readerIndex(buffer.writerIndex()); - } else { - buffer.readerIndex(endIndex + 1); - readBytes = buffer.readerIndex() - startIndex; - if (processor.isPrevCharCR() && buffer.isReadable() - && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { - buffer.skipBytes(1); - newlineLength = 2; - } else { - newlineLength = 1; - } - break loop; - } - } - reads.set(readBytes); - return buffer.slice(startIndex, readBytes - newlineLength); - } -}
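The two readers deleted above work as a pair: ByteBufLineReader slices CR-, LF-, or CRLF-terminated lines out of a pooled direct ByteBuf that it refills from a ByteBufInputChannel, and DelimitedLineReader (the next file in this diff) adds HDFS access, offset handling, and optional decompression on top of it. The minimal sketch below is not part of the commit; it only uses constructors and methods visible in this diff, and the FileFragment constructor arguments are an assumption made for illustration.

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedLineReader;

public class LineReaderSketch {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();
    Path path = new Path(args[0]);
    FileSystem fs = path.getFileSystem(conf);
    long length = fs.getFileStatus(path).getLen();

    // FileFragment(fragmentId, path, startOffset, length) -- argument order assumed for this sketch
    FileFragment fragment = new FileFragment("t1", path, 0, length);

    DelimitedLineReader reader = new DelimitedLineReader(conf, fragment);
    reader.init(); // opens the file, seeks to the start offset, sets up decompression if a codec matches
    try {
      while (reader.isReadable()) {
        ByteBuf line = reader.readLine(); // slice of one line without its terminating newline
        if (line == null) {
          break; // EOF
        }
        System.out.println(line.toString(CharsetUtil.UTF_8));
      }
    } finally {
      reader.close(); // releases the pooled direct buffer and the underlying streams
    }
  }
}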
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java deleted file mode 100644 index d9e2016..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.SplittableCompressionCodec; -import org.apache.tajo.common.exception.NotImplementedException; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.ByteBufInputChannel; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicInteger; - -public class DelimitedLineReader implements Closeable { - private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class); - private final static int DEFAULT_PAGE_SIZE = 128 * 1024; - - private FileSystem fs; - private FSDataInputStream fis; - private InputStream is; //decompressd stream - private CompressionCodecFactory factory; - private CompressionCodec codec; - private Decompressor decompressor; - - private long startOffset, end, pos; - private boolean eof = true; - private ByteBufLineReader lineReader; - private AtomicInteger tempReadBytes = new AtomicInteger(); - private FileFragment fragment; - private Configuration conf; - - public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { - this.fragment = fragment; - this.conf = conf; - this.factory = new CompressionCodecFactory(conf); - this.codec = factory.getCodec(fragment.getPath()); - if (this.codec instanceof SplittableCompressionCodec) { - throw new NotImplementedException(); // bzip2 does not support multi-thread model - } - } - - public void init() throws 
IOException { - if (fs == null) { - fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); - } - if (fis == null) fis = fs.open(fragment.getPath()); - pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getLength(); - - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - is = new DataInputStream(codec.createInputStream(fis, decompressor)); - ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE)); - } else { - fis.seek(startOffset); - is = fis; - - ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, - BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end))); - } - eof = false; - } - - public long getCompressedPosition() throws IOException { - long retVal; - if (isCompressed()) { - retVal = fis.getPos(); - } else { - retVal = pos; - } - return retVal; - } - - public long getUnCompressedPosition() throws IOException { - return pos; - } - - public long getReadBytes() { - return pos - startOffset; - } - - public boolean isReadable() { - return !eof; - } - - public ByteBuf readLine() throws IOException { - if (eof) { - return null; - } - - ByteBuf buf = lineReader.readLineBuf(tempReadBytes); - if (buf == null) { - eof = true; - } else { - pos += tempReadBytes.get(); - } - - if (!isCompressed() && getCompressedPosition() > end) { - eof = true; - } - return buf; - } - - public boolean isCompressed() { - return codec != null; - } - - @Override - public void close() throws IOException { - try { - IOUtils.cleanup(LOG, lineReader, is, fis); - fs = null; - is = null; - fis = null; - lineReader = null; - } finally { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java deleted file mode 100644 index a337509..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ /dev/null @@ -1,468 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; - -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; - -public class DelimitedTextFile { - - public static final byte LF = '\n'; - public static int EOF = -1; - - private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); - - public static class DelimitedTextFileAppender extends FileAppender { - private final TableMeta meta; - private final Schema schema; - private final int columnNum; - private final FileSystem fs; - private FSDataOutputStream fos; - private DataOutputStream outputStream; - private CompressionOutputStream deflateFilter; - private char delimiter; - private TableStatistics stats = null; - private Compressor compressor; - private CompressionCodecFactory codecFactory; - private CompressionCodec codec; - private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; - private int bufferedBytes = 0; - private long pos = 0; - - private NonSyncByteArrayOutputStream os; - private FieldSerializerDeserializer serde; - - public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, - final Schema schema, final TableMeta meta, final Path path) - throws IOException { - super(conf, taskAttemptId, schema, meta, path); - this.fs = path.getFileSystem(conf); - this.meta = meta; - this.schema = schema; - this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER, - StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - this.columnNum = schema.size(); - - String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - @Override - public void init() throws IOException { - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { - String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); - codecFactory = new CompressionCodecFactory(conf); - codec = codecFactory.getCodecByClassName(codecName); - compressor = 
CodecPool.getCompressor(codec); - if (compressor != null) compressor.reset(); //builtin gzip is null - - String extension = codec.getDefaultExtension(); - compressedPath = path.suffix(extension); - - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); - deflateFilter = codec.createOutputStream(fos, compressor); - outputStream = new DataOutputStream(deflateFilter); - - } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); - outputStream = new DataOutputStream(new BufferedOutputStream(fos)); - } - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - serde = new TextFieldSerializerDeserializer(); - - if (os == null) { - os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); - } - - os.reset(); - pos = fos.getPos(); - bufferedBytes = 0; - super.init(); - } - - - @Override - public void addTuple(Tuple tuple) throws IOException { - Datum datum; - int rowBytes = 0; - - for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars); - - if (columnNum - 1 > i) { - os.write((byte) delimiter); - rowBytes += 1; - } - } - os.write(LF); - rowBytes += 1; - - pos += rowBytes; - bufferedBytes += rowBytes; - if (bufferedBytes > BUFFER_SIZE) { - flushBuffer(); - } - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - private void flushBuffer() throws IOException { - if (os.getLength() > 0) { - os.writeTo(outputStream); - os.reset(); - bufferedBytes = 0; - } - } - - @Override - public long getOffset() throws IOException { - return pos; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); - } - - @Override - public void close() throws IOException { - - try { - if(outputStream != null){ - flush(); - } - - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - - if (deflateFilter != null) { - deflateFilter.finish(); - deflateFilter.resetState(); - deflateFilter = null; - } - - os.close(); - } finally { - IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - - public boolean isCompress() { - return compressor != null; - } - - public String getExtension() { - return codec != null ? 
codec.getDefaultExtension() : ""; - } - } - - public static class DelimitedTextFileScanner extends FileScanner { - - private boolean splittable = false; - private final long startOffset; - private final long endOffset; - - private int recordCount = 0; - private int[] targetColumnIndexes; - - private ByteBuf nullChars; - private FieldSerializerDeserializer serde; - private DelimitedLineReader reader; - private FieldSplitProcessor processor; - - public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, - final Fragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - reader = new DelimitedLineReader(conf, this.fragment); - if (!reader.isCompressed()) { - splittable = true; - } - - startOffset = this.fragment.getStartKey(); - endOffset = startOffset + fragment.getLength(); - - //Delimiter - String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); - this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0)); - } - - @Override - public void init() throws IOException { - if (nullChars != null) { - nullChars.release(); - } - - String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); - byte[] bytes; - if (StringUtils.isEmpty(nullCharacters)) { - bytes = NullDatum.get().asTextBytes(); - } else { - bytes = nullCharacters.getBytes(); - } - - nullChars = BufferPool.directBuffer(bytes.length, bytes.length); - nullChars.writeBytes(bytes); - - if (reader != null) { - reader.close(); - } - reader = new DelimitedLineReader(conf, fragment); - reader.init(); - recordCount = 0; - - if (targets == null) { - targets = schema.toArray(); - } - - targetColumnIndexes = new int[targets.length]; - for (int i = 0; i < targets.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - - serde = new TextFieldSerializerDeserializer(); - - super.init(); - Arrays.sort(targetColumnIndexes); - if (LOG.isDebugEnabled()) { - LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); - } - - if (startOffset > 0) { - reader.readLine(); // skip first line; - } - } - - public ByteBuf readLine() throws IOException { - ByteBuf buf = reader.readLine(); - if (buf == null) { - return null; - } else { - recordCount++; - } - - return buf; - } - - @Override - public float getProgress() { - try { - if (!reader.isReadable()) { - return 1.0f; - } - long filePos = reader.getCompressedPosition(); - if (startOffset == filePos) { - return 0.0f; - } else { - long readBytes = filePos - startOffset; - long remainingBytes = Math.max(endOffset - filePos, 0); - return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return 0.0f; - } - } - - @Override - public Tuple next() throws IOException { - try { - if (!reader.isReadable()) return null; - - ByteBuf buf = readLine(); - if (buf == null) return null; - - if (targets.length == 0) { - return EmptyTuple.get(); - } - - VTuple tuple = new VTuple(schema.size()); - fillTuple(schema, tuple, buf, targetColumnIndexes); - return tuple; - } catch (Throwable t) { - LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t); - throw new IOException(t); - } - } - - private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException { - int[] projection = 
target; - if (lineBuf == null || target == null || target.length == 0) { - return; - } - - final int rowLength = lineBuf.readableBytes(); - int start = 0, fieldLength = 0, end = 0; - - //Projection - int currentTarget = 0; - int currentIndex = 0; - - while (end != -1) { - end = lineBuf.forEachByte(start, rowLength - start, processor); - - if (end < 0) { - fieldLength = rowLength - start; - } else { - fieldLength = end - start; - } - - if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { - lineBuf.setIndex(start, start + fieldLength); - Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); - dst.put(currentIndex, datum); - currentTarget++; - } - - if (projection.length == currentTarget) { - break; - } - - start = end + 1; - currentIndex++; - } - } - - @Override - public void reset() throws IOException { - init(); - } - - @Override - public void close() throws IOException { - try { - if (nullChars != null) { - nullChars.release(); - nullChars = null; - } - - if (tableStats != null && reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - } - if (LOG.isDebugEnabled()) { - LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); - } - } finally { - IOUtils.cleanup(LOG, reader); - reader = null; - } - } - - @Override - public boolean isProjectable() { - return true; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public boolean isSplittable() { - return splittable; - } - - @Override - public TableStats getInputStats() { - if (tableStats != null && reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - tableStats.setNumBytes(fragment.getLength()); - } - return tableStats; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java deleted file mode 100644 index a5ac142..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class FieldSplitProcessor implements ByteBufProcessor {
-  private char delimiter; //the ascii separate character
-
-  public FieldSplitProcessor(char recordDelimiterByte) {
-    this.delimiter = recordDelimiterByte;
-  }
-
-  @Override
-  public boolean process(byte value) throws Exception {
-    return delimiter != value;
-  }
-
-  public char getDelimiter() {
-    return delimiter;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
deleted file mode 100644
index a130527..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.text;
-
-import io.netty.buffer.ByteBufProcessor;
-
-public class LineSplitProcessor implements ByteBufProcessor {
-  public static final byte CR = '\r';
-  public static final byte LF = '\n';
-  private boolean prevCharCR = false; //true of prev char was CR
-
-  @Override
-  public boolean process(byte value) throws Exception {
-    switch (value) {
-      case LF:
-        return false;
-      case CR:
-        prevCharCR = true;
-        return false;
-      default:
-        prevCharCR = false;
-        return true;
-    }
-  }
-
-  public boolean isPrevCharCR() {
-    return prevCharCR;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
deleted file mode 100644
index 9722959..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import com.google.protobuf.Message; -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.*; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.storage.FieldSerializerDeserializer; -import org.apache.tajo.util.NumberUtil; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.CharsetDecoder; - -//Compatibility with Apache Hive -public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { - public static final byte[] trueBytes = "true".getBytes(); - public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); - - private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { - return !val.isReadable() || nullBytes.equals(val); - } - - private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { - return val.readableBytes() > 0 && nullBytes.equals(val); - } - - @Override - public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException { - byte[] bytes; - int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); - - if (datum == null || datum instanceof NullDatum) { - switch (dataType.getType()) { - case CHAR: - case TEXT: - length = nullChars.length; - out.write(nullChars); - break; - default: - break; - } - return length; - } - - switch (dataType.getType()) { - case BOOLEAN: - out.write(datum.asBool() ? 
trueBytes : falseBytes); - length = trueBytes.length; - break; - case CHAR: - byte[] pad = new byte[dataType.getLength() - datum.size()]; - bytes = datum.asTextBytes(); - out.write(bytes); - out.write(pad); - length = bytes.length + pad.length; - break; - case TEXT: - case BIT: - case INT2: - case INT4: - case INT8: - case FLOAT4: - case FLOAT8: - case INET4: - case DATE: - case INTERVAL: - bytes = datum.asTextBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIME: - bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIMESTAMP: - bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case INET6: - case BLOB: - bytes = Base64.encodeBase64(datum.asByteArray(), false); - length = bytes.length; - out.write(bytes, 0, length); - break; - case PROTOBUF: - ProtobufDatum protobuf = (ProtobufDatum) datum; - byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); - length = protoBytes.length; - out.write(protoBytes, 0, protoBytes.length); - break; - case NULL_TYPE: - default: - break; - } - return length; - } - - @Override - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { - Datum datum; - TajoDataTypes.Type type = col.getDataType().getType(); - boolean nullField; - if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { - nullField = isNullText(buf, nullChars); - } else { - nullField = isNull(buf, nullChars); - } - - if (nullField) { - datum = NullDatum.get(); - } else { - switch (type) { - case BOOLEAN: - byte bool = buf.readByte(); - datum = DatumFactory.createBool(bool == 't' || bool == 'T'); - break; - case BIT: - datum = DatumFactory.createBit(Byte.parseByte( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); - break; - case CHAR: - datum = DatumFactory.createChar( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); - break; - case INT1: - case INT2: - datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); - break; - case INT4: - datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); - break; - case INT8: - datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); - break; - case FLOAT4: - datum = DatumFactory.createFloat4( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case FLOAT8: - datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); - break; - case TEXT: { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createText(bytes); - break; - } - case DATE: - datum = DatumFactory.createDate( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case TIME: - datum = DatumFactory.createTime( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case TIMESTAMP: - datum = DatumFactory.createTimestamp( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case INTERVAL: - datum = DatumFactory.createInterval( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case PROTOBUF: { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); - Message.Builder builder = factory.newBuilder(); - try { - byte[] bytes = new 
byte[buf.readableBytes()]; - buf.readBytes(bytes); - protobufJsonFormat.merge(bytes, builder); - datum = factory.createDatum(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - break; - } - case INET4: - datum = DatumFactory.createInet4( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case BLOB: { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); - break; - } - default: - datum = NullDatum.get(); - break; - } - } - return datum; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java deleted file mode 100644 index 543336f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.util.ReflectionUtils; - -import parquet.bytes.BytesInput; -import parquet.hadoop.BadConfigurationException; -import parquet.hadoop.metadata.CompressionCodecName; - -class CodecFactory { - - public class BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - public BytesDecompressor(CompressionCodec codec) { - this.codec = codec; - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } else { - decompressed = bytes; - } - return decompressed; - } - - private void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - } - - /** - * Encapsulates the logic around hadoop compression - * - * @author Julien Le Dem - * - */ - public static class BytesCompressor { - - private final CompressionCodec codec; - private final Compressor compressor; - private final ByteArrayOutputStream compressedOutBuffer; - private final CompressionCodecName codecName; - - public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { - this.codecName = codecName; - this.codec = codec; - if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); - this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); - } else { - this.compressor = null; - this.compressedOutBuffer = null; - } - } - - public BytesInput compress(BytesInput bytes) throws IOException { - final BytesInput compressedBytes; - if (codec == null) { - compressedBytes = bytes; - } else { - compressedOutBuffer.reset(); - if (compressor != null) { - // null compressor for non-native gzip - compressor.reset(); - } - CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); - bytes.writeAllTo(cos); - cos.finish(); - cos.close(); - compressedBytes = BytesInput.from(compressedOutBuffer); - } - return compressedBytes; - } - - private void release() { - if (compressor != null) { - CodecPool.returnCompressor(compressor); - } - } - - public CompressionCodecName getCodecName() { - return codecName; - } - - } - - private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); - private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); - private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>(); - private final Configuration configuration; - - public CodecFactory(Configuration configuration) { - this.configuration = configuration; - } - - /** - * - * 
@param codecName the requested codec - * @return the corresponding hadoop codec. null if UNCOMPRESSED - */ - private CompressionCodec getCodec(CompressionCodecName codecName) { - String codecClassName = codecName.getHadoopCompressionCodecClassName(); - if (codecClassName == null) { - return null; - } - CompressionCodec codec = codecByName.get(codecClassName); - if (codec != null) { - return codec; - } - - try { - Class<?> codecClass = Class.forName(codecClassName); - codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); - return codec; - } catch (ClassNotFoundException e) { - throw new BadConfigurationException("Class " + codecClassName + " was not found", e); - } - } - - public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { - BytesCompressor comp = compressors.get(codecName); - if (comp == null) { - CompressionCodec codec = getCodec(codecName); - comp = new BytesCompressor(codecName, codec, pageSize); - compressors.put(codecName, comp); - } - return comp; - } - - public BytesDecompressor getDecompressor(CompressionCodecName codecName) { - BytesDecompressor decomp = decompressors.get(codecName); - if (decomp == null) { - CompressionCodec codec = getCodec(codecName); - decomp = new BytesDecompressor(codec); - decompressors.put(codecName, decomp); - } - return decomp; - } - - public void release() { - for (BytesCompressor compressor : compressors.values()) { - compressor.release(); - } - compressors.clear(); - for (BytesDecompressor decompressor : decompressors.values()) { - decompressor.release(); - } - decompressors.clear(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java deleted file mode 100644 index 5f89ead..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.INFO; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import parquet.Log; -import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageWriteStore; -import parquet.column.page.PageWriter; -import parquet.column.statistics.Statistics; -import parquet.column.statistics.BooleanStatistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; - -class ColumnChunkPageWriteStore implements PageWriteStore { - private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private static final class ColumnChunkPageWriter implements PageWriter { - - private final ColumnDescriptor path; - private final BytesCompressor compressor; - - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - private Set<Encoding> encodings = new HashSet<Encoding>(); - - private Statistics totalStatistics; - - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { - this.path = path; - this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); - } - - @Deprecated - @Override - public void writePage(BytesInput bytes, - int valueCount, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, - int valueCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); - 
compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public long getMemSize() { - return buf.size(); - } - - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.startColumn(path, totalValueCount, compressor.getCodecName()); - if (dictionaryPage != null) { - writer.writeDictionaryPage(dictionaryPage); - encodings.add(dictionaryPage.getEncoding()); - } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); - writer.endColumn(); - if (INFO) { - LOG.info( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); - } - encodings.clear(); - pageCount = 0; - } - - @Override - public long allocatedSize() { - return buf.getCapacity(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - } - - private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); - private final MessageType schema; - private final BytesCompressor compressor; - private final int initialSize; - - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { - this.compressor = compressor; - this.schema = schema; - this.initialSize = initialSize; - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - if (!writers.containsKey(path)) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); - } - return writers.get(path); - } - - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - List<ColumnDescriptor> columns = schema.getColumns(); - for (ColumnDescriptor columnDescriptor : columns) { - ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor); - pageWriter.writeToFileWriter(writer); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java deleted file mode 100644 index 61567e5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import parquet.Log; -import parquet.column.ColumnDescriptor; -import parquet.column.page.PageReadStore; -import parquet.filter.UnboundRecordFilter; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.api.ReadSupport; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.util.counters.BenchmarkCounter; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.io.ParquetDecodingException; -import parquet.io.api.RecordMaterializer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; - -import static java.lang.String.format; -import static parquet.Log.DEBUG; - -class InternalParquetRecordReader<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordReader.class); - - private final ColumnIOFactory columnIOFactory = new ColumnIOFactory(); - - private MessageType requestedSchema; - private MessageType fileSchema; - private int columnCount; - private final ReadSupport<T> readSupport; - - private RecordMaterializer<T> recordConverter; - - private T currentValue; - private long total; - private int current = 0; - private int currentBlock = -1; - private ParquetFileReader reader; - private parquet.io.RecordReader<T> recordReader; - private UnboundRecordFilter recordFilter; - - private long totalTimeSpentReadingBytes; - private long totalTimeSpentProcessingRecords; - private long startedAssemblingCurrentBlockAt; - - private long totalCountLoadedSoFar = 0; - - private Path file; - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - */ - public InternalParquetRecordReader(ReadSupport<T> readSupport) { - this(readSupport, null); - } - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - * @param filter Optional filter for only returning matching records. 
- */ - public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter - filter) { - this.readSupport = readSupport; - this.recordFilter = filter; - } - - private void checkRead() throws IOException { - if (current == totalCountLoadedSoFar) { - if (current != 0) { - long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt; - totalTimeSpentProcessingRecords += timeAssembling; - LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); - long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; - long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; - long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; - LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); - } - - LOG.info("at row " + current + ". reading next block"); - long t0 = System.currentTimeMillis(); - PageReadStore pages = reader.readNextRowGroup(); - if (pages == null) { - throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); - } - long timeSpentReading = System.currentTimeMillis() - t0; - totalTimeSpentReadingBytes += timeSpentReading; - BenchmarkCounter.incrementTime(timeSpentReading); - LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); - if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); - MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); - startedAssemblingCurrentBlockAt = System.currentTimeMillis(); - totalCountLoadedSoFar += pages.getRowCount(); - ++ currentBlock; - } - } - - public void close() throws IOException { - reader.close(); - } - - public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - public T getCurrentValue() throws IOException, - InterruptedException { - return currentValue; - } - - public float getProgress() throws IOException, InterruptedException { - return (float) current / total; - } - - public void initialize(MessageType requestedSchema, MessageType fileSchema, - Map<String, String> extraMetadata, Map<String, String> readSupportMetadata, - Path file, List<BlockMetaData> blocks, Configuration configuration) - throws IOException { - this.requestedSchema = requestedSchema; - this.fileSchema = fileSchema; - this.file = file; - this.columnCount = this.requestedSchema.getPaths().size(); - this.recordConverter = readSupport.prepareForRead( - configuration, extraMetadata, fileSchema, - new ReadSupport.ReadContext(requestedSchema, readSupportMetadata)); - - List<ColumnDescriptor> columns = requestedSchema.getColumns(); - reader = new ParquetFileReader(configuration, file, blocks, columns); - for (BlockMetaData block : blocks) { - total += block.getRowCount(); - } - LOG.info("RecordReader initialized will read a total of " + total + " records."); - } - - private boolean contains(GroupType group, String[] path, int index) { - if (index == path.length) { - return false; - } - if (group.containsField(path[index])) { - Type 
type = group.getType(path[index]); - if (type.isPrimitive()) { - return index + 1 == path.length; - } else { - return contains(type.asGroupType(), path, index + 1); - } - } - return false; - } - - public boolean nextKeyValue() throws IOException, InterruptedException { - if (current < total) { - try { - checkRead(); - currentValue = recordReader.read(); - if (DEBUG) LOG.debug("read value: " + currentValue); - current ++; - } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); - } - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java deleted file mode 100644 index 7410d2b..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static java.lang.Math.max; -import static java.lang.Math.min; -import static java.lang.String.format; -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; -import static parquet.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.Map; - -import parquet.Log; -import parquet.column.ParquetProperties.WriterVersion; -import parquet.column.impl.ColumnWriteStoreImpl; -import parquet.hadoop.api.WriteSupport; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.MessageType; - -class InternalParquetRecordWriter<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - - private final ParquetFileWriter w; - private final WriteSupport<T> writeSupport; - private final MessageType schema; - private final Map<String, String> extraMetaData; - private final int blockSize; - private final int pageSize; - private final BytesCompressor compressor; - private final int dictionaryPageSize; - private final boolean enableDictionary; - private final boolean validating; - private final WriterVersion writerVersion; - - private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; - - private ColumnWriteStoreImpl store; - private ColumnChunkPageWriteStore pageStore; - - /** - * @param w the file to write to - * @param writeSupport the class to convert incoming records - * @param schema the schema of the records - * @param extraMetaData extra meta data to write in the footer of the file - * @param blockSize the size of a block in the file (this will be approximate) - * @param codec the codec used to compress - */ - public InternalParquetRecordWriter( - ParquetFileWriter w, - WriteSupport<T> writeSupport, - MessageType schema, - Map<String, String> extraMetaData, - int blockSize, - int pageSize, - BytesCompressor compressor, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - WriterVersion writerVersion) { - this.w = w; - this.writeSupport = checkNotNull(writeSupport, "writeSupport"); - this.schema = schema; - this.extraMetaData = extraMetaData; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.compressor = compressor; - this.dictionaryPageSize = dictionaryPageSize; - this.enableDictionary = enableDictionary; - this.validating = validating; - this.writerVersion = writerVersion; - initStore(); - } - - private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be the same size. 
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); - MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); - writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); - } - - public void close() throws IOException, InterruptedException { - flushStore(); - w.end(extraMetaData); - } - - public void write(T value) throws IOException, InterruptedException { - writeSupport.write(value); - ++ recordCount; - checkBlockSizeReached(); - } - - private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. - long memSize = store.memSize(); - if (memSize > blockSize) { - LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); - flushStore(); - initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); - } else { - float recordSize = (float) memSize / recordCount; - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); - if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); - } - } - } - - public long getEstimatedWrittenSize() throws IOException { - return w.getPos() + store.memSize(); - } - - private void flushStore() - throws IOException { - LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); - if (store.allocatedSize() > 3 * blockSize) { - LOG.warn("Too much memory used: " + store.memUsageString()); - } - w.startBlock(recordCount); - store.flush(); - pageStore.flushToFileWriter(w); - recordCount = 0; - w.endBlock(); - store = null; - pageStore = null; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java deleted file mode 100644 index 73ce7c2..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java +++ /dev/null @@ -1,504 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static parquet.Log.DEBUG; -import static parquet.format.Util.writeFileMetaData; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import parquet.Log; -import parquet.Version; -import parquet.bytes.BytesInput; -import parquet.bytes.BytesUtils; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DictionaryPage; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.metadata.ColumnChunkMetaData; -import parquet.hadoop.metadata.ColumnPath; -import parquet.hadoop.metadata.CompressionCodecName; -import parquet.hadoop.metadata.FileMetaData; -import parquet.hadoop.metadata.GlobalMetaData; -import parquet.hadoop.metadata.ParquetMetadata; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -/** - * Internal implementation of the Parquet file writer as a block container - * - * @author Julien Le Dem - * - */ -public class ParquetFileWriter { - private static final Log LOG = Log.getLog(ParquetFileWriter.class); - - public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); - public static final int CURRENT_VERSION = 1; - - private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); - - private final MessageType schema; - private final FSDataOutputStream out; - private BlockMetaData currentBlock; - private ColumnChunkMetaData currentColumn; - private long currentRecordCount; - private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - private long uncompressedLength; - private long compressedLength; - private Set<parquet.column.Encoding> currentEncodings; - - private CompressionCodecName currentChunkCodec; - private ColumnPath currentChunkPath; - private PrimitiveTypeName currentChunkType; - private long currentChunkFirstDataPage; - private long currentChunkDictionaryPageOffset; - private long currentChunkValueCount; - - private Statistics currentStatistics; - - /** - * Captures the order in which methods should be called - * - * @author Julien Le Dem - * - */ - private enum STATE { - NOT_STARTED { - STATE start() { - return STARTED; - } - }, - STARTED { - STATE startBlock() { - return BLOCK; - } - STATE end() { - return ENDED; - } - }, - BLOCK { - STATE startColumn() { - return COLUMN; - } - STATE endBlock() { - return STARTED; - } - }, - COLUMN { - STATE endColumn() { - return BLOCK; - }; - STATE write() { - return this; - } - }, - ENDED; - - STATE start() throws 
IOException { return error(); } - STATE startBlock() throws IOException { return error(); } - STATE startColumn() throws IOException { return error(); } - STATE write() throws IOException { return error(); } - STATE endColumn() throws IOException { return error(); } - STATE endBlock() throws IOException { return error(); } - STATE end() throws IOException { return error(); } - - private final STATE error() throws IOException { - throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); - } - } - - private STATE state = STATE.NOT_STARTED; - - /** - * - * @param schema the schema of the data - * @param out the file to write to - * @param codec the codec to use to compress blocks - * @throws IOException if the file can not be created - */ - public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { - super(); - this.schema = schema; - FileSystem fs = file.getFileSystem(configuration); - this.out = fs.create(file, false); - } - - /** - * start the file - * @throws IOException - */ - public void start() throws IOException { - state = state.start(); - if (DEBUG) LOG.debug(out.getPos() + ": start"); - out.write(MAGIC); - } - - /** - * start a block - * @param recordCount the record count in this block - * @throws IOException - */ - public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": start block"); -// out.write(MAGIC); // TODO: add a magic delimiter - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; - } - - /** - * start a column inside a block - * @param descriptor the column descriptor - * @param valueCount the value count in this column - * @param statistics the statistics in this column - * @param compressionCodecName - * @throws IOException - */ - public void startColumn(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); - currentEncodings = new HashSet<parquet.column.Encoding>(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = out.getPos(); - compressedLength = 0; - uncompressedLength = 0; - // need to know what type of stats to initialize to - // better way to do this? 
- currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); - } - - /** - * writes a dictionary page page - * @param dictionaryPage the dictionary page - */ - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out); - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); - currentEncodings.add(dictionaryPage.getEncoding()); - } - - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - 
this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentStatistics.mergeStatistics(statistics); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a number of pages at once - * @param bytes bytes to be written including page headers - * @param uncompressedTotalPageSize total uncompressed size (without page headers) - * @param compressedTotalPageSize total compressed size (without page headers) - * @throws IOException - */ - void writeDataPages(BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - List<parquet.column.Encoding> encodings) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); - bytes.writeAllTo(out); - currentEncodings.addAll(encodings); - currentStatistics = totalStats; - } - - /** - * end a column (once all rep, def and data have been written) - * @throws IOException - */ - public void endColumn() throws IOException { - state = state.endColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": end column"); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength)); - if (DEBUG) LOG.info("ended Column chunk: " + currentColumn); - currentColumn = null; - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - } - - /** - * ends a block once all column chunks have been written - * @throws IOException - */ - public void endBlock() throws IOException { - state = state.endBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": end block"); - currentBlock.setRowCount(currentRecordCount); - blocks.add(currentBlock); - currentBlock = null; - } - - /** - * ends a file once all blocks have been written. - * closes the file.
- * @param extraMetaData the extra meta data to write in the footer - * @throws IOException - */ - public void end(Map<String, String> extraMetaData) throws IOException { - state = state.end(); - if (DEBUG) LOG.debug(out.getPos() + ": end"); - ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out); - out.close(); - } - - private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { - long footerIndex = out.getPos(); - parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); - writeFileMetaData(parquetMetadata, out); - if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); - out.write(MAGIC); - } - - /** - * writes a _metadata file - * @param configuration the configuration to use to get the FileSystem - * @param outputPath the directory to write the _metadata file to - * @param footers the list of footers to merge - * @throws IOException - */ - public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { - Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE); - FileSystem fs = outputPath.getFileSystem(configuration); - outputPath = outputPath.makeQualified(fs); - FSDataOutputStream metadata = fs.create(metaDataPath); - metadata.write(MAGIC); - ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); - serializeFooter(metadataFooter, metadata); - metadata.close(); - } - - private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) { - String rootPath = root.toString(); - GlobalMetaData fileMetaData = null; - List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); - for (Footer footer : footers) { - String path = footer.getFile().toString(); - if (!path.startsWith(rootPath)) { - throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root); - } - path = path.substring(rootPath.length()); - while (path.startsWith("/")) { - path = path.substring(1); - } - fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData); - for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) { - block.setPath(path); - blocks.add(block); - } - } - return new ParquetMetadata(fileMetaData.merge(), blocks); - } - - /** - * @return the current position in the underlying file - * @throws IOException - */ - public long getPos() throws IOException { - return out.getPos(); - } - - /** - * Will merge the metadata of all the footers together - * @param footers the list files footers to merge - * @return the global meta data for all the footers - */ - static GlobalMetaData getGlobalMetaData(List<Footer> footers) { - GlobalMetaData fileMetaData = null; - for (Footer footer : footers) { - ParquetMetadata currentMetadata = footer.getParquetMetadata(); - fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData); - } - return fileMetaData; - } - - /** - * Will return the result of merging toMerge into mergedMetadata - * @param toMerge the metadata toMerge - * @param mergedMetadata the reference metadata to merge into - * @return the result of the merge - */ - static GlobalMetaData mergeInto( - FileMetaData toMerge, - GlobalMetaData mergedMetadata) { - MessageType schema = null; - Map<String, Set<String>> newKeyValues = 
new HashMap<String, Set<String>>(); - Set<String> createdBy = new HashSet<String>(); - if (mergedMetadata != null) { - schema = mergedMetadata.getSchema(); - newKeyValues.putAll(mergedMetadata.getKeyValueMetaData()); - createdBy.addAll(mergedMetadata.getCreatedBy()); - } - if ((schema == null && toMerge.getSchema() != null) - || (schema != null && !schema.equals(toMerge.getSchema()))) { - schema = mergeInto(toMerge.getSchema(), schema); - } - for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) { - Set<String> values = newKeyValues.get(entry.getKey()); - if (values == null) { - values = new HashSet<String>(); - newKeyValues.put(entry.getKey(), values); - } - values.add(entry.getValue()); - } - createdBy.add(toMerge.getCreatedBy()); - return new GlobalMetaData( - schema, - newKeyValues, - createdBy); - } - - /** - * will return the result of merging toMerge into mergedSchema - * @param toMerge the schema to merge into mergedSchema - * @param mergedSchema the schema to append the fields to - * @return the resulting schema - */ - static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) { - if (mergedSchema == null) { - return toMerge; - } - return mergedSchema.union(toMerge); - } - -}
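The deleted InternalParquetRecordReader above pulls records one at a time and only materializes the next row group when the running record index catches up with the number of records loaded so far (checkRead/nextKeyValue), while getProgress reports current/total. The following stripped-down sketch shows that lazy block-loading pattern in isolation; LazyBlockReader, BlockSource and the String record type are placeholders invented here, not Tajo or Parquet types.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class LazyBlockReader {
  /** Placeholder for something that can hand out blocks of already-decoded records. */
  interface BlockSource {
    List<String> readNextBlock() throws IOException;  // null when no blocks remain
  }

  private final BlockSource source;
  private final long total;              // total record count, known up front from metadata
  private long current = 0;              // records returned so far
  private long loadedSoFar = 0;          // records contained in the blocks read so far
  private Iterator<String> currentBlock;
  private String currentValue;

  public LazyBlockReader(BlockSource source, long total) {
    this.source = source;
    this.total = total;
  }

  public boolean nextKeyValue() throws IOException {
    if (current >= total) {
      return false;
    }
    if (current == loadedSoFar) {        // current block exhausted: pull the next one
      List<String> block = source.readNextBlock();
      if (block == null) {
        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
      }
      loadedSoFar += block.size();
      currentBlock = block.iterator();
    }
    currentValue = currentBlock.next();
    current++;
    return true;
  }

  public String getCurrentValue() { return currentValue; }

  public float getProgress() { return (float) current / total; }
}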
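initStore() in the deleted InternalParquetRecordWriter sizes its buffers with a small heuristic: a fifth of the block size per column, a page plus ten percent per page buffer, never below 64 KB. The sketch below only reproduces that arithmetic with made-up but typical inputs (128 MB block, 1 MB page, 20 columns); the class name BufferSizing and the concrete numbers are illustrative, not taken from the Tajo code.

public class BufferSizing {
  public static void main(String[] args) {
    int minBuffer = 64 * 1024;
    int blockSize = 128 * 1024 * 1024;
    int pageSize  = 1024 * 1024;
    int columns   = 20;

    // Divide the block across the columns, then take a fifth, since columns rarely fill evenly.
    int initialBlockBufferSize = Math.max(minBuffer, blockSize / columns / 5);
    // Slightly larger than a page, but never larger than the per-column block buffer.
    int initialPageBufferSize  = Math.max(minBuffer, Math.min(pageSize + pageSize / 10, initialBlockBufferSize));

    System.out.println(initialBlockBufferSize); // 1342177  (~1.3 MB per column)
    System.out.println(initialPageBufferSize);  // 1153433  (~1.1 MB per page buffer)
  }
}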
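checkBlockSizeReached() in the same deleted writer avoids measuring buffered memory on every record: it estimates the average record size, then schedules the next check roughly halfway to the projected block boundary, clamped between 100 and 10,000 records. Below is a minimal, self-contained sketch of that scheduling idea; SizeCheckScheduler, recordWritten() and the field names are invented for illustration and are not the actual Tajo or Parquet API.

public class SizeCheckScheduler {
  private static final long MIN_RECORDS_FOR_CHECK = 100;
  private static final long MAX_RECORDS_FOR_CHECK = 10_000;

  private final long blockSize;     // target row-group size in bytes
  private long recordCount = 0;
  private long nextCheck = MIN_RECORDS_FOR_CHECK;

  public SizeCheckScheduler(long blockSize) {
    this.blockSize = blockSize;
  }

  /**
   * Called once per written record with the writer's current buffered size.
   * Returns true when the caller should flush the buffered block to disk.
   */
  public boolean recordWritten(long bufferedBytes) {
    recordCount++;
    if (recordCount < nextCheck) {
      return false;                 // measuring memory is expensive, so skip most records
    }
    if (bufferedBytes > blockSize) {
      // Flush now; schedule the next check conservatively for the new (empty) block.
      nextCheck = Math.min(Math.max(MIN_RECORDS_FOR_CHECK, recordCount / 2), MAX_RECORDS_FOR_CHECK);
      recordCount = 0;
      return true;
    }
    // Not full yet: estimate how many records fit in a block and check again roughly halfway there,
    // but never look more than MAX_RECORDS_FOR_CHECK records ahead.
    float recordSize = (float) bufferedBytes / recordCount;
    nextCheck = Math.min(
        Math.max(MIN_RECORDS_FOR_CHECK, (recordCount + (long) (blockSize / recordSize)) / 2),
        recordCount + MAX_RECORDS_FOR_CHECK);
    return false;
  }
}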
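The deleted ParquetFileWriter enforces its start/startBlock/startColumn/endColumn/endBlock/end call order with an enum whose constants override only the legal transitions, while the shared default methods throw. The sketch below shows the same pattern in isolation with a reduced set of states; WriterStateDemo and its State enum are illustrative stand-ins, not the original class.

import java.io.IOException;

public class WriterStateDemo {

  enum State {
    NOT_STARTED {
      State start() { return STARTED; }
    },
    STARTED {
      State startBlock() { return BLOCK; }
      State end() { return ENDED; }
    },
    BLOCK {
      State write() { return this; }
      State endBlock() { return STARTED; }
    },
    ENDED;

    // Every operation is rejected by default; the legal transitions are the overrides above.
    State start()      throws IOException { return error("start"); }
    State startBlock() throws IOException { return error("startBlock"); }
    State write()      throws IOException { return error("write"); }
    State endBlock()   throws IOException { return error("endBlock"); }
    State end()        throws IOException { return error("end"); }

    private State error(String op) throws IOException {
      throw new IOException(op + "() is not allowed in state " + name());
    }
  }

  public static void main(String[] args) throws IOException {
    State s = State.NOT_STARTED;
    s = s.start();      // NOT_STARTED -> STARTED
    s = s.startBlock(); // STARTED -> BLOCK
    s = s.write();      // BLOCK -> BLOCK
    s = s.endBlock();   // BLOCK -> STARTED
    s = s.end();        // STARTED -> ENDED
    // s.start();       // would throw IOException: no transition is defined from ENDED
  }
}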
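Finally, mergeFooters()/mergeInto() at the end of the deleted ParquetFileWriter fold each file's String-to-String key/value metadata into a global String-to-Set-of-String view so that differing values from different files are all retained. Here is a small self-contained sketch of that fold using plain java.util collections; MetadataMerge and this mergeInto() are illustrative names only, and the original additionally merges schemas and created-by strings, which this sketch omits.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MetadataMerge {
  /** Folds one file's key/value metadata into the accumulated global view. */
  static Map<String, Set<String>> mergeInto(Map<String, String> toMerge,
                                            Map<String, Set<String>> merged) {
    Map<String, Set<String>> result = new HashMap<>();
    if (merged != null) {
      for (Map.Entry<String, Set<String>> e : merged.entrySet()) {
        result.put(e.getKey(), new HashSet<>(e.getValue()));   // defensive copy of each value set
      }
    }
    for (Map.Entry<String, String> entry : toMerge.entrySet()) {
      // computeIfAbsent replaces the explicit null-check-and-put from the original.
      result.computeIfAbsent(entry.getKey(), k -> new HashSet<>()).add(entry.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Set<String>> global = null;
    global = mergeInto(Map.of("writer.version", "1.0", "owner", "tajo"), global);
    global = mergeInto(Map.of("writer.version", "1.1"), global);
    // e.g. {owner=[tajo], writer.version=[1.0, 1.1]} (map iteration order may vary)
    System.out.println(global);
  }
}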
