Repository: tajo Updated Branches: refs/heads/master 83228ce67 -> 6d852081e
http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java deleted file mode 100644 index 4f58e68..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.SplitCompressionInputStream; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; - -/** - * Line reader for compressed splits - * - * Reading records from a compressed split is tricky, as the - * LineRecordReader is using the reported compressed input stream - * position directly to determine when a split has ended. In addition the - * compressed input stream is usually faking the actual byte position, often - * updating it only after the first compressed block after the split is - * accessed. - * - * Depending upon where the last compressed block of the split ends relative - * to the record delimiters it can be easy to accidentally drop the last - * record or duplicate the last record between this split and the next. - * - * Split end scenarios: - * - * 1) Last block of split ends in the middle of a record - * Nothing special that needs to be done here, since the compressed input - * stream will report a position after the split end once the record - * is fully read. The consumer of the next split will discard the - * partial record at the start of the split normally, and no data is lost - * or duplicated between the splits. - * - * 2) Last block of split ends in the middle of a delimiter - * The line reader will continue to consume bytes into the next block to - * locate the end of the delimiter. If a custom delimiter is being used - * then the next record must be read by this split or it will be dropped. - * The consumer of the next split will not recognize the partial - * delimiter at the beginning of its split and will discard it along with - * the next record. - * - * However for the default delimiter processing there is a special case - * because CR, LF, and CRLF are all valid record delimiters. If the - * block ends with a CR then the reader must peek at the next byte to see - * if it is an LF and therefore part of the same record delimiter. - * Peeking at the next byte is an access to the next block and triggers - * the stream to report the end of the split. There are two cases based - * on the next byte: - * - * A) The next byte is LF - * The split needs to end after the current record is returned. The - * consumer of the next split will discard the first record, which - * is degenerate since LF is itself a delimiter, and start consuming - * records after that byte. If the current split tries to read - * another record then the record will be duplicated between splits. - * - * B) The next byte is not LF - * The current record will be returned but the stream will report - * the split has ended due to the peek into the next block. If the - * next record is not read then it will be lost, as the consumer of - * the next split will discard it before processing subsequent - * records. Therefore the next record beyond the reported split end - * must be consumed by this split to avoid data loss. - * - * 3) Last block of split ends at the beginning of a delimiter - * This is equivalent to case 1, as the reader will consume bytes into - * the next block and trigger the end of the split. No further records - * should be read as the consumer of the next split will discard the - * (degenerate) record at the beginning of its split. - * - * 4) Last block of split ends at the end of a delimiter - * Nothing special needs to be done here. The reader will not start - * examining the bytes into the next block until the next record is read, - * so the stream will not report the end of the split just yet. Once the - * next record is read then the next block will be accessed and the - * stream will indicate the end of the split. The consumer of the next - * split will correctly discard the first record of its split, and no - * data is lost or duplicated. - * - * If the default delimiter is used and the block ends at a CR then this - * is treated as case 2 since the reader does not yet know without - * looking at subsequent bytes whether the delimiter has ended. - * - * NOTE: It is assumed that compressed input streams *never* return bytes from - * multiple compressed blocks from a single read. Failure to do so will - * violate the buffering performed by this class, as it will access - * bytes into the next block after the split before returning all of the - * records from the previous block. - */ - -public class CompressedSplitLineReader extends SplitLineReader { - SplitCompressionInputStream scin; - private boolean usingCRLF; - private boolean needAdditionalRecord = false; - private boolean finished = false; - - public CompressedSplitLineReader(SplitCompressionInputStream in, - Configuration conf, - byte[] recordDelimiterBytes) - throws IOException { - super(in, conf, recordDelimiterBytes); - scin = in; - usingCRLF = (recordDelimiterBytes == null); - } - - @Override - protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter) - throws IOException { - int bytesRead = in.read(buffer); - - // If the split ended in the middle of a record delimiter then we need - // to read one additional record, as the consumer of the next split will - // not recognize the partial delimiter as a record. - // However if using the default delimiter and the next character is a - // linefeed then next split will treat it as a delimiter all by itself - // and the additional record read should not be performed. - if (inDelimiter && bytesRead > 0) { - if (usingCRLF) { - needAdditionalRecord = (buffer[0] != '\n'); - } else { - needAdditionalRecord = true; - } - } - return bytesRead; - } - - @Override - public int readLine(Text str, int maxLineLength, int maxBytesToConsume) - throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength - , int maxBytesToConsume) throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public boolean needAdditionalRecordAfterSplit() { - return !finished && needAdditionalRecord; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java deleted file mode 100644 index 0f31baf..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java +++ /dev/null @@ -1,559 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; - -/** - * A class that provides a line reader from an input stream. - * Depending on the constructor used, lines will either be terminated by: - * <ul> - * <li>one of the following: '\n' (LF) , '\r' (CR), - * or '\r\n' (CR+LF).</li> - * <li><em>or</em>, a custom byte sequence delimiter</li> - * </ul> - * In both cases, EOF also terminates an otherwise unterminated - * line. - */ - -public class LineReader implements Closeable { - private static final int DEFAULT_BUFFER_SIZE = 64 * 1024; - private int bufferSize = DEFAULT_BUFFER_SIZE; - private InputStream in; - private byte[] buffer; - // the number of bytes of real data in the buffer - private int bufferLength = 0; - // the current position in the buffer - private int bufferPosn = 0; - - private static final byte CR = '\r'; - private static final byte LF = '\n'; - - // The line delimiter - private final byte[] recordDelimiterBytes; - - /** - * Create a line reader that reads from the given stream using the - * default buffer-size (64k). - * - * @param in The input stream - * @throws java.io.IOException - */ - public LineReader(InputStream in) { - this(in, DEFAULT_BUFFER_SIZE); - } - - /** - * Create a line reader that reads from the given stream using the - * given buffer-size. - * - * @param in The input stream - * @param bufferSize Size of the read buffer - * @throws java.io.IOException - */ - public LineReader(InputStream in, int bufferSize) { - this.in = in; - this.bufferSize = bufferSize; - this.buffer = new byte[this.bufferSize]; - this.recordDelimiterBytes = null; - } - - /** - * Create a line reader that reads from the given stream using the - * <code>io.file.buffer.size</code> specified in the given - * <code>Configuration</code>. - * - * @param in input stream - * @param conf configuration - * @throws java.io.IOException - */ - public LineReader(InputStream in, Configuration conf) throws IOException { - this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE)); - } - - /** - * Create a line reader that reads from the given stream using the - * default buffer-size, and using a custom delimiter of array of - * bytes. - * - * @param in The input stream - * @param recordDelimiterBytes The delimiter - */ - public LineReader(InputStream in, byte[] recordDelimiterBytes) { - this.in = in; - this.bufferSize = DEFAULT_BUFFER_SIZE; - this.buffer = new byte[this.bufferSize]; - this.recordDelimiterBytes = recordDelimiterBytes; - } - - /** - * Create a line reader that reads from the given stream using the - * given buffer-size, and using a custom delimiter of array of - * bytes. - * - * @param in The input stream - * @param bufferSize Size of the read buffer - * @param recordDelimiterBytes The delimiter - * @throws java.io.IOException - */ - public LineReader(InputStream in, int bufferSize, - byte[] recordDelimiterBytes) { - this.in = in; - this.bufferSize = bufferSize; - this.buffer = new byte[this.bufferSize]; - this.recordDelimiterBytes = recordDelimiterBytes; - } - - /** - * Create a line reader that reads from the given stream using the - * <code>io.file.buffer.size</code> specified in the given - * <code>Configuration</code>, and using a custom delimiter of array of - * bytes. - * - * @param in input stream - * @param conf configuration - * @param recordDelimiterBytes The delimiter - * @throws java.io.IOException - */ - public LineReader(InputStream in, Configuration conf, - byte[] recordDelimiterBytes) throws IOException { - this.in = in; - this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE); - this.buffer = new byte[this.bufferSize]; - this.recordDelimiterBytes = recordDelimiterBytes; - } - - - /** - * Close the underlying stream. - * - * @throws java.io.IOException - */ - public void close() throws IOException { - in.close(); - } - - public void reset() { - bufferLength = 0; - bufferPosn = 0; - - } - - /** - * Read one line from the InputStream into the given Text. - * - * @param str the object to store the given line (without newline) - * @param maxLineLength the maximum number of bytes to store into str; - * the rest of the line is silently discarded. - * @param maxBytesToConsume the maximum number of bytes to consume - * in this call. This is only a hint, because if the line cross - * this threshold, we allow it to happen. It can overshoot - * potentially by as much as one buffer length. - * @return the number of bytes read including the (longest) newline - * found. - * @throws java.io.IOException if the underlying stream throws - */ - public int readLine(Text str, int maxLineLength, - int maxBytesToConsume) throws IOException { - if (this.recordDelimiterBytes != null) { - return readCustomLine(str, maxLineLength, maxBytesToConsume); - } else { - return readDefaultLine(str, maxLineLength, maxBytesToConsume); - } - } - - protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter) - throws IOException { - return in.read(buffer); - } - /** - * Read a line terminated by one of CR, LF, or CRLF. - */ - private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) - throws IOException { - /* We're reading data from in, but the head of the stream may be - * already buffered in buffer, so we have several cases: - * 1. No newline characters are in the buffer, so we need to copy - * everything and read another buffer from the stream. - * 2. An unambiguously terminated line is in buffer, so we just - * copy to str. - * 3. Ambiguously terminated line is in buffer, i.e. buffer ends - * in CR. In this case we copy everything up to CR to str, but - * we also need to see what follows CR: if it's LF, then we - * need consume LF as well, so next call to readLine will read - * from after that. - * We use a flag prevCharCR to signal if previous character was CR - * and, if it happens to be at the end of the buffer, delay - * consuming it until we have a chance to look at the char that - * follows. - */ - str.clear(); - int txtLength = 0; //tracks str.getLength(), as an optimization - int newlineLength = 0; //length of terminating newline - boolean prevCharCR = false; //true of prev char was CR - long bytesConsumed = 0; - do { - int startPosn = bufferPosn; //starting from where we left off the last time - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - if (prevCharCR) { - ++bytesConsumed; //account for CR from previous read - } - bufferLength = fillBuffer(in, buffer, prevCharCR); - if (bufferLength <= 0) { - break; // EOF - } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline - if (buffer[bufferPosn] == LF) { - newlineLength = (prevCharCR) ? 2 : 1; - ++bufferPosn; // at next invocation proceed from following byte - break; - } - if (prevCharCR) { //CR + notLF, we are at notLF - newlineLength = 1; - break; - } - prevCharCR = (buffer[bufferPosn] == CR); - } - int readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; //CR at the end of the buffer - } - bytesConsumed += readLength; - int appendLength = readLength - newlineLength; - if (appendLength > maxLineLength - txtLength) { - appendLength = maxLineLength - txtLength; - } - if (appendLength > 0) { - str.append(buffer, startPosn, appendLength); - txtLength += appendLength; - } - } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); - - if (bytesConsumed > (long) Integer.MAX_VALUE) { - throw new IOException("Too many bytes before newline: " + bytesConsumed); - } - return (int) bytesConsumed; - } - - /** - * Read a line terminated by one of CR, LF, or CRLF. - */ - public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength - , int maxBytesToConsume) - throws IOException { - /* We're reading data from in, but the head of the stream may be - * already buffered in buffer, so we have several cases: - * 1. No newline characters are in the buffer, so we need to copy - * everything and read another buffer from the stream. - * 2. An unambiguously terminated line is in buffer, so we just - * copy to str. - * 3. Ambiguously terminated line is in buffer, i.e. buffer ends - * in CR. In this case we copy everything up to CR to str, but - * we also need to see what follows CR: if it's LF, then we - * need consume LF as well, so next call to readLine will read - * from after that. - * We use a flag prevCharCR to signal if previous character was CR - * and, if it happens to be at the end of the buffer, delay - * consuming it until we have a chance to look at the char that - * follows. - */ - - int txtLength = 0; //tracks str.getLength(), as an optimization - int newlineLength = 0; //length of terminating newline - boolean prevCharCR = false; //true of prev char was CR - long bytesConsumed = 0; - do { - int startPosn = bufferPosn; //starting from where we left off the last time - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - if (prevCharCR) { - ++bytesConsumed; //account for CR from previous read - } - bufferLength = fillBuffer(in, buffer, prevCharCR); - if (bufferLength <= 0) { - break; // EOF - } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline - if (buffer[bufferPosn] == LF) { - newlineLength = (prevCharCR) ? 2 : 1; - ++bufferPosn; // at next invocation proceed from following byte - break; - } - if (prevCharCR) { //CR + notLF, we are at notLF - newlineLength = 1; - break; - } - prevCharCR = (buffer[bufferPosn] == CR); - } - int readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; //CR at the end of the buffer - } - bytesConsumed += readLength; - int appendLength = readLength - newlineLength; - if (appendLength > maxLineLength - txtLength) { - appendLength = maxLineLength - txtLength; - } - if (appendLength > 0) { - str.write(buffer, startPosn, appendLength); - txtLength += appendLength; - } - } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); - - if (bytesConsumed > (long) Integer.MAX_VALUE) { - throw new IOException("Too many bytes before newline: " + bytesConsumed); - } - - if (bytesConsumed > 0) offsets.add(txtLength); - return (int) bytesConsumed; - } - - /** - * Read a line terminated by one of CR, LF, or CRLF. - */ - -/* int validIdx = 0; - public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets, - long pos, int maxLineLength, int maxBytesToConsume) - throws IOException { - *//* We're reading data from in, but the head of the stream may be - * already buffered in buffer, so we have several cases: - * 1. No newline characters are in the buffer, so we need to copy - * everything and read another buffer from the stream. - * 2. An unambiguously terminated line is in buffer, so we just - * copy to str. - * 3. Ambiguously terminated line is in buffer, i.e. buffer ends - * in CR. In this case we copy everything up to CR to str, but - * we also need to see what follows CR: if it's LF, then we - * need consume LF as well, so next call to readLine will read - * from after that. - * We use a flag prevCharCR to signal if previous character was CR - * and, if it happens to be at the end of the buffer, delay - * consuming it until we have a chance to look at the char that - * follows. - *//* - //str.clear(); - str.reset(); - offsets.clear(); - foffsets.clear(); - - validIdx = 0; - long bufferBytesConsumed = 0; - - int txtLength = 0; //tracks str.getLength(), as an optimization - int newlineLength = 0; //length of terminating newline - boolean prevCharCR = false; //true of prev char was CR - long bytesConsumed = 0; - do { - - int startPosn = bufferPosn; //starting from where we left off the last time - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - if (prevCharCR) { - ++bytesConsumed; //account for CR from previous read - } - bufferLength = in.read(buffer); - if (bufferLength <= 0) { - break; // EOF - } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline - if (buffer[bufferPosn] == LF) { - newlineLength = (prevCharCR) ? 2 : 1; - ++bufferPosn; // at next invocation proceed from following byte - break; - } - if (prevCharCR) { //CR + notLF, we are at notLF - newlineLength = 1; - break; - } - prevCharCR = (buffer[bufferPosn] == CR); - } - int readLength = bufferPosn - startPosn; - if (prevCharCR && newlineLength == 0) { - --readLength; //CR at the end of the buffer - } - bytesConsumed += readLength; - int appendLength = readLength - newlineLength; - if (appendLength > maxLineLength - txtLength) { - appendLength = maxLineLength - txtLength; - } - - if (appendLength > 0) { - str.write(buffer, startPosn, appendLength); - //System.out.println(startPosn + "," + appendLength); - //str.append(buffer, startPosn, appendLength); - txtLength += appendLength; - } - - if(newlineLength > 0){ - validIdx++; - - if (bytesConsumed > (long)Integer.MAX_VALUE) { - throw new IOException("Too many bytes before newline: " + bytesConsumed); - } - offsets.add(txtLength); - foffsets.add(pos); - pos+= bytesConsumed; - bufferBytesConsumed += bytesConsumed; - - txtLength = 0; - newlineLength = 0; - prevCharCR = false; //true of prev char was CR - bytesConsumed = 0; - } else { - bufferBytesConsumed += bytesConsumed; - bytesConsumed = 0; - } - } while ((bufferBytesConsumed < 256 * 1024)); - - return (int)bufferBytesConsumed; - }*/ - - /** - * Read a line terminated by a custom delimiter. - */ - private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume) - throws IOException { - /* We're reading data from inputStream, but the head of the stream may be - * already captured in the previous buffer, so we have several cases: - * - * 1. The buffer tail does not contain any character sequence which - * matches with the head of delimiter. We count it as a - * ambiguous byte count = 0 - * - * 2. The buffer tail contains a X number of characters, - * that forms a sequence, which matches with the - * head of delimiter. We count ambiguous byte count = X - * - * // *** eg: A segment of input file is as follows - * - * " record 1792: I found this bug very interesting and - * I have completely read about it. record 1793: This bug - * can be solved easily record 1794: This ." - * - * delimiter = "record"; - * - * supposing:- String at the end of buffer = - * "I found this bug very interesting and I have completely re" - * There for next buffer = "ad about it. record 179 ...." - * - * The matching characters in the input - * buffer tail and delimiter head = "re" - * Therefore, ambiguous byte count = 2 **** // - * - * 2.1 If the following bytes are the remaining characters of - * the delimiter, then we have to capture only up to the starting - * position of delimiter. That means, we need not include the - * ambiguous characters in str. - * - * 2.2 If the following bytes are not the remaining characters of - * the delimiter ( as mentioned in the example ), - * then we have to include the ambiguous characters in str. - */ - str.clear(); - int txtLength = 0; // tracks str.getLength(), as an optimization - long bytesConsumed = 0; - int delPosn = 0; - int ambiguousByteCount = 0; // To capture the ambiguous characters count - do { - int startPosn = bufferPosn; // Start from previous end position - if (bufferPosn >= bufferLength) { - startPosn = bufferPosn = 0; - bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0); - if (bufferLength <= 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); - break; // EOF - } - } - for (; bufferPosn < bufferLength; ++bufferPosn) { - if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { - delPosn++; - if (delPosn >= recordDelimiterBytes.length) { - bufferPosn++; - break; - } - } else if (delPosn != 0) { - bufferPosn--; - delPosn = 0; - } - } - int readLength = bufferPosn - startPosn; - bytesConsumed += readLength; - int appendLength = readLength - delPosn; - if (appendLength > maxLineLength - txtLength) { - appendLength = maxLineLength - txtLength; - } - if (appendLength > 0) { - if (ambiguousByteCount > 0) { - str.append(recordDelimiterBytes, 0, ambiguousByteCount); - //appending the ambiguous characters (refer case 2.2) - bytesConsumed += ambiguousByteCount; - ambiguousByteCount = 0; - } - str.append(buffer, startPosn, appendLength); - txtLength += appendLength; - } - if (bufferPosn >= bufferLength) { - if (delPosn > 0 && delPosn < recordDelimiterBytes.length) { - ambiguousByteCount = delPosn; - bytesConsumed -= ambiguousByteCount; //to be consumed in next - } - } - } while (delPosn < recordDelimiterBytes.length - && bytesConsumed < maxBytesToConsume); - if (bytesConsumed > (long) Integer.MAX_VALUE) { - throw new IOException("Too many bytes before delimiter: " + bytesConsumed); - } - return (int) bytesConsumed; - } - - /** - * Read from the InputStream into the given Text. - * - * @param str the object to store the given line - * @param maxLineLength the maximum number of bytes to store into str. - * @return the number of bytes read including the newline - * @throws java.io.IOException if the underlying stream throws - */ - public int readLine(Text str, int maxLineLength) throws IOException { - return readLine(str, maxLineLength, Integer.MAX_VALUE); - } - - /** - * Read from the InputStream into the given Text. - * - * @param str the object to store the given line - * @return the number of bytes read including the newline - * @throws java.io.IOException if the underlying stream throws - */ - public int readLine(Text str) throws IOException { - return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java deleted file mode 100644 index 3579674..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.InputStream; - -public class SplitLineReader extends LineReader { - public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) { - super(in, recordDelimiterBytes); - } - - public SplitLineReader(InputStream in, Configuration conf, - byte[] recordDelimiterBytes) throws IOException { - super(in, conf, recordDelimiterBytes); - } - - public boolean needAdditionalRecordAfterSplit() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java index ab1c808..5f94aae 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -24,9 +24,8 @@ import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.storage.StorageFragmentProtos.*; +import org.apache.tajo.BuiltinStorages; +import org.apache.tajo.storage.StorageFragmentProtos.FileFragmentProto; import org.apache.tajo.util.TUtil; import java.io.IOException; @@ -231,7 +230,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); fragmentBuilder.setId(this.tableName); - fragmentBuilder.setStoreType(CatalogUtil.getStoreTypeString(StoreType.CSV)); + fragmentBuilder.setStoreType(BuiltinStorages.TEXT); fragmentBuilder.setContents(builder.buildPartial().toByteString()); return fragmentBuilder.build(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index f50a20d..67d0646 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -68,10 +68,9 @@ public class TestCompressionStorages { @Parameterized.Parameters public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][]{ - {"CSV"}, + {"TEXT"}, {"RCFILE"}, - {"SEQUENCEFILE"}, - {"TEXT"} + {"SEQUENCEFILE"} }); } @@ -128,9 +127,7 @@ public class TestCompressionStorages { appender.init(); String extension = ""; - if (appender instanceof CSVFile.CSVAppender) { - extension = ((CSVFile.CSVAppender) appender).getExtension(); - } else if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { + if (appender instanceof DelimitedTextFile.DelimitedTextFileAppender) { extension = ((DelimitedTextFile.DelimitedTextFileAppender) appender).getExtension(); } @@ -155,14 +152,6 @@ public class TestCompressionStorages { tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen); Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema); - - if (storeType.equalsIgnoreCase("CSV")) { - if (SplittableCompressionCodec.class.isAssignableFrom(codec)) { - assertTrue(scanner.isSplittable()); - } else { - assertFalse(scanner.isSplittable()); - } - } scanner.init(); if (storeType.equalsIgnoreCase("SEQUENCEFILE")) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java index 1119968..e9e44b5 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java @@ -102,7 +102,7 @@ public class TestFileSystems { schema.addColumn("age", Type.INT4); schema.addColumn("name", Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta("CSV"); + TableMeta meta = CatalogUtil.newTableMeta("TEXT"); Tuple[] tuples = new Tuple[4]; for (int i = 0; i < tuples.length; i++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java index 09b91ea..37fbfe4 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java @@ -69,7 +69,7 @@ public class TestFileTablespace { schema.addColumn("age",Type.INT4); schema.addColumn("name",Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta("CSV"); + TableMeta meta = CatalogUtil.newTableMeta("TEXT"); VTuple[] tuples = new VTuple[4]; for(int i=0; i < tuples.length; i++) { @@ -136,7 +136,7 @@ public class TestFileTablespace { schema.addColumn("id", Type.INT4); schema.addColumn("age",Type.INT4); schema.addColumn("name",Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta("CSV"); + TableMeta meta = CatalogUtil.newTableMeta("TEXT"); List<Fragment> splits = Lists.newArrayList(); // Get FileFragments in partition batch @@ -193,7 +193,7 @@ public class TestFileTablespace { schema.addColumn("id", Type.INT4); schema.addColumn("age", Type.INT4); schema.addColumn("name", Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta("CSV"); + TableMeta meta = CatalogUtil.newTableMeta("TEXT"); List<Fragment> splits = Lists.newArrayList(); splits.addAll(sm.getSplits("data", meta, schema, tablePath)); http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 0ba75dd..4faf8b9 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -76,7 +76,7 @@ public class TestMergeScanner { @Parameters public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][] { - {"CSV"}, + {"TEXT"}, {"RAW"}, {"RCFILE"}, {"PARQUET"}, @@ -200,7 +200,7 @@ public class TestMergeScanner { private static boolean isProjectableStorage(String type) { if (type.equalsIgnoreCase("RCFILE") || type.equalsIgnoreCase("PARQUET") || - type.equalsIgnoreCase("CSV") || + type.equalsIgnoreCase("TEXT") || type.equalsIgnoreCase("SEQUENCEFILE") || type.equalsIgnoreCase("AVRO")) { return true; http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java index c3f31a0..afe0f13 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -135,7 +135,6 @@ public class TestStorages { public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][] { //type, splitable, statsable, seekable - {"CSV", true, true, true}, {"RAW", false, true, true}, {"RCFILE", true, true, false}, {"PARQUET", false, false, false}, @@ -776,7 +775,7 @@ public class TestStorages { @Test public void testTime() throws IOException { - if (storeType.equalsIgnoreCase("CSV") || storeType.equalsIgnoreCase("RAW")) { + if (storeType.equalsIgnoreCase("TEXT") || storeType.equalsIgnoreCase("RAW")) { Schema schema = new Schema(); schema.addColumn("col1", Type.DATE); schema.addColumn("col2", Type.TIME); @@ -1021,7 +1020,7 @@ public class TestStorages { @Test public final void testInsertFixedCharTypeWithOverSize() throws Exception { - if (storeType.equalsIgnoreCase("CSV") == false && + if (storeType.equalsIgnoreCase("TEXT") == false && storeType.equalsIgnoreCase("SEQUENCEFILE") == false && storeType.equalsIgnoreCase("RCFILE") == false && storeType.equalsIgnoreCase("PARQUET") == false) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index 22fb607..ed010c0 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -72,7 +72,6 @@ public class TestBSTIndex { @Parameterized.Parameters public static Collection<Object[]> generateParameters() { return Arrays.asList(new Object[][]{ - {"CSV"}, {"RAW"}, {"TEXT"} }); http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 72810fd..c198965 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -21,6 +21,7 @@ package org.apache.tajo.storage.index; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.tajo.BuiltinStorages; import org.apache.tajo.catalog.*; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -37,7 +38,6 @@ import org.junit.Test; import java.io.IOException; -import static org.apache.tajo.storage.CSVFile.CSVScanner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -72,7 +72,7 @@ public class TestSingleCSVFileBSTIndex { @Test public void testFindValueInSingleCSV() throws IOException { - meta = CatalogUtil.newTableMeta("CSV"); + meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT); Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); fs.mkdirs(tablePath.getParent()); @@ -111,7 +111,7 @@ public class TestSingleCSVFileBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); + SeekableScanner fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -135,7 +135,7 @@ public class TestSingleCSVFileBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp); reader.open(); - fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); fileScanner.init(); for (int i = 0; i < TUPLE_NUM - 1; i++) { tuple.put(0, DatumFactory.createInt8(i)); @@ -161,7 +161,7 @@ public class TestSingleCSVFileBSTIndex { @Test public void testFindNextKeyValueInSingleCSV() throws IOException { - meta = CatalogUtil.newTableMeta("CSV"); + meta = CatalogUtil.newTableMeta(BuiltinStorages.TEXT); Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", "table1.csv"); @@ -200,7 +200,7 @@ public class TestSingleCSVFileBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner fileScanner = new CSVScanner(conf, schema, meta, tablet); + SeekableScanner fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); fileScanner.init(); Tuple keyTuple; long offset; @@ -221,7 +221,7 @@ public class TestSingleCSVFileBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); reader.open(); - fileScanner = new CSVScanner(conf, schema, meta, tablet); + fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema); fileScanner.init(); Tuple result; for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { http://git-wip-us.apache.org/repos/asf/tajo/blob/6d852081/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml index 6a9e7ce..2de1617 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-hdfs/src/test/resources/storage-default.xml @@ -38,7 +38,7 @@ <!--- Registered Scanner Handler --> <property> <name>tajo.storage.scanner-handler</name> - <value>text,csv,json,raw,rcfile,row,parquet,sequencefile,avro</value> + <value>text,json,raw,rcfile,row,parquet,sequencefile,avro</value> </property> <!--- Fragment Class Configurations --> @@ -47,10 +47,6 @@ <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> <property> - <name>tajo.storage.fragment.csv.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> <name>tajo.storage.fragment.json.class</name> <value>org.apache.tajo.storage.fragment.FileFragment</value> </property> @@ -86,11 +82,6 @@ </property> <property> - <name>tajo.storage.scanner-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> - </property> - - <property> <name>tajo.storage.scanner-handler.json.class</name> <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> </property> @@ -128,7 +119,7 @@ <!--- Appender Handler --> <property> <name>tajo.storage.appender-handler</name> - <value>text,csv,raw,rcfile,row,parquet,sequencefile,avro</value> + <value>text,raw,rcfile,row,parquet,sequencefile,avro</value> </property> <property> @@ -137,11 +128,6 @@ </property> <property> - <name>tajo.storage.appender-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> - </property> - - <property> <name>tajo.storage.appender-handler.json.class</name> <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> </property>
