http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java deleted file mode 100644 index 2f742c6..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.ByteBufInputChannel; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicInteger; - -public class ByteBufLineReader implements Closeable { - private static int DEFAULT_BUFFER = 64 * 1024; - - private int bufferSize; - private long readBytes; - private int startIndex; - private boolean eof = false; - private ByteBuf buffer; - private final ByteBufInputChannel channel; - private final AtomicInteger lineReadBytes = new AtomicInteger(); - private final LineSplitProcessor processor = new LineSplitProcessor(); - - public ByteBufLineReader(ByteBufInputChannel channel) { - this(channel, BufferPool.directBuffer(DEFAULT_BUFFER)); - } - - public ByteBufLineReader(ByteBufInputChannel channel, ByteBuf buf) { - this.readBytes = 0; - this.channel = channel; - this.buffer = buf; - this.bufferSize = buf.capacity(); - } - - public long readBytes() { - return readBytes - buffer.readableBytes(); - } - - @Override - public void close() throws IOException { - if (this.buffer.refCnt() > 0) { - this.buffer.release(); - } - this.channel.close(); - } - - public String readLine() throws IOException { - ByteBuf buf = readLineBuf(lineReadBytes); - if (buf != null) { - return buf.toString(CharsetUtil.UTF_8); - } - return null; - } - - private void fillBuffer() throws IOException { - - int tailBytes = 0; - if (this.readBytes > 0) { - //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes) - this.buffer.markReaderIndex(); - this.buffer.discardReadBytes(); // compact the buffer - tailBytes = this.buffer.writerIndex(); - if (!this.buffer.isWritable()) { - // a line bytes is large than the buffer - BufferPool.ensureWritable(buffer, bufferSize * 2); - this.bufferSize = buffer.capacity(); - } - this.startIndex = 0; - } - - boolean release = true; - try { - int readBytes = tailBytes; - for (; ; ) { - int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes); - if (localReadBytes < 0) { - if 
(buffer.isWritable()) { - //if fewer bytes were read than the buffer capacity, there are no more bytes in the channel - eof = true; - } - break; - } - readBytes += localReadBytes; - if (readBytes == bufferSize) { - break; - } - } - this.readBytes += (readBytes - tailBytes); - release = false; - - this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) - } finally { - if (release) { - buffer.release(); - } - } - } - - /** - * Read a line terminated by one of CR, LF, or CRLF. - */ - public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { - int readBytes = 0; // newline + text line bytes - int newlineLength = 0; //length of terminating newline - int readable; - - this.startIndex = buffer.readerIndex(); - - loop: - while (true) { - readable = buffer.readableBytes(); - if (readable <= 0) { - buffer.readerIndex(this.startIndex); - fillBuffer(); //compact and fill buffer - - //if buffer.writerIndex() is zero, there are no bytes in the buffer - if (!buffer.isReadable() && buffer.writerIndex() == 0) { - reads.set(0); - return null; - } else { - //skip the first newline - if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { - buffer.skipBytes(1); - if(eof && !buffer.isReadable()) { - reads.set(1); - return null; - } - - newlineLength++; - readBytes++; - startIndex = buffer.readerIndex(); - } - } - readable = buffer.readableBytes(); - } - - int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor); - if (endIndex < 0) { - //no terminating newline appeared - buffer.readerIndex(buffer.writerIndex()); // set to end of buffer - if(eof){ - readBytes += (buffer.readerIndex() - startIndex); - break loop; - } - } else { - buffer.readerIndex(endIndex + 1); - readBytes += (buffer.readerIndex() - startIndex); //past newline + text line - - //a terminating CRLF appeared - if (processor.isPrevCharCR() && buffer.isReadable() - && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { - buffer.skipBytes(1); - readBytes++; - newlineLength += 2; - } else { - newlineLength += 1; - } - break loop; - } - } - reads.set(readBytes); - return buffer.slice(startIndex, readBytes - newlineLength); - } -}
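For context, the ByteBufLineReader deleted above is driven through a ByteBufInputChannel wrapping an ordinary InputStream, exactly as DelimitedLineReader does further down in this commit. A minimal usage sketch, assuming the Tajo storage classes from this diff and Netty 4 are on the classpath; the class name LineReaderSketch and the args[0] input path are illustrative, not part of the original API:

import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.text.ByteBufLineReader;

import java.io.FileInputStream;

public class LineReaderSketch {
  public static void main(String[] args) throws Exception {
    // Wrap any InputStream in a ByteBufInputChannel, as DelimitedLineReader does.
    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(args[0]));
    ByteBufLineReader reader = new ByteBufLineReader(channel); // 64KB direct buffer by default
    try {
      String line;
      while ((line = reader.readLine()) != null) { // CR, LF, and CRLF all terminate a line
        System.out.println(line);
      }
    } finally {
      reader.close(); // releases the direct buffer and closes the channel
    }
  }
}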
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java deleted file mode 100644 index 1599f62..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.FieldSerializerDeserializer; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -public class CSVLineDeserializer extends TextLineDeserializer { - private FieldSplitProcessor processor; - private FieldSerializerDeserializer fieldSerDer; - private ByteBuf nullChars; - - public CSVLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - super(schema, meta, targetColumnIndexes); - } - - @Override - public void init() { - this.processor = new FieldSplitProcessor(CSVLineSerDe.getFieldDelimiter(meta)); - - if (nullChars != null) { - nullChars.release(); - } - nullChars = TextLineSerDe.getNullChars(meta); - - fieldSerDer = new TextFieldSerializerDeserializer(meta); - } - - public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError { - int[] projection = targetColumnIndexes; - if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) { - return; - } - - final int rowLength = lineBuf.readableBytes(); - int start = 0, fieldLength = 0, end = 0; - - //Projection - int currentTarget = 0; - int currentIndex = 0; - - while (end != -1) { - end = lineBuf.forEachByte(start, rowLength - start, processor); - - if (end < 0) { - fieldLength = rowLength - start; - } else { - fieldLength = end - start; - } - - if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { - lineBuf.setIndex(start, start + fieldLength); - Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars); - output.put(currentIndex, datum); - currentTarget++; - } - - if (projection.length == currentTarget) { - break; - } - - start = end + 1; - currentIndex++; - } - } - - @Override - public void release() { - if (nullChars != null) { - nullChars.release(); - nullChars = null; - } - } -} 
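CSVLineDeserializer.deserialize() above splits a row by repeatedly calling ByteBuf.forEachByte() with a FieldSplitProcessor: each call stops at the next delimiter byte, the field is the span [start, end), and only the projected column indexes are actually deserialized. Below is a self-contained sketch of that same split loop against plain Netty 4; the '|' delimiter, the sample row, and the names FieldSplitSketch/stopAtDelimiter are illustrative and not from the original source:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

public class FieldSplitSketch {
  public static void main(String[] args) {
    ByteBuf line = Unpooled.copiedBuffer("a|b|c", CharsetUtil.UTF_8);

    // Same contract as FieldSplitProcessor: return false to stop at a delimiter byte.
    ByteBufProcessor stopAtDelimiter = new ByteBufProcessor() {
      @Override
      public boolean process(byte value) {
        return value != '|';
      }
    };

    int rowLength = line.readableBytes();
    int start = 0, end = 0;
    while (end != -1) {
      // forEachByte returns the index of the delimiter, or -1 for the last field.
      end = line.forEachByte(start, rowLength - start, stopAtDelimiter);
      int fieldLength = (end < 0) ? rowLength - start : end - start;
      System.out.println(line.toString(start, fieldLength, CharsetUtil.UTF_8)); // prints a, b, c
      start = end + 1;
    }
    line.release();
  }
}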
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java deleted file mode 100644 index 2fe7f23..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerDe.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.StorageConstants; - -public class CSVLineSerDe extends TextLineSerDe { - @Override - public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - return new CSVLineDeserializer(schema, meta, targetColumnIndexes); - } - - @Override - public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { - return new CSVLineSerializer(schema, meta); - } - - public static char getFieldDelimiter(TableMeta meta) { - return StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_DELIMITER, - StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java deleted file mode 100644 index c0fc18f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/CSVLineSerializer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.FieldSerializerDeserializer; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; -import java.io.OutputStream; - -public class CSVLineSerializer extends TextLineSerializer { - private FieldSerializerDeserializer serde; - - private byte [] nullChars; - private char delimiter; - private int columnNum; - - public CSVLineSerializer(Schema schema, TableMeta meta) { - super(schema, meta); - } - - @Override - public void init() { - nullChars = TextLineSerDe.getNullCharsAsBytes(meta); - delimiter = CSVLineSerDe.getFieldDelimiter(meta); - columnNum = schema.size(); - - serde = new TextFieldSerializerDeserializer(meta); - } - - @Override - public int serialize(OutputStream out, Tuple input) throws IOException { - int writtenBytes = 0; - - for (int i = 0; i < columnNum; i++) { - Datum datum = input.get(i); - writtenBytes += serde.serialize(out, datum, schema.getColumn(i), i, nullChars); - - if (columnNum - 1 > i) { - out.write((byte) delimiter); - writtenBytes += 1; - } - } - - return writtenBytes; - } - - @Override - public void release() { - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java deleted file mode 100644 index 0efe030..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ /dev/null @@ -1,156 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.SplittableCompressionCodec; -import org.apache.tajo.common.exception.NotImplementedException; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.ByteBufInputChannel; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.Closeable; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicInteger; - -public class DelimitedLineReader implements Closeable { - private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class); - private final static int DEFAULT_PAGE_SIZE = 128 * 1024; - - private FileSystem fs; - private FSDataInputStream fis; - private InputStream is; //decompressd stream - private CompressionCodecFactory factory; - private CompressionCodec codec; - private Decompressor decompressor; - - private long startOffset, end, pos; - private boolean eof = true; - private ByteBufLineReader lineReader; - private AtomicInteger lineReadBytes = new AtomicInteger(); - private FileFragment fragment; - private Configuration conf; - - public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { - this.fragment = fragment; - this.conf = conf; - this.factory = new CompressionCodecFactory(conf); - this.codec = factory.getCodec(fragment.getPath()); - if (this.codec instanceof SplittableCompressionCodec) { - throw new NotImplementedException(); // bzip2 does not support multi-thread model - } - } - - public void init() throws IOException { - if (fs == null) { - fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); - } - if (fis == null) fis = fs.open(fragment.getPath()); - pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getEndKey(); - - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - is = new DataInputStream(codec.createInputStream(fis, decompressor)); - ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE)); - } else { - fis.seek(startOffset); - is = fis; - - ByteBufInputChannel channel = new ByteBufInputChannel(is); - lineReader = new ByteBufLineReader(channel, - BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end))); - } - eof = false; - } - - public long getCompressedPosition() throws IOException { - long retVal; - if (isCompressed()) { - retVal = fis.getPos(); - } else { - retVal = pos; - } - return retVal; - } - - public long getUnCompressedPosition() throws IOException { - return pos; - } - - public long getReadBytes() { - return pos - startOffset; - } - - public boolean isReadable() { - return !eof; - } - - public ByteBuf readLine() throws IOException { - if (eof) { - return null; - } - - ByteBuf buf = lineReader.readLineBuf(lineReadBytes); - pos += lineReadBytes.get(); - if (buf == null) 
{ - eof = true; - } - - if (!isCompressed() && getCompressedPosition() > end) { - eof = true; - } - return buf; - } - - public boolean isCompressed() { - return codec != null; - } - - @Override - public void close() throws IOException { - try { - IOUtils.cleanup(LOG, lineReader, is, fis); - fs = null; - is = null; - fis = null; - lineReader = null; - } finally { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java deleted file mode 100644 index ab8a0b5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java +++ /dev/null @@ -1,478 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; -import org.apache.tajo.util.ReflectionUtil; - -import java.io.BufferedOutputStream; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import static org.apache.tajo.storage.StorageConstants.DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM; -import static org.apache.tajo.storage.StorageConstants.TEXT_ERROR_TOLERANCE_MAXNUM; - -public class DelimitedTextFile { - - public static final byte LF = '\n'; - public static int EOF = -1; - - private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); - - /** it caches line serde classes. 
*/ - private static final Map<String, Class<? extends TextLineSerDe>> serdeClassCache = - new ConcurrentHashMap<String, Class<? extends TextLineSerDe>>(); - - /** - * By default, DelimitedTextFileScanner uses CSVLineSerder. If a table property 'text.serde.class' is given, - * it will use the specified serder class. - * - * @return TextLineSerder - */ - public static TextLineSerDe getLineSerde(TableMeta meta) { - TextLineSerDe lineSerder; - - String serDeClassName; - - // if there is no given serde class, it will use CSV line serder. - serDeClassName = meta.getOption(StorageConstants.TEXT_SERDE_CLASS, StorageConstants.DEFAULT_TEXT_SERDE_CLASS); - - try { - Class<? extends TextLineSerDe> serdeClass; - - if (serdeClassCache.containsKey(serDeClassName)) { - serdeClass = serdeClassCache.get(serDeClassName); - } else { - serdeClass = (Class<? extends TextLineSerDe>) Class.forName(serDeClassName); - serdeClassCache.put(serDeClassName, serdeClass); - } - lineSerder = (TextLineSerDe) ReflectionUtil.newInstance(serdeClass); - } catch (Throwable e) { - throw new RuntimeException("TextLineSerde class cannot be initialized.", e); - } - - return lineSerder; - } - - public static class DelimitedTextFileAppender extends FileAppender { - private final TableMeta meta; - private final Schema schema; - private final FileSystem fs; - private FSDataOutputStream fos; - private DataOutputStream outputStream; - private CompressionOutputStream deflateFilter; - private TableStatistics stats = null; - private Compressor compressor; - private CompressionCodecFactory codecFactory; - private CompressionCodec codec; - private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; - private int bufferedBytes = 0; - private long pos = 0; - - private NonSyncByteArrayOutputStream os; - private TextLineSerializer serializer; - - public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) - throws IOException { - super(conf, schema, meta, path); - this.fs = path.getFileSystem(conf); - this.meta = meta; - this.schema = schema; - } - - public TextLineSerDe getLineSerde() { - return DelimitedTextFile.getLineSerde(meta); - } - - @Override - public void init() throws IOException { - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { - String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); - codecFactory = new CompressionCodecFactory(conf); - codec = codecFactory.getCodecByClassName(codecName); - compressor = CodecPool.getCompressor(codec); - if (compressor != null) compressor.reset(); //builtin gzip is null - - String extension = codec.getDefaultExtension(); - compressedPath = path.suffix(extension); - - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); - deflateFilter = codec.createOutputStream(fos, compressor); - outputStream = new DataOutputStream(deflateFilter); - - } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); - outputStream = new DataOutputStream(new BufferedOutputStream(fos)); - } - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - serializer = getLineSerde().createSerializer(schema, meta); - serializer.init(); - - if (os == null) { - os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); - } - - os.reset(); - pos = fos.getPos(); - 
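- // From this point on, addTuple() serializes each row into the in-memory buffer 'os' and flushes it to the (possibly compressed) output stream once bufferedBytes exceeds BUFFER_SIZE.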
bufferedBytes = 0; - super.init(); - } - - @Override - public void addTuple(Tuple tuple) throws IOException { - // write - int rowBytes = serializer.serialize(os, tuple); - - // new line - os.write(LF); - rowBytes += 1; - - // update positions - pos += rowBytes; - bufferedBytes += rowBytes; - - // refill buffer if necessary - if (bufferedBytes > BUFFER_SIZE) { - flushBuffer(); - } - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - private void flushBuffer() throws IOException { - if (os.getLength() > 0) { - os.writeTo(outputStream); - os.reset(); - bufferedBytes = 0; - } - } - - @Override - public long getOffset() throws IOException { - return pos; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); - } - - @Override - public void close() throws IOException { - - try { - serializer.release(); - - if(outputStream != null){ - flush(); - } - - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - - if (deflateFilter != null) { - deflateFilter.finish(); - deflateFilter.resetState(); - deflateFilter = null; - } - - os.close(); - } finally { - IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - - public boolean isCompress() { - return compressor != null; - } - - public String getExtension() { - return codec != null ? codec.getDefaultExtension() : ""; - } - } - - public static class DelimitedTextFileScanner extends FileScanner { - private boolean splittable = false; - private final long startOffset; - - private final long endOffset; - /** The number of actual read records */ - private int recordCount = 0; - private int[] targetColumnIndexes; - - private DelimitedLineReader reader; - private TextLineDeserializer deserializer; - - private int errorPrintOutMaxNum = 5; - /** Maximum number of permissible errors */ - private int errorTorrenceMaxNum; - /** How many errors have occurred? 
*/ - private int errorNum; - - public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, - final FileFragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - reader = new DelimitedLineReader(conf, fragment); - if (!reader.isCompressed()) { - splittable = true; - } - - startOffset = fragment.getStartKey(); - endOffset = startOffset + fragment.getEndKey(); - - errorTorrenceMaxNum = - Integer.parseInt(meta.getOption(TEXT_ERROR_TOLERANCE_MAXNUM, DEFAULT_TEXT_ERROR_TOLERANCE_MAXNUM)); - } - - @Override - public void init() throws IOException { - if (reader != null) { - reader.close(); - } - - reader = new DelimitedLineReader(conf, fragment); - reader.init(); - recordCount = 0; - - if (targets == null) { - targets = schema.toArray(); - } - - targetColumnIndexes = new int[targets.length]; - for (int i = 0; i < targets.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - - super.init(); - Arrays.sort(targetColumnIndexes); - if (LOG.isDebugEnabled()) { - LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); - } - - if (startOffset > 0) { - reader.readLine(); // skip the first line - } - - deserializer = getLineSerde().createDeserializer(schema, meta, targetColumnIndexes); - deserializer.init(); - } - - public TextLineSerDe getLineSerde() { - return DelimitedTextFile.getLineSerde(meta); - } - - @Override - public float getProgress() { - try { - if (!reader.isReadable()) { - return 1.0f; - } - long filePos = reader.getCompressedPosition(); - if (startOffset == filePos) { - return 0.0f; - } else { - long readBytes = filePos - startOffset; - long remainingBytes = Math.max(endOffset - filePos, 0); - return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return 0.0f; - } - } - - @Override - public Tuple next() throws IOException { - VTuple tuple; - - if (!reader.isReadable()) { - return null; - } - - try { - - // this loop continues until one tuple is built or EOS (end of stream) is reached. - do { - - ByteBuf buf = reader.readLine(); - - // if there are no more lines, return EOT (end of tuple) - if (buf == null) { - return null; - } - - // If there is no required column, we just read each line - // and then return an empty tuple without parsing the line. - if (targets.length == 0) { - recordCount++; - return EmptyTuple.get(); - } - - tuple = new VTuple(schema.size()); - - try { - deserializer.deserialize(buf, tuple); - // if a line is read normally, it exits this loop. - break; - - } catch (TextLineParsingError tae) { - - errorNum++; - - // suppress too many log prints, which would probably cause performance degradation - if (errorNum < errorPrintOutMaxNum) { - LOG.warn("Ignore JSON Parse Error (" + errorNum + "): ", tae); - } - - // Only when the maximum error tolerance limit is set (i.e., errorTorrenceMaxNum >= 0), - // it checks whether the number of parsing errors exceeds the max limit. - // Otherwise, it will ignore all parsing errors. - if (errorTorrenceMaxNum >= 0 && errorNum > errorTorrenceMaxNum) { - throw tae; - } - continue; - } - - } while (reader.isReadable()); // continue until EOS - - // recordCount means the number of actually read records. We increment the count here.
- recordCount++; - - return tuple; - - } catch (Throwable t) { - LOG.error(t); - throw new IOException(t); - } - } - - @Override - public void reset() throws IOException { - init(); - } - - @Override - public void close() throws IOException { - try { - if (deserializer != null) { - deserializer.release(); - } - - if (tableStats != null && reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - } - if (LOG.isDebugEnabled()) { - LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); - } - } finally { - IOUtils.cleanup(LOG, reader); - reader = null; - } - } - - @Override - public boolean isProjectable() { - return true; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public boolean isSplittable() { - return splittable; - } - - @Override - public TableStats getInputStats() { - if (tableStats != null && reader != null) { - tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - tableStats.setNumBytes(fragment.getEndKey()); - } - return tableStats; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java deleted file mode 100644 index a5ac142..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBufProcessor; - -public class FieldSplitProcessor implements ByteBufProcessor { - private char delimiter; //the ascii separator character - - public FieldSplitProcessor(char recordDelimiterByte) { - this.delimiter = recordDelimiterByte; - } - - @Override - public boolean process(byte value) throws Exception { - return delimiter != value; - } - - public char getDelimiter() { - return delimiter; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java deleted file mode 100644 index a130527..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBufProcessor; - -public class LineSplitProcessor implements ByteBufProcessor { - public static final byte CR = '\r'; - public static final byte LF = '\n'; - private boolean prevCharCR = false; //true if prev char was CR - - @Override - public boolean process(byte value) throws Exception { - switch (value) { - case LF: - return false; - case CR: - prevCharCR = true; - return false; - default: - prevCharCR = false; - return true; - } - } - - public boolean isPrevCharCR() { - return prevCharCR; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java deleted file mode 100644 index ae7565d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java +++ /dev/null @@ -1,253 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import com.google.protobuf.Message; -import io.netty.buffer.ByteBuf; -import io.netty.util.CharsetUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.*; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.storage.FieldSerializerDeserializer; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.util.NumberUtil; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.CharsetDecoder; -import java.util.TimeZone; - -public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { - public static final byte[] trueBytes = "true".getBytes(); - public static final byte[] falseBytes = "false".getBytes(); - private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - private final CharsetDecoder decoder = CharsetUtil.getDecoder(CharsetUtil.UTF_8); - - private final boolean hasTimezone; - private final TimeZone timezone; - - public TextFieldSerializerDeserializer(TableMeta meta) { - hasTimezone = meta.containsOption(StorageConstants.TIMEZONE); - timezone = TimeZone.getTimeZone(meta.getOption(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE)); - } - - private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { - return !val.isReadable() || nullBytes.equals(val); - } - - private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { - return val.readableBytes() > 0 && nullBytes.equals(val); - } - - @Override - public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) - throws IOException { - byte[] bytes; - int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); - - if (datum == null || datum instanceof NullDatum) { - switch (dataType.getType()) { - case CHAR: - case TEXT: - length = nullChars.length; - out.write(nullChars); - break; - default: - break; - } - return length; - } - - switch (dataType.getType()) { - case BOOLEAN: - out.write(datum.asBool() ? 
trueBytes : falseBytes); - length = trueBytes.length; - break; - case CHAR: - byte[] pad = new byte[dataType.getLength() - datum.size()]; - bytes = datum.asTextBytes(); - out.write(bytes); - out.write(pad); - length = bytes.length + pad.length; - break; - case TEXT: - case BIT: - case INT2: - case INT4: - case INT8: - case FLOAT4: - case FLOAT8: - case INET4: - case DATE: - case INTERVAL: - bytes = datum.asTextBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIME: - if (hasTimezone) { - bytes = ((TimeDatum) datum).asChars(timezone, true).getBytes(); - } else { - bytes = datum.asTextBytes(); - } - length = bytes.length; - out.write(bytes); - break; - case TIMESTAMP: - if (hasTimezone) { - bytes = ((TimestampDatum) datum).asChars(timezone, true).getBytes(); - } else { - bytes = datum.asTextBytes(); - } - length = bytes.length; - out.write(bytes); - break; - case INET6: - case BLOB: - bytes = Base64.encodeBase64(datum.asByteArray(), false); - length = bytes.length; - out.write(bytes, 0, length); - break; - case PROTOBUF: - ProtobufDatum protobuf = (ProtobufDatum) datum; - byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); - length = protoBytes.length; - out.write(protoBytes, 0, protoBytes.length); - break; - case NULL_TYPE: - default: - break; - } - return length; - } - - @Override - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException { - Datum datum; - TajoDataTypes.Type type = col.getDataType().getType(); - boolean nullField; - if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { - nullField = isNullText(buf, nullChars); - } else { - nullField = isNull(buf, nullChars); - } - - if (nullField) { - datum = NullDatum.get(); - } else { - switch (type) { - case BOOLEAN: - byte bool = buf.readByte(); - datum = DatumFactory.createBool(bool == 't' || bool == 'T'); - break; - case BIT: - datum = DatumFactory.createBit(Byte.parseByte( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString())); - break; - case CHAR: - datum = DatumFactory.createChar( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString().trim()); - break; - case INT1: - case INT2: - datum = DatumFactory.createInt2((short) NumberUtil.parseInt(buf)); - break; - case INT4: - datum = DatumFactory.createInt4(NumberUtil.parseInt(buf)); - break; - case INT8: - datum = DatumFactory.createInt8(NumberUtil.parseLong(buf)); - break; - case FLOAT4: - datum = DatumFactory.createFloat4( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case FLOAT8: - datum = DatumFactory.createFloat8(NumberUtil.parseDouble(buf)); - break; - case TEXT: { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createText(bytes); - break; - } - case DATE: - datum = DatumFactory.createDate( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case TIME: - if (hasTimezone) { - datum = DatumFactory.createTime( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone); - } else { - datum = DatumFactory.createTime( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - } - break; - case TIMESTAMP: - if (hasTimezone) { - datum = DatumFactory.createTimestamp( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString(), timezone); - } else { - datum = DatumFactory.createTimestamp( - 
decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - } - break; - case INTERVAL: - datum = DatumFactory.createInterval( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case PROTOBUF: { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); - Message.Builder builder = factory.newBuilder(); - try { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - protobufJsonFormat.merge(bytes, builder); - datum = factory.createDatum(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - break; - } - case INET4: - datum = DatumFactory.createInet4( - decoder.decode(buf.nioBuffer(buf.readerIndex(), buf.readableBytes())).toString()); - break; - case BLOB: { - byte[] bytes = new byte[buf.readableBytes()]; - buf.readBytes(bytes); - datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); - break; - } - default: - datum = NullDatum.get(); - break; - } - } - return datum; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java deleted file mode 100644 index 7ebfa79..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineDeserializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -/** - * Reads a text line and fills a Tuple with values - */ -public abstract class TextLineDeserializer { - protected Schema schema; - protected TableMeta meta; - protected int [] targetColumnIndexes; - - public TextLineDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes) { - this.schema = schema; - this.meta = meta; - this.targetColumnIndexes = targetColumnIndexes; - } - - /** - * Initialize SerDe - */ - public abstract void init(); - - /** - * It fills a tuple with a read fields in a given line. 
- * - * @param buf Read line - * @param output Tuple to be filled with read fields - * @throws java.io.IOException - */ - public abstract void deserialize(final ByteBuf buf, Tuple output) throws IOException, TextLineParsingError; - - /** - * Release external resources - */ - public abstract void release(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java deleted file mode 100644 index f0bae5e..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineParsingError.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.text; - -public class TextLineParsingError extends Exception { - - public TextLineParsingError(Throwable t) { - super(t); - } - - public TextLineParsingError(String message, Throwable t) { - super(t.getMessage() + ", Error line: " + message); - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java deleted file mode 100644 index e81e289..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerDe.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import io.netty.buffer.ByteBuf; -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.BufferPool; -import org.apache.tajo.storage.StorageConstants; - -/** - * Pluggable Text Line SerDe class - */ -public abstract class TextLineSerDe { - - public TextLineSerDe() { - } - - public abstract TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int [] targetColumnIndexes); - - public abstract TextLineSerializer createSerializer(Schema schema, TableMeta meta); - - public static ByteBuf getNullChars(TableMeta meta) { - byte[] nullCharByteArray = getNullCharsAsBytes(meta); - - ByteBuf nullChars = BufferPool.directBuffer(nullCharByteArray.length, nullCharByteArray.length); - nullChars.writeBytes(nullCharByteArray); - - return nullChars; - } - - public static byte [] getNullCharsAsBytes(TableMeta meta) { - byte [] nullChars; - - String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, - NullDatum.DEFAULT_TEXT)); - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - - return nullChars; - } - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java deleted file mode 100644 index 0c2761f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextLineSerializer.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.text; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * Write a Tuple into single text formatted line - */ -public abstract class TextLineSerializer { - protected Schema schema; - protected TableMeta meta; - - public TextLineSerializer(Schema schema, TableMeta meta) { - this.schema = schema; - this.meta = meta; - } - - public abstract void init(); - - public abstract int serialize(OutputStream out, Tuple input) throws IOException; - - public abstract void release(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java deleted file mode 100644 index 543336f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionOutputStream; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.util.ReflectionUtils; - -import parquet.bytes.BytesInput; -import parquet.hadoop.BadConfigurationException; -import parquet.hadoop.metadata.CompressionCodecName; - -class CodecFactory { - - public class BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - public BytesDecompressor(CompressionCodec codec) { - this.codec = codec; - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } else { - decompressed = bytes; - } - return decompressed; - } - - private void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - } - - /** - * Encapsulates the logic around hadoop compression - * - * @author Julien Le Dem - * - */ - public static class BytesCompressor { - - private final CompressionCodec codec; - private final Compressor compressor; - private final ByteArrayOutputStream compressedOutBuffer; - private final CompressionCodecName codecName; - - public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { - this.codecName = codecName; - this.codec = codec; - if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); - this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); - } else { - this.compressor = null; - this.compressedOutBuffer = null; - } - } - - public BytesInput compress(BytesInput bytes) throws IOException { - final BytesInput compressedBytes; - if (codec == null) { - compressedBytes = bytes; - } else { - compressedOutBuffer.reset(); - if (compressor != null) { - // null compressor for non-native gzip - compressor.reset(); - } - CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); - bytes.writeAllTo(cos); - cos.finish(); - cos.close(); - compressedBytes = BytesInput.from(compressedOutBuffer); - } - return compressedBytes; - } - - private void release() { - if (compressor != null) { - CodecPool.returnCompressor(compressor); - } - } - - public CompressionCodecName getCodecName() { - return codecName; - } - - } - - private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>(); - private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>(); - private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>(); - private final Configuration configuration; - - public CodecFactory(Configuration configuration) { - this.configuration = configuration; - } - - /** - * - * 
@param codecName the requested codec - * @return the corresponding hadoop codec. null if UNCOMPRESSED - */ - private CompressionCodec getCodec(CompressionCodecName codecName) { - String codecClassName = codecName.getHadoopCompressionCodecClassName(); - if (codecClassName == null) { - return null; - } - CompressionCodec codec = codecByName.get(codecClassName); - if (codec != null) { - return codec; - } - - try { - Class<?> codecClass = Class.forName(codecClassName); - codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); - return codec; - } catch (ClassNotFoundException e) { - throw new BadConfigurationException("Class " + codecClassName + " was not found", e); - } - } - - public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { - BytesCompressor comp = compressors.get(codecName); - if (comp == null) { - CompressionCodec codec = getCodec(codecName); - comp = new BytesCompressor(codecName, codec, pageSize); - compressors.put(codecName, comp); - } - return comp; - } - - public BytesDecompressor getDecompressor(CompressionCodecName codecName) { - BytesDecompressor decomp = decompressors.get(codecName); - if (decomp == null) { - CompressionCodec codec = getCodec(codecName); - decomp = new BytesDecompressor(codec); - decompressors.put(codecName, decomp); - } - return decomp; - } - - public void release() { - for (BytesCompressor compressor : compressors.values()) { - compressor.release(); - } - compressors.clear(); - for (BytesDecompressor decompressor : decompressors.values()) { - decompressor.release(); - } - decompressors.clear(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java deleted file mode 100644 index 5f89ead..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java +++ /dev/null @@ -1,211 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.INFO; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import parquet.Log; -import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageWriteStore; -import parquet.column.page.PageWriter; -import parquet.column.statistics.Statistics; -import parquet.column.statistics.BooleanStatistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; - -class ColumnChunkPageWriteStore implements PageWriteStore { - private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private static final class ColumnChunkPageWriter implements PageWriter { - - private final ColumnDescriptor path; - private final BytesCompressor compressor; - - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - private Set<Encoding> encodings = new HashSet<Encoding>(); - - private Statistics totalStatistics; - - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { - this.path = path; - this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); - } - - @Deprecated - @Override - public void writePage(BytesInput bytes, - int valueCount, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, - int valueCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); - 
compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public long getMemSize() { - return buf.size(); - } - - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.startColumn(path, totalValueCount, compressor.getCodecName()); - if (dictionaryPage != null) { - writer.writeDictionaryPage(dictionaryPage); - encodings.add(dictionaryPage.getEncoding()); - } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings)); - writer.endColumn(); - if (INFO) { - LOG.info( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size()) - : "")); - } - encodings.clear(); - pageCount = 0; - } - - @Override - public long allocatedSize() { - return buf.getCapacity(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - } - - private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>(); - private final MessageType schema; - private final BytesCompressor compressor; - private final int initialSize; - - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { - this.compressor = compressor; - this.schema = schema; - this.initialSize = initialSize; - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - if (!writers.containsKey(path)) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); - } - return writers.get(path); - } - - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - List<ColumnDescriptor> columns = schema.getColumns(); - for (ColumnDescriptor columnDescriptor : columns) { - ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor); - pageWriter.writeToFileWriter(writer); - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java deleted file mode 100644 index 61567e5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import java.io.IOException; - import java.util.List; - import java.util.Map; - import org.apache.hadoop.conf.Configuration; - import org.apache.hadoop.fs.Path; - import parquet.Log; - import parquet.column.ColumnDescriptor; - import parquet.column.page.PageReadStore; - import parquet.filter.UnboundRecordFilter; - import parquet.hadoop.ParquetFileReader; - import parquet.hadoop.api.ReadSupport; - import parquet.hadoop.metadata.BlockMetaData; - import parquet.hadoop.util.counters.BenchmarkCounter; - import parquet.io.ColumnIOFactory; - import parquet.io.MessageColumnIO; - import parquet.io.ParquetDecodingException; - import parquet.io.api.RecordMaterializer; - import parquet.schema.GroupType; - import parquet.schema.MessageType; - import parquet.schema.Type; - -import static java.lang.String.format; -import static parquet.Log.DEBUG; - -class InternalParquetRecordReader<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordReader.class); - - private final ColumnIOFactory columnIOFactory = new ColumnIOFactory(); - - private MessageType requestedSchema; - private MessageType fileSchema; - private int columnCount; - private final ReadSupport<T> readSupport; - - private RecordMaterializer<T> recordConverter; - - private T currentValue; - private long total; - private int current = 0; - private int currentBlock = -1; - private ParquetFileReader reader; - private parquet.io.RecordReader<T> recordReader; - private UnboundRecordFilter recordFilter; - - private long totalTimeSpentReadingBytes; - private long totalTimeSpentProcessingRecords; - private long startedAssemblingCurrentBlockAt; - - private long totalCountLoadedSoFar = 0; - - private Path file; - - /** - * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. - */ - public InternalParquetRecordReader(ReadSupport<T> readSupport) { - this(readSupport, null); - } - - /** - * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. - * @param filter Optional filter for only returning matching records. 
- */ - public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter - filter) { - this.readSupport = readSupport; - this.recordFilter = filter; - } - - private void checkRead() throws IOException { - if (current == totalCountLoadedSoFar) { - if (current != 0) { - long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt; - totalTimeSpentProcessingRecords += timeAssembling; - LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); - long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; - long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; - long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; - LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); - } - - LOG.info("at row " + current + ". reading next block"); - long t0 = System.currentTimeMillis(); - PageReadStore pages = reader.readNextRowGroup(); - if (pages == null) { - throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); - } - long timeSpentReading = System.currentTimeMillis() - t0; - totalTimeSpentReadingBytes += timeSpentReading; - BenchmarkCounter.incrementTime(timeSpentReading); - LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); - if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema); - MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); - startedAssemblingCurrentBlockAt = System.currentTimeMillis(); - totalCountLoadedSoFar += pages.getRowCount(); - ++ currentBlock; - } - } - - public void close() throws IOException { - reader.close(); - } - - public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - public T getCurrentValue() throws IOException, - InterruptedException { - return currentValue; - } - - public float getProgress() throws IOException, InterruptedException { - return (float) current / total; - } - - public void initialize(MessageType requestedSchema, MessageType fileSchema, - Map<String, String> extraMetadata, Map<String, String> readSupportMetadata, - Path file, List<BlockMetaData> blocks, Configuration configuration) - throws IOException { - this.requestedSchema = requestedSchema; - this.fileSchema = fileSchema; - this.file = file; - this.columnCount = this.requestedSchema.getPaths().size(); - this.recordConverter = readSupport.prepareForRead( - configuration, extraMetadata, fileSchema, - new ReadSupport.ReadContext(requestedSchema, readSupportMetadata)); - - List<ColumnDescriptor> columns = requestedSchema.getColumns(); - reader = new ParquetFileReader(configuration, file, blocks, columns); - for (BlockMetaData block : blocks) { - total += block.getRowCount(); - } - LOG.info("RecordReader initialized will read a total of " + total + " records."); - } - - private boolean contains(GroupType group, String[] path, int index) { - if (index == path.length) { - return false; - } - if (group.containsField(path[index])) { - Type 
type = group.getType(path[index]); - if (type.isPrimitive()) { - return index + 1 == path.length; - } else { - return contains(type.asGroupType(), path, index + 1); - } - } - return false; - } - - public boolean nextKeyValue() throws IOException, InterruptedException { - if (current < total) { - try { - checkRead(); - currentValue = recordReader.read(); - if (DEBUG) LOG.debug("read value: " + currentValue); - current ++; - } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); - } - return true; - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java deleted file mode 100644 index 7410d2b..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import static java.lang.Math.max; -import static java.lang.Math.min; -import static java.lang.String.format; -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; -import static parquet.Preconditions.checkNotNull; - -import java.io.IOException; -import java.util.Map; - -import parquet.Log; -import parquet.column.ParquetProperties.WriterVersion; -import parquet.column.impl.ColumnWriteStoreImpl; -import parquet.hadoop.api.WriteSupport; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.MessageType; - -class InternalParquetRecordWriter<T> { - private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - - private final ParquetFileWriter w; - private final WriteSupport<T> writeSupport; - private final MessageType schema; - private final Map<String, String> extraMetaData; - private final int blockSize; - private final int pageSize; - private final BytesCompressor compressor; - private final int dictionaryPageSize; - private final boolean enableDictionary; - private final boolean validating; - private final WriterVersion writerVersion; - - private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; - - private ColumnWriteStoreImpl store; - private ColumnChunkPageWriteStore pageStore; - - /** - * @param w the file to write to - * @param writeSupport the class to convert incoming records - * @param schema the schema of the records - * @param extraMetaData extra meta data to write in the footer of the file - * @param blockSize the size of a block in the file (this will be approximate) - * @param compressor the compressor used to compress the pages - */ - public InternalParquetRecordWriter( - ParquetFileWriter w, - WriteSupport<T> writeSupport, - MessageType schema, - Map<String, String> extraMetaData, - int blockSize, - int pageSize, - BytesCompressor compressor, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - WriterVersion writerVersion) { - this.w = w; - this.writeSupport = checkNotNull(writeSupport, "writeSupport"); - this.schema = schema; - this.extraMetaData = extraMetaData; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.compressor = compressor; - this.dictionaryPageSize = dictionaryPageSize; - this.enableDictionary = enableDictionary; - this.validating = validating; - this.writerVersion = writerVersion; - initStore(); - } - - private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be the same size. 
- int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); - MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); - writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); - } - - public void close() throws IOException, InterruptedException { - flushStore(); - w.end(extraMetaData); - } - - public void write(T value) throws IOException, InterruptedException { - writeSupport.write(value); - ++ recordCount; - checkBlockSizeReached(); - } - - private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. - long memSize = store.memSize(); - if (memSize > blockSize) { - LOG.info(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); - flushStore(); - initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); - } else { - float recordSize = (float) memSize / recordCount; - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); - if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); - } - } - } - - public long getEstimatedWrittenSize() throws IOException { - return w.getPos() + store.memSize(); - } - - private void flushStore() - throws IOException { - LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); - if (store.allocatedSize() > 3 * blockSize) { - LOG.warn("Too much memory used: " + store.memUsageString()); - } - w.startBlock(recordCount); - store.flush(); - pageStore.flushToFileWriter(w); - recordCount = 0; - w.endBlock(); - store = null; - pageStore = null; - } -} \ No newline at end of file
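For context on the removed thirdparty CodecFactory above, here is a minimal usage sketch, assuming only the parquet-mr BytesInput API and the methods visible in the deleted source. The class name CodecFactoryUsageSketch, the GZIP codec choice, the 1 MB page size, and the sample payload are illustrative additions; since CodecFactory is package-private, a caller has to live in the same package.

package org.apache.tajo.storage.thirdparty.parquet;

import org.apache.hadoop.conf.Configuration;

import parquet.bytes.BytesInput;
import parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical driver; not part of the deleted commit.
class CodecFactoryUsageSketch {
  public static void main(String[] args) throws Exception {
    CodecFactory factory = new CodecFactory(new Configuration());

    // Compressors and decompressors are created lazily and cached per codec name.
    CodecFactory.BytesCompressor compressor =
        factory.getCompressor(CompressionCodecName.GZIP, 1024 * 1024); // page size: illustrative
    CodecFactory.BytesDecompressor decompressor =
        factory.getDecompressor(CompressionCodecName.GZIP);

    byte[] raw = "example page payload".getBytes("UTF-8");
    BytesInput compressed = compressor.compress(BytesInput.from(raw));
    BytesInput restored =
        decompressor.decompress(BytesInput.from(compressed.toByteArray()), raw.length);
    assert restored.size() == raw.length;

    // Returns any pooled Hadoop (de)compressors and clears the factory caches.
    factory.release();
  }
}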

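Similarly, for the abstract TextLineSerializer removed at the top of this message, a minimal sketch of a concrete subclass follows, illustrating the init()/serialize()/release() contract, where serialize() returns the number of bytes written for the line. The subclass name, the '|' delimiter, and the Tuple accessors size() and getText(int) are assumptions for illustration, not the concrete serializer Tajo shipped.

package org.apache.tajo.storage.text;

import java.io.IOException;
import java.io.OutputStream;

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.Tuple;

// Hypothetical subclass; not part of the deleted commit.
public class PipeDelimitedLineSerializer extends TextLineSerializer {

  public PipeDelimitedLineSerializer(Schema schema, TableMeta meta) {
    super(schema, meta);
  }

  @Override
  public void init() {
    // No state in this sketch; a real serializer would read delimiter and
    // null-character settings out of the TableMeta held by the superclass.
  }

  @Override
  public int serialize(OutputStream out, Tuple input) throws IOException {
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < input.size(); i++) {   // size(): assumed Tuple accessor
      if (i > 0) {
        line.append('|');
      }
      line.append(input.getText(i));           // getText(int): assumed Tuple accessor
    }
    byte[] bytes = line.toString().getBytes("UTF-8");
    out.write(bytes);
    return bytes.length; // bytes written for this line, excluding any line terminator
  }

  @Override
  public void release() {
    // Nothing pooled in this sketch.
  }
}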