http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
new file mode 100644
index 0000000..1448885
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ByteBufLineReader implements Closeable {
+  private static int DEFAULT_BUFFER = 64 * 1024;
+
+  private int bufferSize;
+  private long readBytes;
+  private ByteBuf buffer;
+  private final ByteBufInputChannel channel;
+  private final AtomicInteger tempReadBytes = new AtomicInteger();
+  private final LineSplitProcessor processor = new LineSplitProcessor();
+
+  public ByteBufLineReader(ByteBufInputChannel channel) {
+    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
+  }
+
+  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
+    this.readBytes = 0;
+    this.channel = channel;
+    this.buffer = buf;
+    this.bufferSize = buf.capacity();
+  }
+
+  public long readBytes() {
+    return readBytes - buffer.readableBytes();
+  }
+
+  public long available() throws IOException {
+    return channel.available() + buffer.readableBytes();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.refCnt() > 0) {
+      this.buffer.release();
+    }
+    this.channel.close();
+  }
+
+  public String readLine() throws IOException {
+    ByteBuf buf = readLineBuf(tempReadBytes);
+    if (buf != null) {
+      return buf.toString(CharsetUtil.UTF_8);
+    }
+    return null;
+  }
+
+  private void fillBuffer() throws IOException {
+
+    int tailBytes = 0;
+    if (this.readBytes > 0) {
+      this.buffer.markReaderIndex();
+      this.buffer.discardSomeReadBytes(); // compact the buffer
+      tailBytes = this.buffer.writerIndex();
+      if (!this.buffer.isWritable()) {
+        // a single line is larger than the buffer, so grow the buffer
+        BufferPool.ensureWritable(buffer, bufferSize);
+        this.bufferSize = buffer.capacity();
+      }
+    }
+
+    boolean release = true;
+    try {
+      int readBytes = tailBytes;
+      for (; ; ) {
+        int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
+        if (localReadBytes < 0) {
+          break;
+        }
+        readBytes += localReadBytes;
+        if (readBytes == bufferSize) {
+          break;
+        }
+      }
+      this.readBytes += (readBytes - tailBytes);
+      release = false;
+      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); // skip past the buffered (tail) bytes
+    } finally {
+      if (release) {
+        buffer.release();
+      }
+    }
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+    int startIndex = buffer.readerIndex();
+    int readBytes;
+    int readable;
+    int newlineLength; // length of the terminating newline
+
+    loop:
+    while (true) {
+      readable = buffer.readableBytes();
+      if (readable <= 0) {
+        buffer.readerIndex(startIndex);
+        fillBuffer(); // compact and refill the buffer
+        if (!buffer.isReadable()) {
+          return null;
+        } else {
+          startIndex = 0; // reset the line start position
+        }
+        readable = buffer.readableBytes();
+      }
+
+      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
+      if (endIndex < 0) {
+        buffer.readerIndex(buffer.writerIndex());
+      } else {
+        buffer.readerIndex(endIndex + 1);
+        readBytes = buffer.readerIndex() - startIndex;
+        if (processor.isPrevCharCR() && buffer.isReadable()
+            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+          buffer.skipBytes(1);
+          newlineLength = 2;
+        } else {
+          newlineLength = 1;
+        }
+        break loop;
+      }
+    }
+    reads.set(readBytes);
+    return buffer.slice(startIndex, readBytes - newlineLength);
+  }
+}
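
A minimal usage sketch of the reader above, assuming only what the diff itself shows (a ByteBufInputChannel wrapping a plain InputStream, as DelimitedLineReader below does); the file path and input stream here are hypothetical:

    import java.io.FileInputStream;
    import java.io.InputStream;
    import org.apache.tajo.storage.ByteBufInputChannel;
    import org.apache.tajo.storage.text.ByteBufLineReader;

    InputStream in = new FileInputStream("/tmp/sample.txt");        // hypothetical input
    ByteBufInputChannel channel = new ByteBufInputChannel(in);
    ByteBufLineReader lineReader = new ByteBufLineReader(channel);  // 64 KB direct buffer by default
    try {
      String line;
      while ((line = lineReader.readLine()) != null) {              // lines end with CR, LF, or CRLF
        System.out.println(line);
      }
    } finally {
      lineReader.close();                                           // releases the buffer and closes the channel
    }

Callers that need the raw bytes rather than a decoded String can call readLineBuf(AtomicInteger) directly, which is how DelimitedLineReader below drives it.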
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java new file mode 100644 index 0000000..10d86bd --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.storage.ByteBufInputChannel; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.fragment.FileFragment; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +public class DelimitedLineReader implements Closeable { + private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class); + private final static int DEFAULT_PAGE_SIZE = 128 * 1024; + + private FileSystem fs; + private FSDataInputStream fis; + private InputStream is; //decompressd stream + private CompressionCodecFactory factory; + private CompressionCodec codec; + private Decompressor decompressor; + + private long startOffset, end, pos; + private boolean eof = true; + private ByteBufLineReader lineReader; + private AtomicInteger tempReadBytes = new AtomicInteger(); + private FileFragment fragment; + private Configuration conf; + + public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { + this.fragment = fragment; + this.conf = conf; + this.factory = new CompressionCodecFactory(conf); + this.codec = factory.getCodec(fragment.getPath()); + if (this.codec instanceof SplittableCompressionCodec) { + throw new NotImplementedException(); // bzip2 does 
not support multi-thread model + } + } + + public void init() throws IOException { + if (fs == null) { + fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); + } + if (fis == null) fis = fs.open(fragment.getPath()); + pos = startOffset = fragment.getStartKey(); + end = startOffset + fragment.getLength(); + + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + is = new DataInputStream(codec.createInputStream(fis, decompressor)); + ByteBufInputChannel channel = new ByteBufInputChannel(is); + lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE)); + } else { + fis.seek(startOffset); + is = fis; + + ByteBufInputChannel channel = new ByteBufInputChannel(is); + lineReader = new ByteBufLineReader(channel, + BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end))); + } + eof = false; + } + + public long getCompressedPosition() throws IOException { + long retVal; + if (isCompressed()) { + retVal = fis.getPos(); + } else { + retVal = pos; + } + return retVal; + } + + public long getUnCompressedPosition() throws IOException { + return pos; + } + + public long getReadBytes() { + return pos - startOffset; + } + + public boolean isReadable() { + return !eof; + } + + public ByteBuf readLine() throws IOException { + if (eof) { + return null; + } + + ByteBuf buf = lineReader.readLineBuf(tempReadBytes); + if (buf == null) { + eof = true; + } else { + pos += tempReadBytes.get(); + } + + if (!isCompressed() && getCompressedPosition() > end) { + eof = true; + } + return buf; + } + + public boolean isCompressed() { + return codec != null; + } + + @Override + public void close() throws IOException { + try { + IOUtils.cleanup(LOG, lineReader, is, fis); + fs = null; + is = null; + fis = null; + lineReader = null; + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java new file mode 100644 index 0000000..a337509 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.exception.AlreadyExistsStorageException; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; + +public class DelimitedTextFile { + + public static final byte LF = '\n'; + public static int EOF = -1; + + private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); + + public static class DelimitedTextFileAppender extends FileAppender { + private final TableMeta meta; + private final Schema schema; + private final int columnNum; + private final FileSystem fs; + private FSDataOutputStream fos; + private DataOutputStream outputStream; + private CompressionOutputStream deflateFilter; + private char delimiter; + private TableStatistics stats = null; + private Compressor compressor; + private CompressionCodecFactory codecFactory; + private CompressionCodec codec; + private Path compressedPath; + private byte[] nullChars; + private int BUFFER_SIZE = 128 * 1024; + private int bufferedBytes = 0; + private long pos = 0; + + private NonSyncByteArrayOutputStream os; + private FieldSerializerDeserializer serde; + + public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + final Schema schema, final TableMeta meta, final Path path) + throws IOException { + super(conf, taskAttemptId, schema, meta, path); + this.fs = path.getFileSystem(conf); + this.meta = meta; + this.schema = schema; + this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + this.columnNum = schema.size(); + + String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(); + } + } + + @Override + public void init() throws IOException { + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException(path.toString()); + } + + if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { + String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); + codecFactory = new CompressionCodecFactory(conf); + codec = codecFactory.getCodecByClassName(codecName); + compressor = 
CodecPool.getCompressor(codec); + if (compressor != null) compressor.reset(); //builtin gzip is null + + String extension = codec.getDefaultExtension(); + compressedPath = path.suffix(extension); + + if (fs.exists(compressedPath)) { + throw new AlreadyExistsStorageException(compressedPath); + } + + fos = fs.create(compressedPath); + deflateFilter = codec.createOutputStream(fos, compressor); + outputStream = new DataOutputStream(deflateFilter); + + } else { + if (fs.exists(path)) { + throw new AlreadyExistsStorageException(path); + } + fos = fs.create(path); + outputStream = new DataOutputStream(new BufferedOutputStream(fos)); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + serde = new TextFieldSerializerDeserializer(); + + if (os == null) { + os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); + } + + os.reset(); + pos = fos.getPos(); + bufferedBytes = 0; + super.init(); + } + + + @Override + public void addTuple(Tuple tuple) throws IOException { + Datum datum; + int rowBytes = 0; + + for (int i = 0; i < columnNum; i++) { + datum = tuple.get(i); + rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars); + + if (columnNum - 1 > i) { + os.write((byte) delimiter); + rowBytes += 1; + } + } + os.write(LF); + rowBytes += 1; + + pos += rowBytes; + bufferedBytes += rowBytes; + if (bufferedBytes > BUFFER_SIZE) { + flushBuffer(); + } + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } + + private void flushBuffer() throws IOException { + if (os.getLength() > 0) { + os.writeTo(outputStream); + os.reset(); + bufferedBytes = 0; + } + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + outputStream.flush(); + } + + @Override + public void close() throws IOException { + + try { + if(outputStream != null){ + flush(); + } + + // Statistical section + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + + if (deflateFilter != null) { + deflateFilter.finish(); + deflateFilter.resetState(); + deflateFilter = null; + } + + os.close(); + } finally { + IOUtils.cleanup(LOG, fos); + if (compressor != null) { + CodecPool.returnCompressor(compressor); + compressor = null; + } + } + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } + + public boolean isCompress() { + return compressor != null; + } + + public String getExtension() { + return codec != null ? 
codec.getDefaultExtension() : ""; + } + } + + public static class DelimitedTextFileScanner extends FileScanner { + + private boolean splittable = false; + private final long startOffset; + private final long endOffset; + + private int recordCount = 0; + private int[] targetColumnIndexes; + + private ByteBuf nullChars; + private FieldSerializerDeserializer serde; + private DelimitedLineReader reader; + private FieldSplitProcessor processor; + + public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, + final Fragment fragment) + throws IOException { + super(conf, schema, meta, fragment); + reader = new DelimitedLineReader(conf, this.fragment); + if (!reader.isCompressed()) { + splittable = true; + } + + startOffset = this.fragment.getStartKey(); + endOffset = startOffset + fragment.getLength(); + + //Delimiter + String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0)); + } + + @Override + public void init() throws IOException { + if (nullChars != null) { + nullChars.release(); + } + + String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + byte[] bytes; + if (StringUtils.isEmpty(nullCharacters)) { + bytes = NullDatum.get().asTextBytes(); + } else { + bytes = nullCharacters.getBytes(); + } + + nullChars = BufferPool.directBuffer(bytes.length, bytes.length); + nullChars.writeBytes(bytes); + + if (reader != null) { + reader.close(); + } + reader = new DelimitedLineReader(conf, fragment); + reader.init(); + recordCount = 0; + + if (targets == null) { + targets = schema.toArray(); + } + + targetColumnIndexes = new int[targets.length]; + for (int i = 0; i < targets.length; i++) { + targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); + } + + serde = new TextFieldSerializerDeserializer(); + + super.init(); + Arrays.sort(targetColumnIndexes); + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); + } + + if (startOffset > 0) { + reader.readLine(); // skip first line; + } + } + + public ByteBuf readLine() throws IOException { + ByteBuf buf = reader.readLine(); + if (buf == null) { + return null; + } else { + recordCount++; + } + + return buf; + } + + @Override + public float getProgress() { + try { + if (!reader.isReadable()) { + return 1.0f; + } + long filePos = reader.getCompressedPosition(); + if (startOffset == filePos) { + return 0.0f; + } else { + long readBytes = filePos - startOffset; + long remainingBytes = Math.max(endOffset - filePos, 0); + return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } + + @Override + public Tuple next() throws IOException { + try { + if (!reader.isReadable()) return null; + + ByteBuf buf = readLine(); + if (buf == null) return null; + + if (targets.length == 0) { + return EmptyTuple.get(); + } + + VTuple tuple = new VTuple(schema.size()); + fillTuple(schema, tuple, buf, targetColumnIndexes); + return tuple; + } catch (Throwable t) { + LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t); + throw new IOException(t); + } + } + + private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException { + int[] projection = 
target; + if (lineBuf == null || target == null || target.length == 0) { + return; + } + + final int rowLength = lineBuf.readableBytes(); + int start = 0, fieldLength = 0, end = 0; + + //Projection + int currentTarget = 0; + int currentIndex = 0; + + while (end != -1) { + end = lineBuf.forEachByte(start, rowLength - start, processor); + + if (end < 0) { + fieldLength = rowLength - start; + } else { + fieldLength = end - start; + } + + if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { + lineBuf.setIndex(start, start + fieldLength); + Datum datum = serde.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); + dst.put(currentIndex, datum); + currentTarget++; + } + + if (projection.length == currentTarget) { + break; + } + + start = end + 1; + currentIndex++; + } + } + + @Override + public void reset() throws IOException { + init(); + } + + @Override + public void close() throws IOException { + try { + if (nullChars != null) { + nullChars.release(); + nullChars = null; + } + + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + } + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); + } + } finally { + IOUtils.cleanup(LOG, reader); + reader = null; + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public boolean isSplittable() { + return splittable; + } + + @Override + public TableStats getInputStats() { + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + tableStats.setNumBytes(fragment.getLength()); + } + return tableStats; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java new file mode 100644 index 0000000..a5ac142 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+  private char delimiter; // the ASCII field delimiter character
+
+  public FieldSplitProcessor(char recordDelimiterByte) {
+    this.delimiter = recordDelimiterByte;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    return delimiter != value;
+  }
+
+  public char getDelimiter() {
+    return delimiter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
new file mode 100644
index 0000000..a130527
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+  public static final byte CR = '\r';
+  public static final byte LF = '\n';
+  private boolean prevCharCR = false; // true if the previous character was CR
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    switch (value) {
+      case LF:
+        return false;
+      case CR:
+        prevCharCR = true;
+        return false;
+      default:
+        prevCharCR = false;
+        return true;
+    }
+  }
+
+  public boolean isPrevCharCR() {
+    return prevCharCR;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..9722959
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.*; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.CharsetDecoder; + +//Compatibility with Apache Hive +public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { + public static final byte[] trueBytes = "true".getBytes(); + public static final byte[] falseBytes = "false".getBytes(); + private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); + + private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { + return !val.isReadable() || nullBytes.equals(val); + } + + private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { + return val.readableBytes() > 0 && nullBytes.equals(val); + } + + @Override + public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException { + byte[] bytes; + int length = 0; + TajoDataTypes.DataType dataType = col.getDataType(); + + if (datum == null || datum instanceof NullDatum) { + switch (dataType.getType()) { + case CHAR: + case TEXT: + length = nullChars.length; + out.write(nullChars); + break; + default: + break; + } + return length; + } + + switch (dataType.getType()) { + case BOOLEAN: + out.write(datum.asBool() ? 
trueBytes : falseBytes); + length = trueBytes.length; + break; + case CHAR: + byte[] pad = new byte[dataType.getLength() - datum.size()]; + bytes = datum.asTextBytes(); + out.write(bytes); + out.write(pad); + length = bytes.length + pad.length; + break; + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case INET4: + case DATE: + case INTERVAL: + bytes = datum.asTextBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIME: + bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIMESTAMP: + bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case INET6: + case BLOB: + bytes = Base64.encodeBase64(datum.asByteArray(), false); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) datum; + byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); + length = protoBytes.length; + out.write(protoBytes, 0, protoBytes.length); + break; + case NULL_TYPE: + default: + break; + } + return length; + } + + @Override + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { + Datum datum; + TajoDataTypes.Type type = col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(buf, nullChars); + } else { + nullField = isNull(buf, nullChars); + } + + if (nullField) { + datum = NullDatum.get(); + } else { + switch (type) { + case BOOLEAN: + byte bool = buf.readByte(); + datum = DatumFactory.createBool(bool == 't' || bool == 'T'); + break; + case BIT: + datum = DatumFactory.createBit(Byte.parseByte( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); + break; + case CHAR: + datum = DatumFactory.createChar( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); + break; + case INT1: + case INT2: + datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); + break; + case INT4: + datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); + break; + case INT8: + datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); + break; + case FLOAT4: + datum = DatumFactory.createFloat4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case FLOAT8: + datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); + break; + case TEXT: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createText(bytes); + break; + } + case DATE: + datum = DatumFactory.createDate( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case TIME: + datum = DatumFactory.createTime( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case TIMESTAMP: + datum = DatumFactory.createTimestamp( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case INTERVAL: + datum = DatumFactory.createInterval( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] bytes = new 
byte[buf.readableBytes()]; + buf.readBytes(bytes); + protobufJsonFormat.merge(bytes, builder); + datum = factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + break; + } + case INET4: + datum = DatumFactory.createInet4( + decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); + break; + case BLOB: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); + break; + } + default: + datum = NullDatum.get(); + break; + } + } + return datum; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java new file mode 100644 index 0000000..f76593e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.util.ReflectionUtils; +import parquet.bytes.BytesInput; +import parquet.hadoop.BadConfigurationException; +import parquet.hadoop.metadata.CompressionCodecName; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +class CodecFactory { + + public class BytesDecompressor { + + private final CompressionCodec codec; + private final Decompressor decompressor; + + public BytesDecompressor(CompressionCodec codec) { + this.codec = codec; + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + } else { + decompressor = null; + } + } + + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + final BytesInput decompressed; + if (codec != null) { + decompressor.reset(); + InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); + decompressed = BytesInput.from(is, uncompressedSize); + } else { + decompressed = bytes; + } + return decompressed; + } + + private void release() { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + } + } + + /** + * Encapsulates the logic around hadoop compression + * + * @author Julien Le Dem + * + */ + public static class BytesCompressor { + + private final CompressionCodec codec; + private final Compressor compressor; + private final ByteArrayOutputStream compressedOutBuffer; + private final CompressionCodecName codecName; + + public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { + this.codecName = codecName; + this.codec = codec; + if (codec != null) { + this.compressor = CodecPool.getCompressor(codec); + this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); + } else { + this.compressor = null; + this.compressedOutBuffer = null; + } + } + + public BytesInput compress(BytesInput bytes) throws IOException { + final BytesInput compressedBytes; + if (codec == null) { + compressedBytes = bytes; + } else { + compressedOutBuffer.reset(); + if (compressor != null) { + // null compressor for non-native gzip + compressor.reset(); + } + CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); + bytes.writeAllTo(cos); + cos.finish(); + cos.close(); + compressedBytes = BytesInput.from(compressedOutBuffer); + } + return compressedBytes; + } + + private void release() { + if (compressor != null) { + CodecPool.returnCompressor(compressor); + } + } + + public CompressionCodecName getCodecName() { + return codecName; + } + + } + + private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); + private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); + private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>(); + private final Configuration configuration; + + public CodecFactory(Configuration configuration) { + this.configuration = configuration; + } + + /** + * + * @param codecName the requested codec + * @return the corresponding hadoop codec. 
null if UNCOMPRESSED + */ + private CompressionCodec getCodec(CompressionCodecName codecName) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; + } + CompressionCodec codec = codecByName.get(codecClassName); + if (codec != null) { + return codec; + } + + try { + Class<?> codecClass = Class.forName(codecClassName); + codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); + codecByName.put(codecClassName, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); + } + } + + public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { + BytesCompressor comp = compressors.get(codecName); + if (comp == null) { + CompressionCodec codec = getCodec(codecName); + comp = new BytesCompressor(codecName, codec, pageSize); + compressors.put(codecName, comp); + } + return comp; + } + + public BytesDecompressor getDecompressor(CompressionCodecName codecName) { + BytesDecompressor decomp = decompressors.get(codecName); + if (decomp == null) { + CompressionCodec codec = getCodec(codecName); + decomp = new BytesDecompressor(codec); + decompressors.put(codecName, decomp); + } + return decomp; + } + + public void release() { + for (BytesCompressor compressor : compressors.values()) { + compressor.release(); + } + compressors.clear(); + for (BytesDecompressor decompressor : decompressors.values()) { + decompressor.release(); + } + decompressors.clear(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java new file mode 100644 index 0000000..0dedd9b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import parquet.Log; +import parquet.bytes.BytesInput; +import parquet.bytes.CapacityByteArrayOutputStream; +import parquet.column.ColumnDescriptor; +import parquet.column.Encoding; +import parquet.column.page.DictionaryPage; +import parquet.column.page.PageWriteStore; +import parquet.column.page.PageWriter; +import parquet.column.statistics.BooleanStatistics; +import parquet.column.statistics.Statistics; +import parquet.format.converter.ParquetMetadataConverter; +import parquet.io.ParquetEncodingException; +import parquet.schema.MessageType; + +import java.io.IOException; +import java.util.*; + +import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; +import static parquet.Log.INFO; + +class ColumnChunkPageWriteStore implements PageWriteStore { + private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); + + private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); + + private static final class ColumnChunkPageWriter implements PageWriter { + + private final ColumnDescriptor path; + private final BytesCompressor compressor; + + private final CapacityByteArrayOutputStream buf; + private DictionaryPage dictionaryPage; + + private long uncompressedLength; + private long compressedLength; + private long totalValueCount; + private int pageCount; + + private Set<Encoding> encodings = new HashSet<Encoding>(); + + private Statistics totalStatistics; + + private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { + this.path = path; + this.compressor = compressor; + this.buf = new CapacityByteArrayOutputStream(initialSize); + this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); + } + + @Deprecated + @Override + public void writePage(BytesInput bytes, + int valueCount, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + long uncompressedSize = bytes.size(); + BytesInput compressedBytes = compressor.compress(bytes); + long compressedSize = compressedBytes.size(); + BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object + parquetMetadataConverter.writeDataPageHeader( + (int)uncompressedSize, + (int)compressedSize, + valueCount, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + buf); + this.uncompressedLength += uncompressedSize; + this.compressedLength += compressedSize; + this.totalValueCount += valueCount; + this.pageCount += 1; + compressedBytes.writeAllTo(buf); + encodings.add(rlEncoding); + encodings.add(dlEncoding); + encodings.add(valuesEncoding); + } + + @Override + public void writePage(BytesInput bytes, + int valueCount, + Statistics statistics, + Encoding rlEncoding, + Encoding dlEncoding, + Encoding valuesEncoding) throws IOException { + long uncompressedSize = bytes.size(); + BytesInput compressedBytes = compressor.compress(bytes); + long compressedSize = compressedBytes.size(); + parquetMetadataConverter.writeDataPageHeader( + (int)uncompressedSize, + (int)compressedSize, + valueCount, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + buf); + this.uncompressedLength += uncompressedSize; + this.compressedLength += compressedSize; + this.totalValueCount += valueCount; + this.pageCount += 1; + this.totalStatistics.mergeStatistics(statistics); + compressedBytes.writeAllTo(buf); + encodings.add(rlEncoding); + encodings.add(dlEncoding); + encodings.add(valuesEncoding); + } + + @Override + 
public long getMemSize() { + return buf.size(); + } + + public void writeToFileWriter(ParquetFileWriter writer) throws IOException { + writer.startColumn(path, totalValueCount, compressor.getCodecName()); + if (dictionaryPage != null) { + writer.writeDictionaryPage(dictionaryPage); + encodings.add(dictionaryPage.getEncoding()); + } + writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); + writer.endColumn(); + if (INFO) { + LOG.info( + String.format( + "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", + buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) + + (dictionaryPage != null ? String.format( + ", dic { %,d entries, %,dB raw, %,dB comp}", + dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) + : "")); + } + encodings.clear(); + pageCount = 0; + } + + @Override + public long allocatedSize() { + return buf.getCapacity(); + } + + @Override + public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { + if (this.dictionaryPage != null) { + throw new ParquetEncodingException("Only one dictionary page is allowed"); + } + BytesInput dictionaryBytes = dictionaryPage.getBytes(); + int uncompressedSize = (int)dictionaryBytes.size(); + BytesInput compressedBytes = compressor.compress(dictionaryBytes); + this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); + } + + @Override + public String memUsageString(String prefix) { + return buf.memUsageString(prefix + " ColumnChunkPageWriter"); + } + } + + private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); + private final MessageType schema; + private final BytesCompressor compressor; + private final int initialSize; + + public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { + this.compressor = compressor; + this.schema = schema; + this.initialSize = initialSize; + } + + @Override + public PageWriter getPageWriter(ColumnDescriptor path) { + if (!writers.containsKey(path)) { + writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); + } + return writers.get(path); + } + + public void flushToFileWriter(ParquetFileWriter writer) throws IOException { + List<ColumnDescriptor> columns = schema.getColumns(); + for (ColumnDescriptor columnDescriptor : columns) { + ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor); + pageWriter.writeToFileWriter(writer); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java new file mode 100644 index 0000000..6bbd7b5 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import parquet.Log; +import parquet.column.ColumnDescriptor; +import parquet.column.page.PageReadStore; +import parquet.filter.UnboundRecordFilter; +import parquet.hadoop.ParquetFileReader; +import parquet.hadoop.api.ReadSupport; +import parquet.hadoop.metadata.BlockMetaData; +import parquet.hadoop.util.counters.BenchmarkCounter; +import parquet.io.ColumnIOFactory; +import parquet.io.MessageColumnIO; +import parquet.io.ParquetDecodingException; +import parquet.io.api.RecordMaterializer; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.Type; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static java.lang.String.format; +import static parquet.Log.DEBUG; + +class InternalParquetRecordReader<T> { + private static final Log LOG = Log.getLog(InternalParquetRecordReader.class); + + private final ColumnIOFactory columnIOFactory = new ColumnIOFactory(); + + private MessageType requestedSchema; + private MessageType fileSchema; + private int columnCount; + private final ReadSupport<T> readSupport; + + private RecordMaterializer<T> recordConverter; + + private T currentValue; + private long total; + private int current = 0; + private int currentBlock = -1; + private ParquetFileReader reader; + private parquet.io.RecordReader<T> recordReader; + private UnboundRecordFilter recordFilter; + + private long totalTimeSpentReadingBytes; + private long totalTimeSpentProcessingRecords; + private long startedAssemblingCurrentBlockAt; + + private long totalCountLoadedSoFar = 0; + + private Path file; + + /** + * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. + */ + public InternalParquetRecordReader(ReadSupport<T> readSupport) { + this(readSupport, null); + } + + /** + * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. + * @param filter Optional filter for only returning matching records. 
+ */ + public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter + filter) { + this.readSupport = readSupport; + this.recordFilter = filter; + } + + private void checkRead() throws IOException { + if (current == totalCountLoadedSoFar) { + if (current != 0) { + long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt; + totalTimeSpentProcessingRecords += timeAssembling; + LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); + long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; + long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; + long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; + LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); + } + + LOG.info("at row " + current + ". reading next block"); + long t0 = System.currentTimeMillis(); + PageReadStore pages = reader.readNextRowGroup(); + if (pages == null) { + throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); + } + long timeSpentReading = System.currentTimeMillis() - t0; + totalTimeSpentReadingBytes += timeSpentReading; + BenchmarkCounter.incrementTime(timeSpentReading); + LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); + if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); + MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); + recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); + startedAssemblingCurrentBlockAt = System.currentTimeMillis(); + totalCountLoadedSoFar += pages.getRowCount(); + ++ currentBlock; + } + } + + public void close() throws IOException { + reader.close(); + } + + public Void getCurrentKey() throws IOException, InterruptedException { + return null; + } + + public T getCurrentValue() throws IOException, + InterruptedException { + return currentValue; + } + + public float getProgress() throws IOException, InterruptedException { + return (float) current / total; + } + + public void initialize(MessageType requestedSchema, MessageType fileSchema, + Map<String, String> extraMetadata, Map<String, String> readSupportMetadata, + Path file, List<BlockMetaData> blocks, Configuration configuration) + throws IOException { + this.requestedSchema = requestedSchema; + this.fileSchema = fileSchema; + this.file = file; + this.columnCount = this.requestedSchema.getPaths().size(); + this.recordConverter = readSupport.prepareForRead( + configuration, extraMetadata, fileSchema, + new ReadSupport.ReadContext(requestedSchema, readSupportMetadata)); + + List<ColumnDescriptor> columns = requestedSchema.getColumns(); + reader = new ParquetFileReader(configuration, file, blocks, columns); + for (BlockMetaData block : blocks) { + total += block.getRowCount(); + } + LOG.info("RecordReader initialized will read a total of " + total + " records."); + } + + private boolean contains(GroupType group, String[] path, int index) { + if (index == path.length) { + return false; + } + if (group.containsField(path[index])) { + Type 
type = group.getType(path[index]); + if (type.isPrimitive()) { + return index + 1 == path.length; + } else { + return contains(type.asGroupType(), path, index + 1); + } + } + return false; + } + + public boolean nextKeyValue() throws IOException, InterruptedException { + if (current < total) { + try { + checkRead(); + currentValue = recordReader.read(); + if (DEBUG) LOG.debug("read value: " + currentValue); + current ++; + } catch (RuntimeException e) { + throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); + } + return true; + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java new file mode 100644 index 0000000..532d9a2 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import parquet.Log; +import parquet.column.ParquetProperties.WriterVersion; +import parquet.column.impl.ColumnWriteStoreImpl; +import parquet.hadoop.api.WriteSupport; +import parquet.io.ColumnIOFactory; +import parquet.io.MessageColumnIO; +import parquet.schema.MessageType; + +import java.io.IOException; +import java.util.Map; + +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.String.format; +import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; +import static parquet.Log.DEBUG; +import static parquet.Preconditions.checkNotNull; + +class InternalParquetRecordWriter<T> { + private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); + + private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; + private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; + private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; + + private final ParquetFileWriter w; + private final WriteSupport<T> writeSupport; + private final MessageType schema; + private final Map<String, String> extraMetaData; + private final int blockSize; + private final int pageSize; + private final BytesCompressor compressor; + private final int dictionaryPageSize; + private final boolean enableDictionary; + private final boolean validating; + private final WriterVersion writerVersion; + + private long recordCount = 0; + private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; + + private ColumnWriteStoreImpl store; + private ColumnChunkPageWriteStore pageStore; + + /** + * @param w the file to write to + * @param writeSupport the class to convert incoming records + * @param schema the schema of the records + * @param extraMetaData extra meta data to write in the footer of the file + * @param blockSize the size of a block in the file (this will be approximate) + * @param codec the codec used to compress + */ + public InternalParquetRecordWriter( + ParquetFileWriter w, + WriteSupport<T> writeSupport, + MessageType schema, + Map<String, String> extraMetaData, + int blockSize, + int pageSize, + BytesCompressor compressor, + int dictionaryPageSize, + boolean enableDictionary, + boolean validating, + WriterVersion writerVersion) { + this.w = w; + this.writeSupport = checkNotNull(writeSupport, "writeSupport"); + this.schema = schema; + this.extraMetaData = extraMetaData; + this.blockSize = blockSize; + this.pageSize = pageSize; + this.compressor = compressor; + this.dictionaryPageSize = dictionaryPageSize; + this.enableDictionary = enableDictionary; + this.validating = validating; + this.writerVersion = writerVersion; + initStore(); + } + + private void initStore() { + // we don't want this number to be too small + // ideally we divide the block equally across the columns + // it is unlikely all columns are going to be the same size. 
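+ // Worked example (editorial note, not part of the upstream file): with a 128 MB block and a 10-column schema, + // initialBlockBufferSize = max(64 KB, 128 MB / 10 / 5) ≈ 2.6 MB per column chunk, and with a 1 MB page size + // initialPageBufferSize = max(64 KB, min(1 MB + 100 KB, 2.6 MB)) ≈ 1.1 MB.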
+ int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); + pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); + // we don't want this number to be too small either + // ideally, slightly bigger than the page size, but not bigger than the block buffer + int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); + store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); + MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); + writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); + } + + public void close() throws IOException, InterruptedException { + flushStore(); + w.end(extraMetaData); + } + + public void write(T value) throws IOException, InterruptedException { + writeSupport.write(value); + ++ recordCount; + checkBlockSizeReached(); + } + + private void checkBlockSizeReached() throws IOException { + if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. + long memSize = store.memSize(); + if (memSize > blockSize) { + LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); + flushStore(); + initStore(); + recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); + } else { + float recordSize = (float) memSize / recordCount; + recordCountForNextMemCheck = min( + max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway + recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead + ); + if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); + } + } + } + + public long getEstimatedWrittenSize() throws IOException { + return w.getPos() + store.memSize(); + } + + private void flushStore() + throws IOException { + LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); + if (store.allocatedSize() > 3 * blockSize) { + LOG.warn("Too much memory used: " + store.memUsageString()); + } + w.startBlock(recordCount); + store.flush(); + pageStore.flushToFileWriter(w); + recordCount = 0; + w.endBlock(); + store = null; + pageStore = null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java new file mode 100644 index 0000000..f1c5368 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import parquet.Log; +import parquet.Version; +import parquet.bytes.BytesInput; +import parquet.bytes.BytesUtils; +import parquet.column.ColumnDescriptor; +import parquet.column.page.DictionaryPage; +import parquet.column.statistics.Statistics; +import parquet.format.converter.ParquetMetadataConverter; +import parquet.hadoop.Footer; +import parquet.hadoop.metadata.*; +import parquet.io.ParquetEncodingException; +import parquet.schema.MessageType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.*; +import java.util.Map.Entry; + +import static parquet.Log.DEBUG; +import static parquet.format.Util.writeFileMetaData; + +/** + * Internal implementation of the Parquet file writer as a block container + * + * @author Julien Le Dem + * + */ +public class ParquetFileWriter { + private static final Log LOG = Log.getLog(ParquetFileWriter.class); + + public static final String PARQUET_METADATA_FILE = "_metadata"; + public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); + public static final int CURRENT_VERSION = 1; + + private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); + + private final MessageType schema; + private final FSDataOutputStream out; + private BlockMetaData currentBlock; + private ColumnChunkMetaData currentColumn; + private long currentRecordCount; + private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + private long uncompressedLength; + private long compressedLength; + private Set<parquet.column.Encoding> currentEncodings; + + private CompressionCodecName currentChunkCodec; + private ColumnPath currentChunkPath; + private PrimitiveTypeName currentChunkType; + private long currentChunkFirstDataPage; + private long currentChunkDictionaryPageOffset; + private long currentChunkValueCount; + + private Statistics currentStatistics; + + /** + * Captures the order in which methods should be called + * + * @author Julien Le Dem + * + */ + private enum STATE { + NOT_STARTED { + STATE start() { + return STARTED; + } + }, + STARTED { + STATE startBlock() { + return BLOCK; + } + STATE end() { + return ENDED; + } + }, + BLOCK { + STATE startColumn() { + return COLUMN; + } + STATE endBlock() { + return STARTED; + } + }, + COLUMN { + STATE endColumn() { + return BLOCK; + }; + STATE write() { + return this; + } + }, + ENDED; + + STATE start() throws IOException { return error(); } + STATE startBlock() throws IOException { return error(); } + STATE startColumn() throws IOException { return error(); } + STATE write() throws IOException { return error(); } + STATE endColumn() throws IOException { return error(); } + STATE 
endBlock() throws IOException { return error(); } + STATE end() throws IOException { return error(); } + + private final STATE error() throws IOException { + throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); + } + } + + private STATE state = STATE.NOT_STARTED; + + /** + * + * @param configuration the configuration used to access the file system + * @param schema the schema of the data + * @param file the file to write to + * @throws java.io.IOException if the file can not be created + */ + public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { + super(); + this.schema = schema; + FileSystem fs = file.getFileSystem(configuration); + this.out = fs.create(file, false); + } + + /** + * start the file + * @throws java.io.IOException + */ + public void start() throws IOException { + state = state.start(); + if (DEBUG) LOG.debug(out.getPos() + ": start"); + out.write(MAGIC); + } + + /** + * start a block + * @param recordCount the record count in this block + * @throws java.io.IOException + */ + public void startBlock(long recordCount) throws IOException { + state = state.startBlock(); + if (DEBUG) LOG.debug(out.getPos() + ": start block"); +// out.write(MAGIC); // TODO: add a magic delimiter + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; + } + + /** + * start a column inside a block + * @param descriptor the column descriptor + * @param valueCount the value count in this column + * @param compressionCodecName the codec used to compress the pages of this column chunk + * @throws java.io.IOException + */ + public void startColumn(ColumnDescriptor descriptor, + long valueCount, + CompressionCodecName compressionCodecName) throws IOException { + state = state.startColumn(); + if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); + currentEncodings = new HashSet<parquet.column.Encoding>(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = out.getPos(); + compressedLength = 0; + uncompressedLength = 0; + // need to know what type of stats to initialize to + // better way to do this?
+ currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); + } + + /** + * writes a dictionary page + * @param dictionaryPage the dictionary page + */ + public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { + state = state.write(); + if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out); + long headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); + dictionaryPage.getBytes().writeAllTo(out); + currentEncodings.add(dictionaryPage.getEncoding()); + } + + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + */ + @Deprecated + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + parquet.column.Encoding rlEncoding, + parquet.column.Encoding dlEncoding, + parquet.column.Encoding valuesEncoding) throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); + int compressedPageSize = (int)bytes.size(); + metadataConverter.writeDataPageHeader( + uncompressedPageSize, compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); + bytes.writeAllTo(out); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * writes a single page + * @param valueCount count of values + * @param uncompressedPageSize the size of the data once uncompressed + * @param bytes the compressed data for the page without header + * @param statistics the statistics of the values in this page + * @param rlEncoding encoding of the repetition level + * @param dlEncoding encoding of the definition level + * @param valuesEncoding encoding of values + */ + public void writeDataPage( + int valueCount, int uncompressedPageSize, + BytesInput bytes, + Statistics statistics, + parquet.column.Encoding rlEncoding, + parquet.column.Encoding dlEncoding, + parquet.column.Encoding valuesEncoding) throws IOException { + state = state.write(); + long beforeHeader = out.getPos(); + if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); + int compressedPageSize = (int)bytes.size(); + metadataConverter.writeDataPageHeader( + uncompressedPageSize, compressedPageSize, + valueCount, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + out); + long headerSize = out.getPos() - beforeHeader; + 
this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); + bytes.writeAllTo(out); + currentStatistics.mergeStatistics(statistics); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + } + + /** + * writes a number of pages at once + * @param bytes bytes to be written including page headers + * @param uncompressedTotalPageSize total uncompressed size (without page headers) + * @param compressedTotalPageSize total compressed size (without page headers) + * @throws java.io.IOException + */ + void writeDataPages(BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + List<parquet.column.Encoding> encodings) throws IOException { + state = state.write(); + if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); + bytes.writeAllTo(out); + currentEncodings.addAll(encodings); + currentStatistics = totalStats; + } + + /** + * end a column (once all rep, def and data have been written) + * @throws java.io.IOException + */ + public void endColumn() throws IOException { + state = state.endColumn(); + if (DEBUG) LOG.debug(out.getPos() + ": end column"); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + currentChunkValueCount, + compressedLength, + uncompressedLength)); + if (DEBUG) LOG.info("ended Column chunk: " + currentColumn); + currentColumn = null; + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + } + + /** + * ends a block once all column chunks have been written + * @throws java.io.IOException + */ + public void endBlock() throws IOException { + state = state.endBlock(); + if (DEBUG) LOG.debug(out.getPos() + ": end block"); + currentBlock.setRowCount(currentRecordCount); + blocks.add(currentBlock); + currentBlock = null; + } + + /** + * ends a file once all blocks have been written. + * closes the file.
+ * @param extraMetaData the extra meta data to write in the footer + * @throws java.io.IOException + */ + public void end(Map<String, String> extraMetaData) throws IOException { + state = state.end(); + if (DEBUG) LOG.debug(out.getPos() + ": end"); + ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + serializeFooter(footer, out); + out.close(); + } + + private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { + long footerIndex = out.getPos(); + parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); + writeFileMetaData(parquetMetadata, out); + if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); + BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); + out.write(MAGIC); + } + + /** + * writes a _metadata file + * @param configuration the configuration to use to get the FileSystem + * @param outputPath the directory to write the _metadata file to + * @param footers the list of footers to merge + * @throws java.io.IOException + */ + public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { + Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE); + FileSystem fs = outputPath.getFileSystem(configuration); + outputPath = outputPath.makeQualified(fs); + FSDataOutputStream metadata = fs.create(metaDataPath); + metadata.write(MAGIC); + ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); + serializeFooter(metadataFooter, metadata); + metadata.close(); + } + + private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) { + String rootPath = root.toString(); + GlobalMetaData fileMetaData = null; + List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + for (Footer footer : footers) { + String path = footer.getFile().toString(); + if (!path.startsWith(rootPath)) { + throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root); + } + path = path.substring(rootPath.length()); + while (path.startsWith("/")) { + path = path.substring(1); + } + fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData); + for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) { + block.setPath(path); + blocks.add(block); + } + } + return new ParquetMetadata(fileMetaData.merge(), blocks); + } + + /** + * @return the current position in the underlying file + * @throws java.io.IOException + */ + public long getPos() throws IOException { + return out.getPos(); + } + + /** + * Will merge the metadata of all the footers together + * @param footers the list files footers to merge + * @return the global meta data for all the footers + */ + static GlobalMetaData getGlobalMetaData(List<Footer> footers) { + GlobalMetaData fileMetaData = null; + for (Footer footer : footers) { + ParquetMetadata currentMetadata = footer.getParquetMetadata(); + fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData); + } + return fileMetaData; + } + + /** + * Will return the result of merging toMerge into mergedMetadata + * @param toMerge the metadata toMerge + * @param mergedMetadata the reference metadata to merge into + * @return the result of the merge + */ + static GlobalMetaData mergeInto( + FileMetaData toMerge, + GlobalMetaData mergedMetadata) { + MessageType schema = null; + Map<String, 
Set<String>> newKeyValues = new HashMap<String, Set<String>>(); + Set<String> createdBy = new HashSet<String>(); + if (mergedMetadata != null) { + schema = mergedMetadata.getSchema(); + newKeyValues.putAll(mergedMetadata.getKeyValueMetaData()); + createdBy.addAll(mergedMetadata.getCreatedBy()); + } + if ((schema == null && toMerge.getSchema() != null) + || (schema != null && !schema.equals(toMerge.getSchema()))) { + schema = mergeInto(toMerge.getSchema(), schema); + } + for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) { + Set<String> values = newKeyValues.get(entry.getKey()); + if (values == null) { + values = new HashSet<String>(); + newKeyValues.put(entry.getKey(), values); + } + values.add(entry.getValue()); + } + createdBy.add(toMerge.getCreatedBy()); + return new GlobalMetaData( + schema, + newKeyValues, + createdBy); + } + + /** + * will return the result of merging toMerge into mergedSchema + * @param toMerge the schema to merge into mergedSchema + * @param mergedSchema the schema to append the fields to + * @return the resulting schema + */ + static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) { + if (mergedSchema == null) { + return toMerge; + } + return mergedSchema.union(toMerge); + } + +}
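
Editor's note: the STATE enum in ParquetFileWriter above encodes the only call sequence the writer accepts; any out-of-order call falls through to error() and surfaces as an IOException naming the current state. The following sketch is hypothetical driver code, not part of this patch, showing that ordering against the public methods defined above. The class name, the choice of UNCOMPRESSED, and the assumption that page bytes, statistics, and encodings are prepared by the caller are all illustrative.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.statistics.Statistics;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;

import org.apache.tajo.storage.thirdparty.parquet.ParquetFileWriter;

/** Hypothetical driver: writes one row group with a single page per column. */
public class ParquetFileWriterUsageSketch {
  public static void writeSingleBlock(Configuration conf, MessageType schema, Path file,
                                      long rowCount, int valueCount,
                                      BytesInput pageBytes, int uncompressedPageSize,
                                      Statistics statistics,
                                      Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding,
                                      Map<String, String> extraMetaData) throws IOException {
    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, file);
    writer.start();                                           // NOT_STARTED -> STARTED, writes the PAR1 magic
    writer.startBlock(rowCount);                              // STARTED -> BLOCK
    for (ColumnDescriptor column : schema.getColumns()) {
      writer.startColumn(column, valueCount,                  // BLOCK -> COLUMN
          CompressionCodecName.UNCOMPRESSED);
      writer.writeDataPage(valueCount, uncompressedPageSize,  // COLUMN -> COLUMN (repeatable)
          pageBytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
      writer.endColumn();                                     // COLUMN -> BLOCK, records the chunk metadata
    }
    writer.endBlock();                                        // BLOCK -> STARTED, records the row count
    writer.end(extraMetaData);                                // STARTED -> ENDED, writes the footer and closes the file
  }
}

As the enum reads, start() may only be called once, a block may contain any number of column chunks, and end() both serializes the footer and closes the underlying stream.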

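Editor's note on InternalParquetRecordWriter above: checkBlockSizeReached() does not measure memory on every write. It extrapolates the average record size observed so far and schedules the next check roughly halfway to the record count at which the block is projected to fill, never more than MAXIMUM_RECORD_COUNT_FOR_CHECK records ahead. Below is a minimal, self-contained sketch of that arithmetic; the class and method names are chosen here for illustration, and only the constants mirror the patch.

/**
 * Standalone sketch of the look-ahead arithmetic in checkBlockSizeReached() above
 * (the branch taken while the buffered size is still below the block size).
 */
public final class MemCheckSchedule {
  private static final long MINIMUM_RECORD_COUNT_FOR_CHECK = 100;   // same constants as the patch
  private static final long MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;

  /** Record count at which memory usage should be measured next. */
  static long nextCheck(long recordCount, long memSize, long blockSize) {
    float recordSize = (float) memSize / recordCount;                   // average bytes per record so far
    long projectedFull = recordCount + (long) (blockSize / recordSize); // record count at which the block would fill
    return Math.min(
        Math.max(MINIMUM_RECORD_COUNT_FOR_CHECK, projectedFull / 2),    // check roughly halfway there
        recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK);                  // but never more than 10,000 records ahead
  }

  public static void main(String[] args) {
    // 1,000 records buffered in 8 MB against a 128 MB block: ~8 KB per record,
    // block projected full near record 17,000, so the next check lands at record 8,500.
    System.out.println(nextCheck(1_000, 8L << 20, 128L << 20));  // prints 8500
  }
}

As the patch reads, a flush resets the record counter before the next check is scheduled, so the estimate effectively restarts at MINIMUM_RECORD_COUNT_FOR_CHECK for each new row group rather than carrying the previous block's record-size estimate forward.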