http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
new file mode 100644
index 0000000..2f742c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ByteBufLineReader implements Closeable {
+  private static final int DEFAULT_BUFFER = 64 * 1024;
+
+  private int bufferSize;
+  private long readBytes;
+  private int startIndex;
+  private boolean eof = false;
+  private ByteBuf buffer;
+  private final ByteBufInputChannel channel;
+  private final AtomicInteger lineReadBytes = new AtomicInteger();
+  private final LineSplitProcessor processor = new LineSplitProcessor();
+
+  public ByteBufLineReader(ByteBufInputChannel channel) {
+    this(channel, BufferPool.directBuffer(DEFAULT_BUFFER));
+  }
+
+  public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) {
+    this.readBytes = 0;
+    this.channel = channel;
+    this.buffer = buf;
+    this.bufferSize = buf.capacity();
+  }
+
+  public long readBytes() {
+    return readBytes - buffer.readableBytes();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (this.buffer.refCnt() > 0) {
+      this.buffer.release();
+    }
+    this.channel.close();
+  }
+
+  public String readLine() throws IOException {
+    ByteBuf buf = readLineBuf(lineReadBytes);
+    if (buf != null) {
+      return buf.toString(CharsetUtil.UTF_8);
+    }
+    return null;
+  }
+
+  private void fillBuffer() throws IOException {
+
+    int tailBytes = 0;
+    if (this.readBytes > 0) {
+      // startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
+      this.buffer.markReaderIndex();
+      this.buffer.discardReadBytes();  // compact the buffer
+      tailBytes = this.buffer.writerIndex();
+      if (!this.buffer.isWritable()) {
+        // the line is larger than the buffer, so grow it
+        BufferPool.ensureWritable(buffer, bufferSize * 2);
+        this.bufferSize = buffer.capacity();
+      }
+      this.startIndex = 0;
+    }
+
+    boolean release = true;
+    try {
+      int readBytes = tailBytes;
+      for (; ; ) {
+        int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
+        if (localReadBytes < 0) {
+          if (buffer.isWritable()) {
+            // if fewer bytes were read than the buffer capacity, there are no more bytes in the channel
+            eof = true;
+          }
+          break;
+        }
+        readBytes += localReadBytes;
+        if (readBytes == bufferSize) {
+          break;
+        }
+      }
+      this.readBytes += (readBytes - tailBytes);
+      release = false;
+
+      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); // skip past the buffered tail
+    } finally {
+      if (release) {
+        buffer.release();
+      }
+    }
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+    int readBytes = 0;     // newline + text line bytes
+    int newlineLength = 0; // length of the terminating newline
+    int readable;
+
+    this.startIndex = buffer.readerIndex();
+
+    loop:
+    while (true) {
+      readable = buffer.readableBytes();
+      if (readable <= 0) {
+        buffer.readerIndex(this.startIndex);
+        fillBuffer(); // compact and fill the buffer
+
+        // if buffer.writerIndex() is zero, there are no bytes in the buffer
+        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
+          reads.set(0);
+          return null;
+        } else {
+          // skip the first newline
+          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+            buffer.skipBytes(1);
+            if (eof && !buffer.isReadable()) {
+              reads.set(1);
+              return null;
+            }
+
+            newlineLength++;
+            readBytes++;
+            startIndex = buffer.readerIndex();
+          }
+        }
+        readable = buffer.readableBytes();
+      }
+
+      int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
+      if (endIndex < 0) {
+        // no terminating newline appeared
+        buffer.readerIndex(buffer.writerIndex()); // move to the end of the buffer
+        if (eof) {
+          readBytes += (buffer.readerIndex() - startIndex);
+          break loop;
+        }
+      } else {
+        buffer.readerIndex(endIndex + 1);
+        readBytes += (buffer.readerIndex() - startIndex); // past the newline + text line
+
+        // a terminating CRLF appeared
+        if (processor.isPrevCharCR() && buffer.isReadable()
+            && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+          buffer.skipBytes(1);
+          readBytes++;
+          newlineLength += 2;
+        } else {
+          newlineLength += 1;
+        }
+        break loop;
+      }
+    }
+    reads.set(readBytes);
+    return buffer.slice(startIndex, readBytes - newlineLength);
+  }
+}
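For orientation, a minimal usage sketch of this reader (not part of the patch). It assumes only what the code above and DelimitedLineReader below show: ByteBufInputChannel wraps a java.io.InputStream, readLine() returns each line with its CR/LF/CRLF terminator stripped and null at end of stream, and close() releases the pooled direct buffer. The driver class and sample data are illustrative.

import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.text.ByteBufLineReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical driver class; not part of the patch.
public class ByteBufLineReaderExample {
  public static void main(String[] args) throws IOException {
    byte[] data = "a|b|c\r\nd|e|f\nlast line without newline".getBytes(StandardCharsets.UTF_8);
    ByteBufLineReader reader =
        new ByteBufLineReader(new ByteBufInputChannel(new ByteArrayInputStream(data)));
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line); // CR, LF, and CRLF terminators are already stripped
      }
    } finally {
      reader.close(); // releases the internal direct buffer and closes the channel
    }
  }
}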
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
new file mode 100644
index 0000000..1599f62
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class CSVLineDeserializer extends TextLineDeserializer {
+  private FieldSplitProcessor processor;
+  private FieldSerializerDeserializer fieldSerDer;
+  private ByteBuf nullChars;
+
+  public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    super(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public void init() {
+    this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta));
+
+    if (nullChars != null) {
+      nullChars.release();
+    }
+    nullChars = TextLineSerDe.getNullChars(meta);
+
+    fieldSerDer = new TextFieldSerializerDeserializer(meta);
+  }
+
+  public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
+    int[] projection = targetColumnIndexes;
+    if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
+      return;
+    }
+
+    final int rowLength = lineBuf.readableBytes();
+    int start = 0, fieldLength = 0, end = 0;
+
+    // projection
+    int currentTarget = 0;
+    int currentIndex = 0;
+
+    while (end != -1) {
+      end = lineBuf.forEachByte(start, rowLength - start, processor);
+
+      if (end < 0) {
+        fieldLength = rowLength - start;
+      } else {
+        fieldLength = end - start;
+      }
+
+      if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
+        lineBuf.setIndex(start, start + fieldLength);
+        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+        output.put(currentIndex, datum);
+        currentTarget++;
+      }
+
+      if (projection.length == currentTarget) {
+        break;
+      }
+
+      start = end + 1;
+      currentIndex++;
+    }
+  }
+
+  @Override
+  public void release() {
+    if (nullChars != null) {
+      nullChars.release();
+      nullChars = null;
+    }
+  }
+}
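The projection loop above is the core trick: forEachByte() scans for the next field delimiter, and a field is only materialized when its ordinal matches the next entry of the target-column array (which the scanner in DelimitedTextFile, shown later, sorts before use). A standalone sketch of that scan using plain Netty 4.0 APIs; the delimiter, data, and class name are illustrative:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class ProjectionScanExample {
  public static void main(String[] args) {
    ByteBuf line = Unpooled.wrappedBuffer("alice|30|seoul|engineer".getBytes(CharsetUtil.UTF_8));
    // Same contract as FieldSplitProcessor: keep scanning while the byte is not the delimiter.
    ByteBufProcessor notDelimiter = value -> value != '|';

    int[] projection = {0, 2}; // materialize only columns 0 and 2
    int rowLength = line.readableBytes();
    int start = 0, end = 0, currentIndex = 0, currentTarget = 0;

    while (end != -1 && currentTarget < projection.length) {
      end = line.forEachByte(start, rowLength - start, notDelimiter);
      int fieldLength = (end < 0) ? rowLength - start : end - start;

      if (currentIndex == projection[currentTarget]) {
        System.out.println("column " + currentIndex + " = "
            + line.toString(start, fieldLength, CharsetUtil.UTF_8));
        currentTarget++;
      }
      start = end + 1; // move past the delimiter
      currentIndex++;
    }
  }
}

Skipped fields cost only the delimiter scan; no bytes are copied or decoded for columns outside the projection.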
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
new file mode 100644
index 0000000..2fe7f23
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.StorageConstants;
+
+public class CSVLineSerDe extends TextLineSerDe {
+  @Override
+  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    return new CSVLineDeserializer(schema, meta, targetColumnIndexes);
+  }
+
+  @Override
+  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+    return new CSVLineSerializer(schema, meta);
+  }
+
+  public static char getFieldDelimiter(TableMeta meta) {
+    return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER,
+        StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
new file mode 100644
index 0000000..53a0ef3
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class CSVLineSerializer extends TextLineSerializer {
+  private FieldSerializerDeserializer serde;
+
+  private byte[] nullChars;
+  private char delimiter;
+  private int columnNum;
+
+  public CSVLineSerializer(Schema schema, TableMeta meta) {
+    super(schema, meta);
+  }
+
+  @Override
+  public void init() {
+    nullChars = TextLineSerDe.getNullCharsAsBytes(meta);
+    delimiter = CSVLineSerDe.getFieldDelimiter(meta);
+    columnNum = schema.size();
+
+    serde = new TextFieldSerializerDeserializer(meta);
+  }
+
+  @Override
+  public int serialize(OutputStream out, Tuple input) throws IOException {
+    int writtenBytes = 0;
+
+    for (int i = 0; i < columnNum; i++) {
+      Datum datum = input.get(i);
+      writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars);
+
+      if (columnNum - 1 > i) {
+        out.write((byte) delimiter);
+        writtenBytes += 1;
+      }
+    }
+
+    return writtenBytes;
+  }
+
+  @Override
+  public void release() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
new file mode 100644
index 0000000..1b433b5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.ByteBufInputChannel;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.fragment.FileFragment;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DelimitedLineReader implements Closeable {
+  private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);
+  private final static int DEFAULT_PAGE_SIZE = 128 * 1024;
+
+  private FileSystem fs;
+  private FSDataInputStream fis;
+  private InputStream is; // decompressed stream
+  private CompressionCodecFactory factory;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
+
+  private long startOffset, end, pos;
+  private boolean eof = true;
+  private ByteBufLineReader lineReader;
+  private AtomicInteger lineReadBytes = new AtomicInteger();
+  private FileFragment fragment;
+  private Configuration conf;
+
+  public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException {
+    this.fragment = fragment;
+    this.conf = conf;
+    this.factory = new CompressionCodecFactory(conf);
+    this.codec = factory.getCodec(fragment.getPath());
+    if (this.codec instanceof SplittableCompressionCodec) {
+      throw new NotImplementedException(); // bzip2 does not support the multi-threaded model
+    }
+  }
+
+  public void init() throws IOException {
+    if (fs == null) {
+      fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath());
+    }
+    if (fis == null) fis = fs.open(fragment.getPath());
+    pos = startOffset = fragment.getStartKey();
+    end = startOffset + fragment.getLength();
+
+    if (codec != null) {
+      decompressor = CodecPool.getDecompressor(codec);
+      is = new DataInputStream(codec.createInputStream(fis, decompressor));
+      ByteBufInputChannel channel = new ByteBufInputChannel(is);
+      lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE));
+    } else {
+      fis.seek(startOffset);
+      is = fis;
+
+      ByteBufInputChannel channel = new ByteBufInputChannel(is);
+      lineReader = new ByteBufLineReader(channel,
+          BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end)));
+    }
+    eof = false;
+  }
+
+  public long getCompressedPosition() throws IOException {
+    long retVal;
+    if (isCompressed()) {
+      retVal = fis.getPos();
+    } else {
+      retVal = pos;
+    }
+    return retVal;
+  }
+
+  public long getUnCompressedPosition() throws IOException {
+    return pos;
+  }
+
+  public long getReadBytes() {
+    return pos - startOffset;
+  }
+
+  public boolean isReadable() {
+    return !eof;
+  }
+
+  public ByteBuf readLine() throws IOException {
+    if (eof) {
+      return null;
+    }
+
+    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+    pos += lineReadBytes.get();
+    if (buf == null) {
+      eof = true;
+    }
+
+    if (!isCompressed() && getCompressedPosition() > end) {
+      eof = true;
+    }
+    return buf;
+  }
+
+  public boolean isCompressed() {
+    return codec != null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      IOUtils.cleanup(LOG, lineReader, is, fis);
+      fs = null;
+      is = null;
+      fis = null;
+      lineReader = null;
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+  }
+}
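A sketch of driving this reader against a local file (not part of the patch). It follows the call sequence used by DelimitedTextFileScanner below: construct, init(), loop over readLine(), close(). The FileFragment(tableName, path, startOffset, length) constructor is assumed from other Tajo code and is not shown in this patch; the path and class name are illustrative.

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.DelimitedLineReader;

import java.io.IOException;

// Hypothetical driver; assumes a FileFragment(tableName, path, start, length) constructor.
public class DelimitedLineReaderExample {
  public static void main(String[] args) throws IOException {
    TajoConf conf = new TajoConf();
    Path path = new Path("file:///tmp/table1/data.csv"); // illustrative path
    long length = FileSystem.getLocal(conf).getFileStatus(path).getLen();

    DelimitedLineReader reader =
        new DelimitedLineReader(conf, new FileFragment("table1", path, 0, length));
    reader.init(); // opens the file and, if a codec matches the extension, a decompressor
    try {
      ByteBuf line;
      while ((line = reader.readLine()) != null) {
        System.out.println(line.toString(CharsetUtil.UTF_8));
      }
    } finally {
      reader.close(); // also returns any pooled decompressor
    }
  }
}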
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
new file mode 100644
index 0000000..8824e3e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -0,0 +1,481 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+import org.apache.tajo.util.ReflectionUtil;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM;
+import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM;
+
+public class DelimitedTextFile {
+
+  public static final byte LF = '\n';
+
+  private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class);
+
+  /** Caches line serde classes. */
+  private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache =
+      new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>();
+
+  /**
+   * By default, DelimitedTextFileScanner uses CSVLineSerDe. If a table property 'text.serde.class' is given,
+   * it will use the specified serde class.
+   *
+   * @return TextLineSerDe
+   */
+  public static TextLineSerDe getLineSerde(TableMeta meta) {
+    TextLineSerDe lineSerder;
+
+    String serDeClassName;
+
+    // if no serde class is given, it will use the CSV line serde.
+    serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS);
+
+    try {
+      Class<? extends TextLineSerDe> serdeClass;
+
+      if (serdeClassCache.containsKey(serDeClassName)) {
+        serdeClass = serdeClassCache.get(serDeClassName);
+      } else {
+        serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName);
+        serdeClassCache.put(serDeClassName, serdeClass);
+      }
+      lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass);
+    } catch (Throwable e) {
+      throw new RuntimeException("TextLineSerde class cannot be initialized.", e);
+    }
+
+    return lineSerder;
+  }
+
+  public static class DelimitedTextFileAppender extends FileAppender {
+    private final TableMeta meta;
+    private final Schema schema;
+    private final FileSystem fs;
+    private FSDataOutputStream fos;
+    private DataOutputStream outputStream;
+    private CompressionOutputStream deflateFilter;
+    private TableStatistics stats = null;
+    private Compressor compressor;
+    private CompressionCodecFactory codecFactory;
+    private CompressionCodec codec;
+    private Path compressedPath;
+    private byte[] nullChars;
+    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferedBytes = 0;
+    private long pos = 0;
+
+    private NonSyncByteArrayOutputStream os;
+    private TextLineSerializer serializer;
+
+    public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                                     final Schema schema, final TableMeta meta, final Path path)
+        throws IOException {
+      super(conf, taskAttemptId, schema, meta, path);
+      this.fs = path.getFileSystem(conf);
+      this.meta = meta;
+      this.schema = schema;
+    }
+
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public void init() throws IOException {
+      if (!fs.exists(path.getParent())) {
+        throw new FileNotFoundException(path.toString());
+      }
+
+      if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) {
+        String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC);
+        codecFactory = new CompressionCodecFactory(conf);
+        codec = codecFactory.getCodecByClassName(codecName);
+        compressor = CodecPool.getCompressor(codec);
+        if (compressor != null) compressor.reset(); // the built-in gzip codec yields a null compressor
+
+        String extension = codec.getDefaultExtension();
+        compressedPath = path.suffix(extension);
+
+        if (fs.exists(compressedPath)) {
+          throw new AlreadyExistsStorageException(compressedPath);
+        }
+
+        fos = fs.create(compressedPath);
+        deflateFilter = codec.createOutputStream(fos, compressor);
+        outputStream = new DataOutputStream(deflateFilter);
+
+      } else {
+        if (fs.exists(path)) {
+          throw new AlreadyExistsStorageException(path);
+        }
+        fos = fs.create(path);
+        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
+      }
+
+      if (enabledStats) {
+        this.stats = new TableStatistics(this.schema);
+      }
+
+      serializer = getLineSerde().createSerializer(schema, meta);
+      serializer.init();
+
+      if (os == null) {
+        os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+      }
+
+      os.reset();
+      pos = fos.getPos();
+      bufferedBytes = 0;
+      super.init();
+    }
+
+    @Override
+    public void addTuple(Tuple tuple) throws IOException {
+      // write the row
+      int rowBytes = serializer.serialize(os, tuple);
+
+      // new line
+      os.write(LF);
+      rowBytes += 1;
+
+      // update positions
+      pos += rowBytes;
+      bufferedBytes += rowBytes;
+
+      // flush the buffer if necessary
+      if (bufferedBytes > BUFFER_SIZE) {
+        flushBuffer();
+      }
+      // statistics section
+      if (enabledStats) {
+        stats.incrementRow();
+      }
+    }
+
+    private void flushBuffer() throws IOException {
+      if (os.getLength() > 0) {
+        os.writeTo(outputStream);
+        os.reset();
+        bufferedBytes = 0;
+      }
+    }
+
+    @Override
+    public long getOffset() throws IOException {
+      return pos;
+    }
+
+    @Override
+    public void flush() throws IOException {
+      flushBuffer();
+      outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+
+      try {
+        serializer.release();
+
+        if (outputStream != null) {
+          flush();
+        }
+
+        // statistics section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
+
+        if (deflateFilter != null) {
+          deflateFilter.finish();
+          deflateFilter.resetState();
+          deflateFilter = null;
+        }
+
+        os.close();
+      } finally {
+        IOUtils.cleanup(LOG, fos);
+        if (compressor != null) {
+          CodecPool.returnCompressor(compressor);
+          compressor = null;
+        }
+      }
+    }
+
+    @Override
+    public TableStats getStats() {
+      if (enabledStats) {
+        return stats.getTableStat();
+      } else {
+        return null;
+      }
+    }
+
+    public boolean isCompress() {
+      return compressor != null;
+    }
+
+    public String getExtension() {
+      return codec != null ? codec.getDefaultExtension() : "";
+    }
+  }
+
+  public static class DelimitedTextFileScanner extends FileScanner {
+    private boolean splittable = false;
+    private final long startOffset;
+
+    private final long endOffset;
+    /** The number of actually read records */
+    private int recordCount = 0;
+    private int[] targetColumnIndexes;
+
+    private DelimitedLineReader reader;
+    private TextLineDeserializer deserializer;
+
+    private int errorPrintOutMaxNum = 5;
+    /** Maximum number of permissible errors */
+    private int errorTorrenceMaxNum;
+    /** How many errors have occurred? */
+    private int errorNum;
+
+    public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
+                                    final Fragment fragment)
+        throws IOException {
+      super(conf, schema, meta, fragment);
+      reader = new DelimitedLineReader(conf, this.fragment);
+      if (!reader.isCompressed()) {
+        splittable = true;
+      }
+
+      startOffset = this.fragment.getStartKey();
+      endOffset = startOffset + fragment.getLength();
+
+      errorTorrenceMaxNum =
+          Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM));
+    }
+
+
+    @Override
+    public void init() throws IOException {
+      if (reader != null) {
+        reader.close();
+      }
+
+      reader = new DelimitedLineReader(conf, fragment);
+      reader.init();
+      recordCount = 0;
+
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
+      }
+
+      super.init();
+      Arrays.sort(targetColumnIndexes);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
+      }
+
+      if (startOffset > 0) {
+        reader.readLine(); // skip the first line
+      }
+
+      deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes);
+      deserializer.init();
+    }
+
+    public TextLineSerDe getLineSerde() {
+      return DelimitedTextFile.getLineSerde(meta);
+    }
+
+    @Override
+    public float getProgress() {
+      try {
+        if (!reader.isReadable()) {
+          return 1.0f;
+        }
+        long filePos = reader.getCompressedPosition();
+        if (startOffset == filePos) {
+          return 0.0f;
+        } else {
+          long readBytes = filePos - startOffset;
+          long remainingBytes = Math.max(endOffset - filePos, 0);
+          return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
+        }
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        return 0.0f;
+      }
+    }
+
+    @Override
+    public Tuple next() throws IOException {
+      VTuple tuple;
+
+      if (!reader.isReadable()) {
+        return null;
+      }
+
+      try {
+
+        // this loop will continue until one tuple is built or EOS (end of stream) is reached.
+        do {
+
+          ByteBuf buf = reader.readLine();
+
+          // if there are no more lines, then return EOT (end of tuple)
+          if (buf == null) {
+            return null;
+          }
+
+          // If there is no required column, we just read each line
+          // and then return an empty tuple without parsing the line.
+          if (targets.length == 0) {
+            recordCount++;
+            return EmptyTuple.get();
+          }
+
+          tuple = new VTuple(schema.size());
+
+          try {
+            deserializer.deserialize(buf, tuple);
+            // if a line is read normally, it exits this loop.
+            break;
+
+          } catch (TextLineParsingError tae) {
+
+            errorNum++;
+
+            // suppress too many log prints, which would probably cause performance degradation
+            if (errorNum < errorPrintOutMaxNum) {
+              LOG.warn("Ignore Parse Error (" + errorNum + "): ", tae);
+            }
+
+            // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0),
+            // it checks whether the number of parsing errors exceeds the max limit.
+            // Otherwise, it will ignore all parsing errors.
+            if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) {
+              throw tae;
+            }
+            continue;
+          }
+
+        } while (reader.isReadable()); // continue until EOS
+
+        // recordCount means the number of actually read records. We increment the count here.
+        recordCount++;
+
+        return tuple;
+
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    }
+
+    @Override
+    public void reset() throws IOException {
+      init();
+    }
+
+    @Override
+    public void close() throws IOException {
+      try {
+        if (deserializer != null) {
+          deserializer.release();
+        }
+
+        if (tableStats != null && reader != null) {
+          tableStats.setReadBytes(reader.getReadBytes()); // actual processed bytes (decompressed bytes + overhead)
+          tableStats.setNumRows(recordCount);
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
+        }
+      } finally {
+        IOUtils.cleanup(LOG, reader);
+        reader = null;
+      }
+    }
+
+    @Override
+    public boolean isProjectable() {
+      return true;
+    }
+
+    @Override
+    public boolean isSelectable() {
+      return false;
+    }
+
+    @Override
+    public void setSearchCondition(Object expr) {
+    }
+
+    @Override
+    public boolean isSplittable() {
+      return splittable;
+    }
+
+    @Override
+    public TableStats getInputStats() {
+      if (tableStats != null && reader != null) {
+        tableStats.setReadBytes(reader.getReadBytes()); // actual processed bytes (decompressed bytes + overhead)
+        tableStats.setNumRows(recordCount);
+        tableStats.setNumBytes(fragment.getLength());
+      }
+      return tableStats;
+    }
+  }
+}
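Because getLineSerde() above instantiates whatever class the 'text.serde.class' table property names (falling back to the CSV serde), a new line format plugs in with nothing more than a TextLineSerDe subclass that has a no-arg constructor. A minimal sketch of the registration mechanics (not part of the patch): it simply delegates to the CSV implementations, and the class name and package are illustrative.

package org.example.tajo; // illustrative package

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.text.CSVLineDeserializer;
import org.apache.tajo.storage.text.CSVLineSerializer;
import org.apache.tajo.storage.text.TextLineDeserializer;
import org.apache.tajo.storage.text.TextLineSerDe;
import org.apache.tajo.storage.text.TextLineSerializer;

// Selected per table via the 'text.serde.class' property; DelimitedTextFile caches
// the Class object and instantiates it reflectively through ReflectionUtil.
public class DelegatingLineSerDe extends TextLineSerDe {
  @Override
  public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
    return new CSVLineDeserializer(schema, meta, targetColumnIndexes); // replace with a custom parser
  }

  @Override
  public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
    return new CSVLineSerializer(schema, meta); // replace with a custom writer
  }
}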
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
new file mode 100644
index 0000000..a5ac142
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class FieldSplitProcessor implements ByteBufProcessor {
+  private char delimiter; // the ASCII separator character
+
+  public FieldSplitProcessor(char recordDelimiterByte) {
+    this.delimiter = recordDelimiterByte;
+  }
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    return delimiter != value;
+  }
+
+  public char getDelimiter() {
+    return delimiter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
new file mode 100644
index 0000000..a130527
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBufProcessor;
+
+public class LineSplitProcessor implements ByteBufProcessor {
+  public static final byte CR = '\r';
+  public static final byte LF = '\n';
+  private boolean prevCharCR = false; // true if the previous char was CR
+
+  @Override
+  public boolean process(byte value) throws Exception {
+    switch (value) {
+      case LF:
+        return false;
+      case CR:
+        prevCharCR = true;
+        return false;
+      default:
+        prevCharCR = false;
+        return true;
+    }
+  }
+
+  public boolean isPrevCharCR() {
+    return prevCharCR;
+  }
+}
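LineSplitProcessor is deliberately stateful across buffer refills: a CR at the very end of one buffer sets prevCharCR, so the LF arriving at the head of the next buffer can be recognized as the second half of a CRLF rather than an empty line. A small sketch of that behavior (not part of the patch; the data and class name are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import org.apache.tajo.storage.text.LineSplitProcessor;

public class LineSplitProcessorExample {
  public static void main(String[] args) {
    LineSplitProcessor processor = new LineSplitProcessor();

    // First buffer ends exactly on the CR of a CRLF pair.
    ByteBuf first = Unpooled.copiedBuffer("abc\r", CharsetUtil.UTF_8);
    int stop = first.forEachByte(0, first.readableBytes(), processor);
    System.out.println(stop);                     // 3: the scan stops on the CR
    System.out.println(processor.isPrevCharCR()); // true: remembered across buffers

    // The next buffer starts with the matching LF; ByteBufLineReader.readLineBuf()
    // consults isPrevCharCR() to skip it instead of emitting an empty line.
    ByteBuf second = Unpooled.copiedBuffer("\ndef\n", CharsetUtil.UTF_8);
    System.out.println(second.getByte(0) == LineSplitProcessor.LF); // true
  }
}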
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
new file mode 100644
index 0000000..ae7565d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import com.google.protobuf.Message;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.CharsetUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.FieldSerializerDeserializer;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.CharsetDecoder;
+import java.util.TimeZone;
+
+public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer {
+  public static final byte[] trueBytes = "true".getBytes();
+  public static final byte[] falseBytes = "false".getBytes();
+  private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+  private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8);
+
+  private final boolean hasTimezone;
+  private final TimeZone timezone;
+
+  public TextFieldSerializerDeserializer(TableMeta meta) {
+    hasTimezone = meta.containsOption(StorageConstants.TIMEZONE);
+    timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE));
+  }
+
+  private static boolean isNull(ByteBuf val, ByteBuf nullBytes) {
+    return !val.isReadable() || nullBytes.equals(val);
+  }
+
+  private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) {
+    return val.readableBytes() > 0 && nullBytes.equals(val);
+  }
+
+  @Override
+  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
+      throws IOException {
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullChars.length;
+          out.write(nullChars);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        bytes = datum.asBool() ? trueBytes : falseBytes;
+        out.write(bytes);
+        length = bytes.length; // count the bytes actually written ("true" and "false" differ in length)
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case INTERVAL:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIME:
+        if (hasTimezone) {
+          bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case TIMESTAMP:
+        if (hasTimezone) {
+          bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes();
+        } else {
+          bytes = datum.asTextBytes();
+        }
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+      default:
+        break;
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
+    Datum datum;
+    TajoDataTypes.Type type = col.getDataType().getType();
+    boolean nullField;
+    if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) {
+      nullField = isNullText(buf, nullChars);
+    } else {
+      nullField = isNull(buf, nullChars);
+    }
+
+    if (nullField) {
+      datum = NullDatum.get();
+    } else {
+      switch (type) {
+        case BOOLEAN:
+          byte bool = buf.readByte();
+          datum = DatumFactory.createBool(bool == 't' || bool == 'T');
+          break;
+        case BIT:
+          datum = DatumFactory.createBit(Byte.parseByte(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()));
+          break;
+        case CHAR:
+          datum = DatumFactory.createChar(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim());
+          break;
+        case INT1:
+        case INT2:
+          datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf));
+          break;
+        case INT4:
+          datum = DatumFactory.createInt4(NumberUtil.parseInt(buf));
+          break;
+        case INT8:
+          datum = DatumFactory.createInt8(NumberUtil.parseLong(buf));
+          break;
+        case FLOAT4:
+          datum = DatumFactory.createFloat4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case FLOAT8:
+          datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf));
+          break;
+        case TEXT: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createText(bytes);
+          break;
+        }
+        case DATE:
+          datum = DatumFactory.createDate(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case TIME:
+          if (hasTimezone) {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTime(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          }
+          break;
+        case TIMESTAMP:
+          if (hasTimezone) {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone);
+          } else {
+            datum = DatumFactory.createTimestamp(
+                decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          }
+          break;
+        case INTERVAL:
+          datum = DatumFactory.createInterval(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case PROTOBUF: {
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] bytes = new byte[buf.readableBytes()];
+            buf.readBytes(bytes);
+            protobufJsonFormat.merge(bytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+          break;
+        }
+        case INET4:
+          datum = DatumFactory.createInet4(
+              decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString());
+          break;
+        case BLOB: {
+          byte[] bytes = new byte[buf.readableBytes()];
+          buf.readBytes(bytes);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(bytes));
+          break;
+        }
+        default:
+          datum = NullDatum.get();
+          break;
+      }
+    }
+    return datum;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
new file mode 100644
index 0000000..7ebfa79
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * Reads a text line and fills a Tuple with values
+ */
+public abstract class TextLineDeserializer {
+  protected Schema schema;
+  protected TableMeta meta;
+  protected int[] targetColumnIndexes;
+
+  public TextLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+    this.schema = schema;
+    this.meta = meta;
+    this.targetColumnIndexes = targetColumnIndexes;
+  }
+
+  /**
+   * Initialize the SerDe
+   */
+  public abstract void init();
+
+  /**
+   * Fills a tuple with the fields read from a given line.
+   *
+   * @param buf    the read line
+   * @param output the Tuple to be filled with the read fields
+   * @throws java.io.IOException
+   */
+  public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError;
+
+  /**
+   * Release external resources
+   */
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
new file mode 100644
index 0000000..f0bae5e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+public class TextLineParsingError extends Exception {
+
+  public TextLineParsingError(Throwable t) {
+    super(t);
+  }
+
+  public TextLineParsingError(String message, Throwable t) {
+    super(t.getMessage() + ", Error line: " + message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
new file mode 100644
index 0000000..e81e289
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.storage.StorageConstants;
+
+/**
+ * Pluggable Text Line SerDe class
+ */
+public abstract class TextLineSerDe {
+
+  public TextLineSerDe() {
+  }
+
+  public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes);
+
+  public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta);
+
+  public static ByteBuf getNullChars(TableMeta meta) {
+    byte[] nullCharByteArray = getNullCharsAsBytes(meta);
+
+    ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length);
+    nullChars.writeBytes(nullCharByteArray);
+
+    return nullChars;
+  }
+
+  public static byte[] getNullCharsAsBytes(TableMeta meta) {
+    byte[] nullChars;
+
+    String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL,
+        NullDatum.DEFAULT_TEXT));
+    if (StringUtils.isEmpty(nullCharacters)) {
+      nullChars = NullDatum.get().asTextBytes();
+    } else {
+      nullChars = nullCharacters.getBytes();
+    }
+
+    return nullChars;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
new file mode 100644
index 0000000..0c2761f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.text;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Writes a Tuple into a single text-formatted line
+ */
+public abstract class TextLineSerializer {
+  protected Schema schema;
+  protected TableMeta meta;
+
+  public TextLineSerializer(Schema schema, TableMeta meta) {
+    this.schema = schema;
+    this.meta = meta;
+  }
+
+  public abstract void init();
+
+  public abstract int serialize(OutputStream out, Tuple input) throws IOException;
+
+  public abstract void release();
+}
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.ReflectionUtils;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+class CodecFactory {
+
+  public class BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    public BytesDecompressor(CompressionCodec codec) {
+      this.codec = codec;
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        decompressor.reset();
+        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        decompressed = BytesInput.from(is, uncompressedSize);
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    private void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  /**
+   * Encapsulates the logic around hadoop compression
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static class BytesCompressor {
+
+    private final CompressionCodec codec;
+    private final Compressor compressor;
+    private final ByteArrayOutputStream compressedOutBuffer;
+    private final CompressionCodecName codecName;
+
+    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+      this.codecName = codecName;
+      this.codec = codec;
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+      } else {
+        this.compressor = null;
+        this.compressedOutBuffer = null;
+      }
+    }
+
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      final BytesInput compressedBytes;
+      if (codec == null) {
+        compressedBytes = bytes;
+      } else {
+        compressedOutBuffer.reset();
+        if (compressor != null) {
+          // null compressor for non-native gzip
+          compressor.reset();
+        }
+        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
+        bytes.writeAllTo(cos);
+        cos.finish();
+        cos.close();
+        compressedBytes = BytesInput.from(compressedOutBuffer);
+      }
+      return compressedBytes;
+    }
+
+    private void release() {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+
+    public CompressionCodecName getCodecName() {
+      return codecName;
+    }
+
+  }
+
+  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
+  private final Configuration configuration;
+
+  public CodecFactory(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   *
+   * @param codecName the requested codec
+   * @return the corresponding hadoop codec. null if UNCOMPRESSED
+   */
+  private CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    CompressionCodec codec = codecByName.get(codecClassName);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass = Class.forName(codecClassName);
+      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+      codecByName.put(codecClassName, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+    }
+  }
+
+  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
+    BytesCompressor comp = compressors.get(codecName);
+    if (comp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      comp = new BytesCompressor(codecName, codec, pageSize);
+      compressors.put(codecName, comp);
+    }
+    return comp;
+  }
+
+  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+    BytesDecompressor decomp = decompressors.get(codecName);
+    if (decomp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      decomp = new BytesDecompressor(codec);
+      decompressors.put(codecName, decomp);
+    }
+    return decomp;
+  }
+
+  public void release() {
+    for (BytesCompressor compressor : compressors.values()) {
+      compressor.release();
+    }
+    compressors.clear();
+    for (BytesDecompressor decompressor : decompressors.values()) {
+      decompressor.release();
+    }
+    decompressors.clear();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..0dedd9b
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import parquet.Log;
+import parquet.bytes.BytesInput;
+import parquet.bytes.CapacityByteArrayOutputStream;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.PageWriteStore;
+import parquet.column.page.PageWriter;
+import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.io.ParquetEncodingException;
+import parquet.schema.MessageType;
+
+import java.io.IOException;
+import java.util.*;
+
+import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor;
+import static parquet.Log.INFO;
+
+class ColumnChunkPageWriteStore implements PageWriteStore {
+  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private static final class ColumnChunkPageWriter implements PageWriter {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final CapacityByteArrayOutputStream buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    private Set<Encoding> encodings = new HashSet<Encoding>();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new CapacityByteArrayOutputStream(initialSize);
+      this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType());
+    }
+
+    @Deprecated
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          buf);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      compressedBytes.writeAllTo(buf);
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        encodings.add(dictionaryPage.getEncoding());
+      }
+      writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.endColumn();
+      if (INFO) {
+        LOG.info(
+            String.format(
+                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
+            + (dictionaryPage != null ? String.format(
+                ", dic { %,d entries, %,dB raw, %,dB comp}",
+                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
+                : ""));
+      }
+      encodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.getCapacity();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+  }
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
+  private final BytesCompressor compressor;
+  private final int initialSize;
+
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) {
+    this.compressor = compressor;
+    this.schema = schema;
+    this.initialSize = initialSize;
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    if (!writers.containsKey(path)) {
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize));
+    }
+    return writers.get(path);
+  }
+
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    List<ColumnDescriptor> columns = schema.getColumns();
+    for (ColumnDescriptor columnDescriptor : columns) {
+      ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
new file mode 100644
index 0000000..6bbd7b5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+class InternalParquetRecordReader<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+  private MessageType requestedSchema;
+  private MessageType fileSchema;
+  private int columnCount;
+  private final ReadSupport<T> readSupport;
+
+  private RecordMaterializer<T> recordConverter;
+
+  private T currentValue;
+  private long total;
+  private int current = 0;
+  private int currentBlock = -1;
+  private ParquetFileReader reader;
+  private parquet.io.RecordReader<T> recordReader;
+  private UnboundRecordFilter recordFilter;
+
+  private long totalTimeSpentReadingBytes;
+  private long totalTimeSpentProcessingRecords;
+  private long startedAssemblingCurrentBlockAt;
+
+  private long totalCountLoadedSoFar = 0;
+
+  private Path file;
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
+    this(readSupport, null);
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter Optional filter for only returning matching records.
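+   *               (applied during record assembly in checkRead())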
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this.readSupport = readSupport;
+    this.recordFilter = filter;
+  }
+
+  private void checkRead() throws IOException {
+    if (current == totalCountLoadedSoFar) {
+      if (current != 0) {
+        long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
+        totalTimeSpentProcessingRecords += timeAssembling;
+        LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+        long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+        long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+        long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+        LOG.info("time spent so far " + percentReading + "% reading (" + totalTimeSpentReadingBytes + " ms) and " + percentProcessing + "% processing (" + totalTimeSpentProcessingRecords + " ms)");
+      }
+
+      LOG.info("at row " + current + ". reading next block");
+      long t0 = System.currentTimeMillis();
+      PageReadStore pages = reader.readNextRowGroup();
+      if (pages == null) {
+        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+      }
+      long timeSpentReading = System.currentTimeMillis() - t0;
+      totalTimeSpentReadingBytes += timeSpentReading;
+      BenchmarkCounter.incrementTime(timeSpentReading);
+      LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+      totalCountLoadedSoFar += pages.getRowCount();
+      ++currentBlock;
+    }
+  }
+
+  public void close() throws IOException {
+    reader.close();
+  }
+
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  public T getCurrentValue() throws IOException, InterruptedException {
+    return currentValue;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) current / total;
+  }
+
+  public void initialize(MessageType requestedSchema, MessageType fileSchema,
+                         Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+                         Path file, List<BlockMetaData> blocks, Configuration configuration)
+      throws IOException {
+    this.requestedSchema = requestedSchema;
+    this.fileSchema = fileSchema;
+    this.file = file;
+    this.columnCount = this.requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(
+        configuration, extraMetadata, fileSchema,
+        new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    LOG.info("RecordReader initialized; will read a total of " + total + " records.");
+  }
+
+  private boolean contains(GroupType group, String[] path, int index) {
+    if (index == path.length) {
+      return false;
+    }
+    if (group.containsField(path[index])) {
+      Type type = group.getType(path[index]);
+      if (type.isPrimitive()) {
+        return index + 1 == path.length;
+      } else {
+        return contains(type.asGroupType(), path, index + 1);
+      }
+    }
+    return false;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (current < total) {
+      try {
+        checkRead();
+        currentValue = recordReader.read();
+        if (DEBUG) LOG.debug("read value: " + currentValue);
+        current++;
+      } catch (RuntimeException e) {
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
+      }
+      return true;
+    }
+    return false;
+  }
+}
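InternalParquetRecordReader above follows Hadoop's RecordReader life cycle: initialize() opens a ParquetFileReader over the footer's row-group metadata, nextKeyValue() lazily loads the next row group through checkRead(), and getCurrentValue() returns the last materialized record. A minimal driver loop, sketched here with an assumed parquet.example GroupReadSupport materializer and footer-derived arguments (requestedSchema, fileSchema, metadata maps, blocks, path, conf) that are not part of this commit, and with exception handling elided, might look like:

    InternalParquetRecordReader<Group> reader =
        new InternalParquetRecordReader<Group>(new GroupReadSupport());
    // requestedSchema, fileSchema, extraMetadata, readSupportMetadata, path,
    // blocks, and conf are assumed to have been read from the file footer.
    reader.initialize(requestedSchema, fileSchema, extraMetadata,
        readSupportMetadata, path, blocks, conf);
    while (reader.nextKeyValue()) {   // checkRead() pulls the next row group on demand
      Group record = reader.getCurrentValue();
      // ... consume record ...
    }
    reader.close();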

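For reference, the CodecFactory earlier in this commit caches one BytesCompressor and one BytesDecompressor per codec name and hands the pooled Hadoop (de)compressors back on release(). A minimal round trip, sketched under the assumptions that the caller sits in the same package (the class and its members are package-private) and that the Snappy codec is available at runtime, would be:

    // Hypothetical helper, not part of this commit.
    static byte[] roundTrip(byte[] raw) throws IOException {
      CodecFactory codecs = new CodecFactory(new Configuration());
      CodecFactory.BytesCompressor compressor =
          codecs.getCompressor(CompressionCodecName.SNAPPY, 64 * 1024); // pageSize hint
      BytesInput compressed = compressor.compress(BytesInput.from(raw));
      // Decompression needs the expected uncompressed size, which Parquet
      // records in each page header.
      CodecFactory.BytesDecompressor decompressor =
          codecs.getDecompressor(CompressionCodecName.SNAPPY);
      byte[] restored = decompressor.decompress(compressed, raw.length).toByteArray();
      codecs.release(); // return pooled (de)compressors to Hadoop's CodecPool
      return restored;
    }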