Repository: tajo Updated Branches: refs/heads/master 5a72e2f62 -> 3e305b15f
http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java new file mode 100644 index 0000000..eb1929e --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.SplittableCompressionCodec; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.ByteBufInputChannel; +import org.apache.tajo.storage.FileScanner; +import org.apache.tajo.storage.BufferPool; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.fragment.FileFragment; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicInteger; + +public class DelimitedLineReader implements Closeable { + private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class); + private final static int DEFAULT_PAGE_SIZE = 128 * 1024; + + private FileSystem fs; + private FSDataInputStream fis; + private InputStream is; //decompressd stream + private CompressionCodecFactory factory; + private CompressionCodec codec; + private Decompressor decompressor; + + private long startOffset, end, pos; + private boolean eof = true; + private ByteBufLineReader lineReader; + private AtomicInteger tempReadBytes = new AtomicInteger(); + private FileFragment fragment; + private Configuration conf; + + public DelimitedLineReader(Configuration conf, final FileFragment fragment) throws IOException { + this.fragment = fragment; + this.conf = conf; + this.factory = new CompressionCodecFactory(conf); + this.codec = 
factory.getCodec(fragment.getPath()); + if (this.codec instanceof SplittableCompressionCodec) { + throw new NotImplementedException(); // bzip2 does not support multi-thread model + } + } + + public void init() throws IOException { + if (fs == null) { + fs = FileScanner.getFileSystem((TajoConf) conf, fragment.getPath()); + } + if (fis == null) fis = fs.open(fragment.getPath()); + pos = startOffset = fragment.getStartKey(); + end = startOffset + fragment.getEndKey(); + + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + is = new DataInputStream(codec.createInputStream(fis, decompressor)); + ByteBufInputChannel channel = new ByteBufInputChannel(is); + lineReader = new ByteBufLineReader(channel, BufferPool.directBuffer(DEFAULT_PAGE_SIZE)); + } else { + fis.seek(startOffset); + is = fis; + + ByteBufInputChannel channel = new ByteBufInputChannel(is); + lineReader = new ByteBufLineReader(channel, + BufferPool.directBuffer((int) Math.min(DEFAULT_PAGE_SIZE, end))); + } + eof = false; + } + + public long getCompressedPosition() throws IOException { + long retVal; + if (isCompressed()) { + retVal = fis.getPos(); + } else { + retVal = pos; + } + return retVal; + } + + public long getUnCompressedPosition() throws IOException { + return pos; + } + + public long getReadBytes() { + return pos - startOffset; + } + + public boolean isReadable() { + return !eof; + } + + public ByteBuf readLine() throws IOException { + if (eof) { + return null; + } + + ByteBuf buf = lineReader.readLineBuf(tempReadBytes); + if (buf == null) { + eof = true; + } else { + pos += tempReadBytes.get(); + } + + if (!isCompressed() && getCompressedPosition() > end) { + eof = true; + } + return buf; + } + + public boolean isCompressed() { + return codec != null; + } + + @Override + public void close() throws IOException { + try { + IOUtils.cleanup(LOG, lineReader, is, fis); + fs = null; + is = null; + fis = null; + lineReader = null; + } finally { + if (decompressor != null) { + 
CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java new file mode 100644 index 0000000..dbf8435 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java @@ -0,0 +1,483 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBuf; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.exception.AlreadyExistsStorageException; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; + +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; + +public class DelimitedTextFile { + + public static final byte LF = '\n'; + public static int EOF = -1; + + private static final Log LOG = LogFactory.getLog(DelimitedTextFile.class); + + public static class DelimitedTextFileAppender extends FileAppender { + private final TableMeta meta; + private final Schema schema; + private final int columnNum; + private final FileSystem fs; + private FSDataOutputStream fos; + private DataOutputStream outputStream; + private CompressionOutputStream deflateFilter; + private char delimiter; + private TableStatistics stats = null; + 
private Compressor compressor; + private CompressionCodecFactory codecFactory; + private CompressionCodec codec; + private Path compressedPath; + private byte[] nullChars; + private int BUFFER_SIZE = 128 * 1024; + private int bufferedBytes = 0; + private long pos = 0; + + private NonSyncByteArrayOutputStream os; + private FieldSerializerDeserializer serde; + + public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) + throws IOException { + super(conf, schema, meta, path); + this.fs = path.getFileSystem(conf); + this.meta = meta; + this.schema = schema; + this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + this.columnNum = schema.size(); + + String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(); + } + } + + @Override + public void init() throws IOException { + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException(path.toString()); + } + + if (this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { + String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); + codecFactory = new CompressionCodecFactory(conf); + codec = codecFactory.getCodecByClassName(codecName); + compressor = CodecPool.getCompressor(codec); + if (compressor != null) compressor.reset(); //builtin gzip is null + + String extension = codec.getDefaultExtension(); + compressedPath = path.suffix(extension); + + if (fs.exists(compressedPath)) { + throw new AlreadyExistsStorageException(compressedPath); + } + + fos = fs.create(compressedPath); + deflateFilter = codec.createOutputStream(fos, compressor); + outputStream = new DataOutputStream(deflateFilter); + + } else { + if (fs.exists(path)) 
{ + throw new AlreadyExistsStorageException(path); + } + fos = fs.create(path); + outputStream = new DataOutputStream(new BufferedOutputStream(fos)); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + try { + // we need to discuss the De/Serializer interface. so custom serde is to disable + String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE, + TextFieldSerializerDeserializer.class.getName()); + serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } + + if (os == null) { + os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); + } + + os.reset(); + pos = fos.getPos(); + bufferedBytes = 0; + super.init(); + } + + + @Override + public void addTuple(Tuple tuple) throws IOException { + Datum datum; + int rowBytes = 0; + + for (int i = 0; i < columnNum; i++) { + datum = tuple.get(i); + rowBytes += serde.serialize(os, datum, schema.getColumn(i), i, nullChars); + + if (columnNum - 1 > i) { + os.write((byte) delimiter); + rowBytes += 1; + } + } + os.write(LF); + rowBytes += 1; + + pos += rowBytes; + bufferedBytes += rowBytes; + if (bufferedBytes > BUFFER_SIZE) { + flushBuffer(); + } + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } + + private void flushBuffer() throws IOException { + if (os.getLength() > 0) { + os.writeTo(outputStream); + os.reset(); + bufferedBytes = 0; + } + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + outputStream.flush(); + } + + @Override + public void close() throws IOException { + + try { + if(outputStream != null){ + flush(); + } + + // Statistical section + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + + if (deflateFilter != null) { + deflateFilter.finish(); + deflateFilter.resetState(); + deflateFilter = 
null; + } + + os.close(); + } finally { + IOUtils.cleanup(LOG, fos); + if (compressor != null) { + CodecPool.returnCompressor(compressor); + compressor = null; + } + } + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } + + public boolean isCompress() { + return compressor != null; + } + + public String getExtension() { + return codec != null ? codec.getDefaultExtension() : ""; + } + } + + public static class DelimitedTextFileScanner extends FileScanner { + + private boolean splittable = false; + private final long startOffset; + private final long endOffset; + + private int recordCount = 0; + private int[] targetColumnIndexes; + + private ByteBuf nullChars; + private FieldSerializerDeserializer serde; + private DelimitedLineReader reader; + private FieldSplitProcessor processor; + + public DelimitedTextFileScanner(Configuration conf, final Schema schema, final TableMeta meta, + final FileFragment fragment) + throws IOException { + super(conf, schema, meta, fragment); + reader = new DelimitedLineReader(conf, fragment); + if (!reader.isCompressed()) { + splittable = true; + } + + startOffset = fragment.getStartKey(); + endOffset = startOffset + fragment.getEndKey(); + + //Delimiter + String delim = meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + this.processor = new FieldSplitProcessor(StringEscapeUtils.unescapeJava(delim).charAt(0)); + } + + @Override + public void init() throws IOException { + if (nullChars != null) { + nullChars.release(); + } + + String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(StorageConstants.TEXT_NULL, + NullDatum.DEFAULT_TEXT)); + byte[] bytes; + if (StringUtils.isEmpty(nullCharacters)) { + bytes = NullDatum.get().asTextBytes(); + } else { + bytes = nullCharacters.getBytes(); + } + + nullChars = BufferPool.directBuffer(bytes.length, bytes.length); + nullChars.writeBytes(bytes); + + if (reader 
!= null) { + reader.close(); + } + reader = new DelimitedLineReader(conf, fragment); + reader.init(); + recordCount = 0; + + if (targets == null) { + targets = schema.toArray(); + } + + targetColumnIndexes = new int[targets.length]; + for (int i = 0; i < targets.length; i++) { + targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); + } + + try { + // we need to discuss the De/Serializer interface. so custom serde is to disable + String serdeClass = this.meta.getOption(StorageConstants.TEXTFILE_SERDE, + TextFieldSerializerDeserializer.class.getName()); + serde = (TextFieldSerializerDeserializer) ReflectionUtils.newInstance(Class.forName(serdeClass), conf); + } catch (Throwable e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } + + super.init(); + Arrays.sort(targetColumnIndexes); + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset); + } + + if (startOffset > 0) { + reader.readLine(); // skip first line; + } + } + + public ByteBuf readLine() throws IOException { + ByteBuf buf = reader.readLine(); + if (buf == null) { + return null; + } else { + recordCount++; + } + + return buf; + } + + @Override + public float getProgress() { + try { + if (!reader.isReadable()) { + return 1.0f; + } + long filePos = reader.getCompressedPosition(); + if (startOffset == filePos) { + return 0.0f; + } else { + long readBytes = filePos - startOffset; + long remainingBytes = Math.max(endOffset - filePos, 0); + return Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } + + @Override + public Tuple next() throws IOException { + try { + if (!reader.isReadable()) return null; + + ByteBuf buf = readLine(); + if (buf == null) return null; + + if (targets.length == 0) { + return EmptyTuple.get(); + } + + VTuple tuple = new VTuple(schema.size()); + fillTuple(schema, 
tuple, buf, targetColumnIndexes); + return tuple; + } catch (Throwable t) { + LOG.error("Tuple list current index: " + recordCount + " file offset:" + reader.getCompressedPosition(), t); + throw new IOException(t); + } + } + + private void fillTuple(Schema schema, Tuple dst, ByteBuf lineBuf, int[] target) throws IOException { + int[] projection = target; + if (lineBuf == null || target == null || target.length == 0) { + return; + } + + final int rowLength = lineBuf.readableBytes(); + int start = 0, fieldLength = 0, end = 0; + + //Projection + int currentTarget = 0; + int currentIndex = 0; + + while (end != -1) { + end = lineBuf.forEachByte(start, rowLength - start, processor); + + if (end < 0) { + fieldLength = rowLength - start; + } else { + fieldLength = end - start; + } + + if (projection.length > currentTarget && currentIndex == projection[currentTarget]) { + Datum datum = serde.deserialize(lineBuf.slice(start, fieldLength), + schema.getColumn(currentIndex), currentIndex, nullChars); + dst.put(currentIndex, datum); + currentTarget++; + } + + if (projection.length == currentTarget) { + break; + } + + start = end + 1; + currentIndex++; + } + } + + @Override + public void reset() throws IOException { + init(); + } + + @Override + public void close() throws IOException { + try { + if (nullChars != null) { + nullChars.release(); + nullChars = null; + } + + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. 
(decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + } + if (LOG.isDebugEnabled()) { + LOG.debug("DelimitedTextFileScanner processed record:" + recordCount); + } + } finally { + IOUtils.cleanup(LOG, reader); + reader = null; + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public boolean isSplittable() { + return splittable; + } + + @Override + public TableStats getInputStats() { + if (tableStats != null && reader != null) { + tableStats.setReadBytes(reader.getReadBytes()); //Actual Processed Bytes. (decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + tableStats.setNumBytes(fragment.getEndKey()); + } + return tableStats; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java new file mode 100644 index 0000000..a5ac142 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/FieldSplitProcessor.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBufProcessor; + +public class FieldSplitProcessor implements ByteBufProcessor { + private char delimiter; // the single ASCII field-delimiter character to scan for + + public FieldSplitProcessor(char recordDelimiterByte) { + this.delimiter = recordDelimiterByte; + } + + @Override + public boolean process(byte value) throws Exception { + return delimiter != value; + } + + public char getDelimiter() { + return delimiter; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java new file mode 100644 index 0000000..a130527 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/LineSplitProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import io.netty.buffer.ByteBufProcessor; + +public class LineSplitProcessor implements ByteBufProcessor { + public static final byte CR = '\r'; + public static final byte LF = '\n'; + private boolean prevCharCR = false; // true if the previous byte processed was CR ('\r') + + @Override + public boolean process(byte value) throws Exception { + switch (value) { + case LF: + return false; + case CR: + prevCharCR = true; + return false; + default: + prevCharCR = false; + return true; + } + } + + public boolean isPrevCharCR() { + return prevCharCR; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java new file mode 100644 index 0000000..0057b54 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/TextFieldSerializerDeserializer.java @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.text; + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import org.apache.commons.codec.binary.Base64; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.*; +import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; +import org.apache.tajo.storage.FieldSerializerDeserializer; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; +import java.io.OutputStream; + +//Compatibility with Apache Hive +public class TextFieldSerializerDeserializer implements FieldSerializerDeserializer { + public static final byte[] trueBytes = "true".getBytes(); + public static final byte[] falseBytes = "false".getBytes(); + private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); + + private static boolean isNull(ByteBuf val, ByteBuf nullBytes) { + return !val.isReadable() || nullBytes.equals(val); + } + + private static boolean isNullText(ByteBuf val, ByteBuf nullBytes) { + return val.readableBytes() > 0 && nullBytes.equals(val); + } + + @Override + public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException { + byte[] bytes; + int length = 0; + TajoDataTypes.DataType dataType = col.getDataType(); + + if (datum == null || datum instanceof NullDatum) { + switch (dataType.getType()) { + case CHAR: + case TEXT: + length = nullChars.length; + out.write(nullChars); + break; + 
default: + break; + } + return length; + } + + switch (dataType.getType()) { + case BOOLEAN: + out.write(datum.asBool() ? trueBytes : falseBytes); + length = trueBytes.length; + break; + case CHAR: + byte[] pad = new byte[dataType.getLength() - datum.size()]; + bytes = datum.asTextBytes(); + out.write(bytes); + out.write(pad); + length = bytes.length + pad.length; + break; + case TEXT: + case BIT: + case INT2: + case INT4: + case INT8: + case FLOAT4: + case FLOAT8: + case INET4: + case DATE: + case INTERVAL: + bytes = datum.asTextBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIME: + bytes = ((TimeDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case TIMESTAMP: + bytes = ((TimestampDatum) datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); + length = bytes.length; + out.write(bytes); + break; + case INET6: + case BLOB: + bytes = Base64.encodeBase64(datum.asByteArray(), false); + length = bytes.length; + out.write(bytes, 0, length); + break; + case PROTOBUF: + ProtobufDatum protobuf = (ProtobufDatum) datum; + byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); + length = protoBytes.length; + out.write(protoBytes, 0, protoBytes.length); + break; + case NULL_TYPE: + default: + break; + } + return length; + } + + @Override + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException{ + Datum datum; + TajoDataTypes.Type type = col.getDataType().getType(); + boolean nullField; + if (type == TajoDataTypes.Type.TEXT || type == TajoDataTypes.Type.CHAR) { + nullField = isNullText(buf, nullChars); + } else { + nullField = isNull(buf, nullChars); + } + + if (nullField) { + datum = NullDatum.get(); + } else { + switch (type) { + case BOOLEAN: + byte bool = buf.readByte(); + datum = DatumFactory.createBool(bool == 't' || bool == 'T'); + break; + case BIT: + datum = 
DatumFactory.createBit(Byte.parseByte(buf.toString(CharsetUtil.UTF_8))); + break; + case CHAR: + datum = DatumFactory.createChar(buf.toString(CharsetUtil.UTF_8).trim()); + break; + case INT1: + case INT2: { + //TODO zero-copy + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, 0, bytes.length)); + break; + } + case INT4: { + //TODO zero-copy + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length)); + break; + } + case INT8: + //TODO zero-copy + datum = DatumFactory.createInt8(buf.toString(CharsetUtil.UTF_8)); + break; + case FLOAT4: + //TODO zero-copy + datum = DatumFactory.createFloat4(buf.toString(CharsetUtil.UTF_8)); + break; + case FLOAT8: { + //TODO zero-copy + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length)); + break; + } + case TEXT: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createText(bytes); + break; + } + case DATE: + datum = DatumFactory.createDate(buf.toString(CharsetUtil.UTF_8)); + break; + case TIME: + datum = DatumFactory.createTime(buf.toString(CharsetUtil.UTF_8)); + break; + case TIMESTAMP: + datum = DatumFactory.createTimestamp(buf.toString(CharsetUtil.UTF_8)); + break; + case INTERVAL: + datum = DatumFactory.createInterval(buf.toString(CharsetUtil.UTF_8)); + break; + case PROTOBUF: { + ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); + Message.Builder builder = factory.newBuilder(); + try { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + protobufJsonFormat.merge(bytes, builder); + datum = factory.createDatum(builder.build()); + } catch (IOException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + break; + } + case INET4: + datum = 
DatumFactory.createInet4(buf.toString(CharsetUtil.UTF_8)); + break; + case BLOB: { + byte[] bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + datum = DatumFactory.createBlob(Base64.decodeBase64(bytes)); + break; + } + default: + datum = NullDatum.get(); + break; + } + } + return datum; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml index 4669477..f262585 100644 --- a/tajo-storage/src/main/resources/storage-default.xml +++ b/tajo-storage/src/main/resources/storage-default.xml @@ -21,11 +21,6 @@ <configuration> <property> - <name>tajo.storage.manager.v2</name> - <value>false</value> - </property> - - <property> <name>tajo.storage.manager.maxReadBytes</name> <value>8388608</value> <description></description> @@ -40,11 +35,15 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> </property> <!--- Fragment Class Configurations --> <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.csv.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -79,13 +78,13 @@ <!--- Scanner Handler --> <property> - <name>tajo.storage.scanner-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.csv.class</name> - 
<value>org.apache.tajo.storage.v2.CSVFileScanner</value> + <name>tajo.storage.scanner-handler.csv.class</name> + <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> </property> <property> @@ -94,74 +93,44 @@ </property> <property> - <name>tajo.storage.scanner-handler.v2.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.rcfile.class</name> <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.rcfile.class</name> - <value>org.apache.tajo.storage.v2.RCFileScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.rowfile.class</name> <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.trevni.class</name> <value>org.apache.tajo.storage.trevni.TrevniScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.trevni.class</name> - <value>org.apache.tajo.storage.trevni.TrevniScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.parquet.class</name> <value>org.apache.tajo.storage.parquet.ParquetScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.sequencefile.class</name> <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> </property> <property> - <name>tajo.storage.scanner-handler.v2.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.avro.class</name> <value>org.apache.tajo.storage.avro.AvroScanner</value> 
</property> + <!--- Appender Handler --> <property> - <name>tajo.storage.scanner-handler.v2.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroScanner</value> + <name>tajo.storage.appender-handler</name> + <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> </property> - <!--- Appender Handler --> <property> - <name>tajo.storage.appender-handler</name> - <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> </property> <property> http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 212f374..fd5a63e 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -37,6 +37,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.sequencefile.SequenceFileScanner; +import org.apache.tajo.storage.text.DelimitedTextFile; import org.apache.tajo.util.CommonTestingUtil; import org.junit.Test; import org.junit.runner.RunWith; @@ -70,7 +71,8 @@ public class TestCompressionStorages { return Arrays.asList(new Object[][]{ {StoreType.CSV}, {StoreType.RCFILE}, - {StoreType.SEQUENCEFILE} + {StoreType.SEQUENCEFILE}, + {StoreType.TEXTFILE} }); } @@ -102,81 +104,11 @@ public class TestCompressionStorages { } @Test - public void testBzip2CodecCompressionData() throws IOException { - storageCompressionTest(storeType, BZip2Codec.class); 
- } - - @Test public void testLz4CodecCompressionData() throws IOException { if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded()) storageCompressionTest(storeType, Lz4Codec.class); } - @Test - public void testSplitCompressionData() throws IOException { - if(StoreType.CSV != storeType) return; - - Schema schema = new Schema(); - schema.addColumn("id", Type.INT4); - schema.addColumn("age", Type.INT8); - - TableMeta meta = CatalogUtil.newTableMeta(storeType); - meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName()); - - Path tablePath = new Path(testDir, "SplitCompression"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath); - appender.enableStats(); - appender.init(); - - String extention = ""; - if (appender instanceof CSVFile.CSVAppender) { - extention = ((CSVFile.CSVAppender) appender).getExtension(); - } - - int tupleNum = 100000; - VTuple vTuple; - - for (int i = 0; i < tupleNum; i++) { - vTuple = new VTuple(2); - vTuple.put(0, DatumFactory.createInt4(i + 1)); - vTuple.put(1, DatumFactory.createInt8(25l)); - appender.addTuple(vTuple); - } - appender.close(); - - TableStats stat = appender.getStats(); - assertEquals(tupleNum, stat.getNumRows().longValue()); - tablePath = tablePath.suffix(extention); - - FileStatus status = fs.getFileStatus(tablePath); - long fileLen = status.getLen(); - long randomNum = (long) (Math.random() * fileLen) + 1; - - FileFragment[] tablets = new FileFragment[2]; - tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum); - tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum)); - - Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - int tupleCnt = 0; - Tuple tuple; - while ((tuple = scanner.next()) != null) { - tupleCnt++; - } - scanner.close(); - - scanner = 
StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema); - assertTrue(scanner.isSplittable()); - scanner.init(); - while ((tuple = scanner.next()) != null) { - tupleCnt++; - } - - scanner.close(); - assertEquals(tupleNum, tupleCnt); - } - private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException { Schema schema = new Schema(); schema.addColumn("id", Type.INT4); @@ -199,6 +131,8 @@ public class TestCompressionStorages { String extension = ""; if (appender instanceof CSVFile.CSVAppender) { extension = ((CSVFile.CSVAppender) appender).getExtension(); + } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { + extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); } int tupleNum = 100000; http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java new file mode 100644 index 0000000..ef6efdf --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.compress.DeflateCodec; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.text.ByteBufLineReader; +import org.apache.tajo.storage.text.DelimitedTextFile; +import org.apache.tajo.storage.text.DelimitedLineReader; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +public class TestLineReader { + private static String TEST_PATH = "target/test-data/TestLineReader"; + + @Test + public void testByteBufLineReader() throws IOException { + TajoConf conf = new TajoConf(); + Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); + FileSystem fs = testDir.getFileSystem(conf); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + schema.addColumn("comment2", Type.TEXT); + + 
TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); + Path tablePath = new Path(testDir, "line.data"); + FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, + tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(4); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("emiya muljomdao")); + vTuple.put(3, NullDatum.get()); + appender.addTuple(vTuple); + } + appender.close(); + + FileStatus status = fs.getFileStatus(tablePath); + + ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath)); + assertEquals(status.getLen(), channel.available()); + ByteBufLineReader reader = new ByteBufLineReader(channel); + assertEquals(status.getLen(), reader.available()); + + long totalRead = 0; + int i = 0; + AtomicInteger bytes = new AtomicInteger(); + for(;;){ + ByteBuf buf = reader.readLineBuf(bytes); + if(buf == null) break; + + totalRead += bytes.get(); + i++; + } + IOUtils.cleanup(null, reader, channel, fs); + assertEquals(tupleNum, i); + assertEquals(status.getLen(), totalRead); + assertEquals(status.getLen(), reader.readBytes()); + } + + @Test + public void testLineDelimitedReader() throws IOException { + TajoConf conf = new TajoConf(); + Path testDir = CommonTestingUtil.getTestDir(TEST_PATH); + FileSystem fs = testDir.getFileSystem(conf); + + Schema schema = new Schema(); + schema.addColumn("id", Type.INT4); + schema.addColumn("age", Type.INT8); + schema.addColumn("comment", Type.TEXT); + schema.addColumn("comment2", Type.TEXT); + + TableMeta meta = CatalogUtil.newTableMeta(StoreType.TEXTFILE); + meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName()); + + Path tablePath = new Path(testDir, "line1." 
+ DeflateCodec.class.getSimpleName()); + FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema, + tablePath); + appender.enableStats(); + appender.init(); + int tupleNum = 10000; + VTuple vTuple; + + long splitOffset = 0; + for (int i = 0; i < tupleNum; i++) { + vTuple = new VTuple(4); + vTuple.put(0, DatumFactory.createInt4(i + 1)); + vTuple.put(1, DatumFactory.createInt8(25l)); + vTuple.put(2, DatumFactory.createText("emiya muljomdao")); + vTuple.put(3, NullDatum.get()); + appender.addTuple(vTuple); + + if(i == (tupleNum / 2)){ + splitOffset = appender.getOffset(); + } + } + String extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); + appender.close(); + + tablePath = tablePath.suffix(extension); + FileFragment fragment = new FileFragment("table", tablePath, 0, splitOffset); + DelimitedLineReader reader = new DelimitedLineReader(conf, fragment); // if file is compressed, will read to EOF + assertTrue(reader.isCompressed()); + assertFalse(reader.isReadable()); + reader.init(); + assertTrue(reader.isReadable()); + + + int i = 0; + while(reader.isReadable()){ + ByteBuf buf = reader.readLine(); + if(buf == null) break; + i++; + } + + IOUtils.cleanup(null, reader, fs); + assertEquals(tupleNum, i); + + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java new file mode 100644 index 0000000..12ea551 --- /dev/null +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestSplitProcessor.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.CharsetUtil; +import org.apache.tajo.storage.text.FieldSplitProcessor; +import org.apache.tajo.storage.text.LineSplitProcessor; +import org.junit.Test; + +import java.io.IOException; + +import static io.netty.util.ReferenceCountUtil.releaseLater; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSplitProcessor { + + @Test + public void testFieldSplitProcessor() throws IOException { + String data = "abc||de"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); + + final int len = buf.readableBytes(); + FieldSplitProcessor processor = new FieldSplitProcessor('|'); + + assertEquals(3, buf.forEachByte(0, len, processor)); + assertEquals(4, buf.forEachByte(4, len - 4, processor)); + assertEquals(-1, buf.forEachByte(5, len - 5, processor)); + + } + + @Test + public void testLineSplitProcessor() throws IOException { + String data = "abc\r\n\n"; + final ByteBuf buf = releaseLater( + Unpooled.copiedBuffer(data, CharsetUtil.ISO_8859_1)); + + final int len = buf.readableBytes(); + LineSplitProcessor processor = new LineSplitProcessor(); + + //find CR + 
assertEquals(3, buf.forEachByte(0, len, processor)); + + // find CRLF + assertEquals(4, buf.forEachByte(4, len - 4, processor)); + assertEquals(buf.getByte(4), '\n'); + // need to skip LF + assertTrue(processor.isPrevCharCR()); + + // find LF + assertEquals(5, buf.forEachByte(5, len - 5, processor)); //line length is zero + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index dca21af..56cef77 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -121,12 +121,14 @@ public class TestStorages { @Parameterized.Parameters public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][] { + //type, splitable, statsable, seekable {StoreType.CSV, true, true, true}, {StoreType.RAW, false, false, true}, {StoreType.RCFILE, true, true, false}, {StoreType.PARQUET, false, false, false}, {StoreType.SEQUENCEFILE, true, true, false}, {StoreType.AVRO, false, false, false}, + {StoreType.TEXTFILE, true, true, false}, }); } @@ -381,7 +383,7 @@ public class TestStorages { KeyValueSet options = new KeyValueSet(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); meta.setOptions(CatalogUtil.newPhysicalProperties(storeType)); - meta.putOption(StorageConstants.CSVFILE_NULL, "\\\\N"); + meta.putOption(StorageConstants.TEXT_NULL, "\\\\N"); meta.putOption(StorageConstants.RCFILE_NULL, "\\\\N"); meta.putOption(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); meta.putOption(StorageConstants.SEQUENCEFILE_NULL, "\\"); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/3e305b15/tajo-storage/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml index 6bfc902..a81f3d6 100644 --- a/tajo-storage/src/test/resources/storage-default.xml +++ b/tajo-storage/src/test/resources/storage-default.xml @@ -28,11 +28,15 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> </property> <!--- Fragment Class Configurations --> <property> + <name>tajo.storage.fragment.textfile.class</name> + <value>org.apache.tajo.storage.fragment.FileFragment</value> + </property> + <property> <name>tajo.storage.fragment.csv.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -67,6 +71,11 @@ <!--- Scanner Handler --> <property> + <name>tajo.storage.scanner-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> + </property> + + <property> <name>tajo.storage.scanner-handler.csv.class</name> <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> </property> @@ -109,7 +118,12 @@ <!--- Appender Handler --> <property> <name>tajo.storage.appender-handler</name> - <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value> + </property> + + <property> + <name>tajo.storage.appender-handler.textfile.class</name> + <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> </property> <property>
