/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;

/**
 * A class that provides a line reader from an input stream.
 * Depending on the constructor used, lines will either be terminated by:
 * <ul>
 * <li>one of the following: '\n' (LF) , '\r' (CR),
 * or '\r\n' (CR+LF).</li>
 * <li><em>or</em>, a custom byte sequence delimiter</li>
 * </ul>
 * In both cases, EOF also terminates an otherwise unterminated
 * line.
 * <p>
 * NOTE(review): this class is NOT thread-safe; it keeps read-ahead state
 * ({@code buffer}, {@code bufferPosn}, {@code bufferLength}) between calls,
 * so a single instance must be confined to one reader at a time.
 */
public class LineReader implements Closeable {
  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
  private int bufferSize = DEFAULT_BUFFER_SIZE;
  private InputStream in;
  private byte[] buffer;
  // the number of bytes of real data in the buffer
  private int bufferLength = 0;
  // the current position in the buffer
  private int bufferPosn = 0;

  private static final byte CR = '\r';
  private static final byte LF = '\n';

  // The line delimiter; null means "use the default CR/LF/CRLF rules"
  private final byte[] recordDelimiterBytes;

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size (64k).
   *
   * @param in The input stream
   * @throws IOException
   */
  public LineReader(InputStream in) {
    this(in, DEFAULT_BUFFER_SIZE);
  }

  /**
   * Create a line reader that reads from the given stream using the
   * given buffer-size.
   *
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   * @throws IOException
   */
  public LineReader(InputStream in, int bufferSize) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = null;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>.
   *
   * @param in input stream
   * @param conf configuration
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf) throws IOException {
    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
  }

  /**
   * Create a line reader that reads from the given stream using the
   * default buffer-size, and using a custom delimiter of array of
   * bytes.
   *
   * @param in The input stream
   * @param recordDelimiterBytes The delimiter
   */
  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = DEFAULT_BUFFER_SIZE;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * given buffer-size, and using a custom delimiter of array of
   * bytes.
   *
   * @param in The input stream
   * @param bufferSize Size of the read buffer
   * @param recordDelimiterBytes The delimiter
   * @throws IOException
   */
  public LineReader(InputStream in, int bufferSize,
                    byte[] recordDelimiterBytes) {
    this.in = in;
    this.bufferSize = bufferSize;
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Create a line reader that reads from the given stream using the
   * <code>io.file.buffer.size</code> specified in the given
   * <code>Configuration</code>, and using a custom delimiter of array of
   * bytes.
   *
   * @param in input stream
   * @param conf configuration
   * @param recordDelimiterBytes The delimiter
   * @throws IOException
   */
  public LineReader(InputStream in, Configuration conf,
                    byte[] recordDelimiterBytes) throws IOException {
    this.in = in;
    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
    this.buffer = new byte[this.bufferSize];
    this.recordDelimiterBytes = recordDelimiterBytes;
  }

  /**
   * Close the underlying stream.
   *
   * @throws IOException
   */
  public void close() throws IOException {
    in.close();
  }

  /**
   * Discards any buffered read-ahead data so the next read starts fresh
   * from the underlying stream's current position.
   */
  public void reset() {
    bufferLength = 0;
    bufferPosn = 0;
  }

  /**
   * Read one line from the InputStream into the given Text.
   *
   * @param str the object to store the given line (without newline)
   * @param maxLineLength the maximum number of bytes to store into str;
   *  the rest of the line is silently discarded.
   * @param maxBytesToConsume the maximum number of bytes to consume
   *  in this call.  This is only a hint, because if the line cross
   *  this threshold, we allow it to happen.  It can overshoot
   *  potentially by as much as one buffer length.
   * @return the number of bytes read including the (longest) newline
   * found.
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    if (this.recordDelimiterBytes != null) {
      return readCustomLine(str, maxLineLength, maxBytesToConsume);
    } else {
      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
    }
  }

  /**
   * Refill {@code buffer} from the stream.  Subclasses may override to
   * account for compressed-stream positioning; {@code inDelimiter} signals
   * that the previous buffer ended mid-delimiter (or on a bare CR).
   *
   * @return number of bytes read, or a value &lt;= 0 at EOF
   */
  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
      throws IOException {
    return in.read(buffer);
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF.
   */
  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    /* We're reading data from in, but the head of the stream may be
     * already buffered in buffer, so we have several cases:
     * 1. No newline characters are in the buffer, so we need to copy
     *    everything and read another buffer from the stream.
     * 2. An unambiguously terminated line is in buffer, so we just
     *    copy to str.
     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
     *    in CR.  In this case we copy everything up to CR to str, but
     *    we also need to see what follows CR: if it's LF, then we
     *    need consume LF as well, so next call to readLine will read
     *    from after that.
     * We use a flag prevCharCR to signal if previous character was CR
     * and, if it happens to be at the end of the buffer, delay
     * consuming it until we have a chance to look at the char that
     * follows.
     */
    str.clear();
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  /**
   * Read a line terminated by one of CR, LF, or CRLF, appending the line
   * bytes to {@code str} and recording the appended length in {@code offsets}.
   * <p>
   * Unlike {@link #readDefaultLine(Text, int, int)} this does NOT clear
   * {@code str}: lines accumulate in the output stream and the caller uses
   * {@code offsets} to delimit them.
   *
   * @param str output stream the line bytes are appended to
   * @param offsets receives the appended line length when any bytes were consumed
   * @param maxLineLength maximum bytes to append for this line; excess is discarded
   * @param maxBytesToConsume soft cap on bytes consumed in this call
   * @return the number of bytes consumed including the newline
   */
  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
      , int maxBytesToConsume)
      throws IOException {
    /* Same three-case CR/LF/CRLF scan as readDefaultLine(Text, ...);
     * a bare CR at a buffer boundary is held (prevCharCR) until the next
     * byte shows whether it begins a CRLF pair.
     */
    int txtLength = 0; //tracks str.getLength(), as an optimization
    int newlineLength = 0; //length of terminating newline
    boolean prevCharCR = false; //true if prev char was CR
    long bytesConsumed = 0;
    do {
      int startPosn = bufferPosn; //starting from where we left off the last time
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        if (prevCharCR) {
          ++bytesConsumed; //account for CR from previous read
        }
        bufferLength = fillBuffer(in, buffer, prevCharCR);
        if (bufferLength <= 0) {
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
        if (buffer[bufferPosn] == LF) {
          newlineLength = (prevCharCR) ? 2 : 1;
          ++bufferPosn; // at next invocation proceed from following byte
          break;
        }
        if (prevCharCR) { //CR + notLF, we are at notLF
          newlineLength = 1;
          break;
        }
        prevCharCR = (buffer[bufferPosn] == CR);
      }
      int readLength = bufferPosn - startPosn;
      if (prevCharCR && newlineLength == 0) {
        --readLength; //CR at the end of the buffer
      }
      bytesConsumed += readLength;
      int appendLength = readLength - newlineLength;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        str.write(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);

    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before newline: " + bytesConsumed);
    }

    // record the line length only if we actually consumed input (not pure EOF)
    if (bytesConsumed > 0) offsets.add(txtLength);
    return (int) bytesConsumed;
  }

  /**
   * Read a line terminated by a custom delimiter.
   */
  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
      throws IOException {
    /* We're reading data from inputStream, but the head of the stream may be
     * already captured in the previous buffer, so we have several cases:
     *
     * 1. The buffer tail does not contain any character sequence which
     *    matches with the head of delimiter. We count it as a
     *    ambiguous byte count = 0
     *
     * 2. The buffer tail contains a X number of characters,
     *    that forms a sequence, which matches with the
     *    head of delimiter. We count ambiguous byte count = X
     *
     *    // *** eg: A segment of input file is as follows
     *
     *    " record 1792: I found this bug very interesting and
     *    I have completely read about it. record 1793: This bug
     *    can be solved easily record 1794: This ."
     *
     *    delimiter = "record";
     *
     *    supposing:- String at the end of buffer =
     *    "I found this bug very interesting and I have completely re"
     *    There for next buffer = "ad about it. record 179 ...."
     *
     *    The matching characters in the input
     *    buffer tail and delimiter head = "re"
     *    Therefore, ambiguous byte count = 2 **** //
     *
     *    2.1 If the following bytes are the remaining characters of
     *        the delimiter, then we have to capture only up to the starting
     *        position of delimiter. That means, we need not include the
     *        ambiguous characters in str.
     *
     *    2.2 If the following bytes are not the remaining characters of
     *        the delimiter ( as mentioned in the example ),
     *        then we have to include the ambiguous characters in str.
     */
    str.clear();
    int txtLength = 0; // tracks str.getLength(), as an optimization
    long bytesConsumed = 0;
    int delPosn = 0; // how many delimiter bytes have been matched so far
    int ambiguousByteCount = 0; // To capture the ambiguous characters count
    do {
      int startPosn = bufferPosn; // Start from previous end position
      if (bufferPosn >= bufferLength) {
        startPosn = bufferPosn = 0;
        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
        if (bufferLength <= 0) {
          // EOF: a held delimiter prefix was ordinary data after all
          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
          break; // EOF
        }
      }
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
          delPosn++;
          if (delPosn >= recordDelimiterBytes.length) {
            bufferPosn++;
            break;
          }
        } else if (delPosn != 0) {
          // partial match failed: re-examine current byte with delPosn reset
          bufferPosn--;
          delPosn = 0;
        }
      }
      int readLength = bufferPosn - startPosn;
      bytesConsumed += readLength;
      int appendLength = readLength - delPosn;
      if (appendLength > maxLineLength - txtLength) {
        appendLength = maxLineLength - txtLength;
      }
      if (appendLength > 0) {
        if (ambiguousByteCount > 0) {
          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
          //appending the ambiguous characters (refer case 2.2)
          bytesConsumed += ambiguousByteCount;
          ambiguousByteCount = 0;
        }
        str.append(buffer, startPosn, appendLength);
        txtLength += appendLength;
      }
      if (bufferPosn >= bufferLength) {
        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
          ambiguousByteCount = delPosn;
          bytesConsumed -= ambiguousByteCount; //to be consumed in next
        }
      }
    } while (delPosn < recordDelimiterBytes.length
        && bytesConsumed < maxBytesToConsume);
    if (bytesConsumed > (long) Integer.MAX_VALUE) {
      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
    }
    return (int) bytesConsumed;
  }

  /**
   * Read from the InputStream into the given Text.
   *
   * @param str the object to store the given line
   * @param maxLineLength the maximum number of bytes to store into str.
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str, int maxLineLength) throws IOException {
    return readLine(str, maxLineLength, Integer.MAX_VALUE);
  }

  /**
   * Read from the InputStream into the given Text.
   *
   * @param str the object to store the given line
   * @return the number of bytes read including the newline
   * @throws IOException if the underlying stream throws
   */
  public int readLine(Text str) throws IOException {
    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java deleted file mode 100644 index f19b61f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.tajo.datum.*; -import org.apache.tajo.util.ClassSize; - -public class MemoryUtil { - - /** Overhead for an NullDatum */ - public static final long NULL_DATUM; - - /** Overhead for an BoolDatum */ - public static final long BOOL_DATUM; - - /** Overhead for an CharDatum */ - public static final long CHAR_DATUM; - - /** Overhead for an BitDatum */ - public static final long BIT_DATUM; - - /** Overhead for an Int2Datum */ - public static final long INT2_DATUM; - - /** Overhead for an Int4Datum */ - public static final long INT4_DATUM; - - /** Overhead for an Int8Datum */ - public static final long INT8_DATUM; - - /** Overhead for an Float4Datum */ - public static final long FLOAT4_DATUM; - - /** Overhead for an Float8Datum */ - public static final long FLOAT8_DATUM; - - /** Overhead for an TextDatum */ - public static final long TEXT_DATUM; - - /** Overhead for an BlobDatum */ - public static final long BLOB_DATUM; - - /** Overhead for an DateDatum */ - public static final long DATE_DATUM; - - /** Overhead for an TimeDatum */ - public static final long TIME_DATUM; - - /** Overhead for an TimestampDatum */ - public static final long TIMESTAMP_DATUM; - - static { - NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false); - - CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false); - - BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false); - - BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false); - - INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false); - - INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false); - - INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false); - - FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false); - - FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false); - - TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false); - - BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false); - - DATE_DATUM = 
ClassSize.estimateBase(DateDatum.class, false); - - TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false); - - TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false); - } - - public static long calculateMemorySize(Tuple tuple) { - long total = ClassSize.OBJECT; - for (Datum datum : tuple.getValues()) { - switch (datum.type()) { - - case NULL_TYPE: - total += NULL_DATUM; - break; - - case BOOLEAN: - total += BOOL_DATUM; - break; - - case BIT: - total += BIT_DATUM; - break; - - case CHAR: - total += CHAR_DATUM + datum.size(); - break; - - case INT1: - case INT2: - total += INT2_DATUM; - break; - - case INT4: - total += INT4_DATUM; - break; - - case INT8: - total += INT8_DATUM; - break; - - case FLOAT4: - total += FLOAT4_DATUM; - break; - - case FLOAT8: - total += FLOAT4_DATUM; - break; - - case TEXT: - total += TEXT_DATUM + datum.size(); - break; - - case DATE: - total += DATE_DATUM; - break; - - case TIME: - total += TIME_DATUM; - break; - - case TIMESTAMP: - total += TIMESTAMP_DATUM; - break; - - default: - break; - } - } - - return total; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java deleted file mode 100644 index 4122c76..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.FileFragment;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 * A {@link Scanner} that presents a list of file fragments as one
 * continuous scan: fragments are opened one at a time, in input order,
 * and read statistics are accumulated across them into {@link #tableStats}.
 * <p>
 * NOTE(review): not thread-safe — scanner-advance state ({@code iterator},
 * {@code currentScanner}, {@code tuple}) is mutated without synchronization.
 */
public class MergeScanner implements Scanner {
  private Configuration conf;
  private TableMeta meta;
  private Schema schema;
  // fragments with endKey > 0 only, preserved in input order
  private List<FileFragment> fragments;
  private Iterator<FileFragment> iterator;
  private FileFragment currentFragment;
  private Scanner currentScanner;
  private Tuple tuple;
  private boolean projectable = false;
  private boolean selectable = false;
  private Schema target;
  private float progress;
  protected TableStats tableStats;

  /** Convenience constructor: the projection target is the full schema. */
  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
      throws IOException {
    this(conf, schema, meta, rawFragmentList, schema);
  }

  /**
   * Creates a merge scanner over the non-empty fragments of
   * {@code rawFragmentList}, opening the first underlying scanner eagerly
   * (via {@link #reset()}) so projectable/selectable can be probed.
   *
   * @param target projection schema passed to each underlying scanner
   */
  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
                      Schema target)
      throws IOException {
    this.conf = conf;
    this.schema = schema;
    this.meta = meta;
    this.target = target;

    this.fragments = new ArrayList<FileFragment>();

    long numBytes = 0;
    for (FileFragment eachFileFragment: rawFragmentList) {
      // NOTE(review): endKey appears to carry the fragment's length here —
      // zero-length fragments are dropped; confirm against FileFragment usage.
      numBytes += eachFileFragment.getEndKey();
      if (eachFileFragment.getEndKey() > 0) {
        fragments.add(eachFileFragment);
      }
    }

    // it should keep the input order. Otherwise, it causes wrong result of sort queries.
    this.reset();

    if (currentScanner != null) {
      this.projectable = currentScanner.isProjectable();
      this.selectable = currentScanner.isSelectable();
    }

    tableStats = new TableStats();

    tableStats.setNumBytes(numBytes);
    tableStats.setNumBlocks(fragments.size());

    for (Column eachColumn: schema.getColumns()) {
      ColumnStats columnStats = new ColumnStats(eachColumn);
      tableStats.addColumnStat(columnStats);
    }
  }

  @Override
  public void init() throws IOException {
    progress = 0.0f;
  }

  /**
   * Returns the next tuple, transparently advancing to the next fragment's
   * scanner when the current one is exhausted.  The finished scanner's read
   * stats are folded into {@code tableStats} before it is replaced.
   * <p>
   * NOTE(review): only ONE scanner advance happens per call — if the next
   * fragment yields zero tuples, this returns null even though later
   * fragments may still hold data.  Verify callers tolerate this.
   */
  @Override
  public Tuple next() throws IOException {
    if (currentScanner != null)
      tuple = currentScanner.next();

    if (tuple != null) {
      return tuple;
    } else {
      if (currentScanner != null) {
        currentScanner.close();
        TableStats scannerTableStsts = currentScanner.getInputStats();
        if (scannerTableStsts != null) {
          tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStsts.getReadBytes());
          tableStats.setNumRows(tableStats.getNumRows() + scannerTableStsts.getNumRows());
        }
      }
      currentScanner = getNextScanner();
      if (currentScanner != null) {
        tuple = currentScanner.next();
      }
    }
    return tuple;
  }

  /** Restarts the merge from the first fragment and opens its scanner. */
  @Override
  public void reset() throws IOException {
    this.iterator = fragments.iterator();
    this.currentScanner = getNextScanner();
  }

  /**
   * Opens and initializes a scanner for the next fragment, or returns null
   * when all fragments are exhausted.
   */
  private Scanner getNextScanner() throws IOException {
    if (iterator.hasNext()) {
      currentFragment = iterator.next();
      currentScanner = StorageManager.getStorageManager((TajoConf) conf).getScanner(meta, schema,
          currentFragment, target);
      currentScanner.init();
      return currentScanner;
    } else {
      return null;
    }
  }

  @Override
  public void close() throws IOException {
    if (currentScanner != null) {
      currentScanner.close();
      currentScanner = null;
    }
    iterator = null;
    progress = 1.0f; // a closed scan reports as complete
  }

  @Override
  public boolean isProjectable() {
    return projectable;
  }

  @Override
  public void setTarget(Column[] targets) {
    this.target = new Schema(targets);
  }

  @Override
  public boolean isSelectable() {
    return selectable;
  }

  @Override
  public void setSearchCondition(Object expr) {
    // intentionally a no-op: predicate push-down is not supported here
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public boolean isSplittable() {
    return false;
  }

  /**
   * Progress = (bytes accumulated from finished scanners + bytes read so far
   * by the current scanner) / total bytes; falls back to the coarse
   * {@code progress} flag (0 after init, 1 after close) when unavailable.
   */
  @Override
  public float getProgress() {
    if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) {
      TableStats scannerTableStsts = currentScanner.getInputStats();
      long currentScannerReadBytes = 0;
      if (scannerTableStsts != null) {
        currentScannerReadBytes = scannerTableStsts.getReadBytes();
      }

      return (float) (tableStats.getReadBytes() + currentScannerReadBytes) / (float) tableStats.getNumBytes();
    } else {
      return progress;
    }
  }

  @Override
  public TableStats getInputStats() {
    return tableStats;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.fragment.FileFragment;

import java.io.IOException;

/**
 * A degenerate {@link FileScanner} that produces no tuples at all: the very
 * first {@link #next()} reports end-of-input.  Useful as a placeholder when
 * a fragment is known to contain nothing worth scanning.
 */
public class NullScanner extends FileScanner {

  public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
    super(conf, schema, meta, fragment);
  }

  /** Always at EOF: marks the scan as complete and yields no tuple. */
  @Override
  public Tuple next() throws IOException {
    progress = 1.0f;
    return null;
  }

  /** Rewinds the (empty) scan by clearing the progress indicator. */
  @Override
  public void reset() throws IOException {
    progress = 0.0f;
  }

  /** Nothing to release; clears progress like {@link #reset()}. */
  @Override
  public void close() throws IOException {
    progress = 0.0f;
  }

  /** Projection is meaningless with no output columns. */
  @Override
  public boolean isProjectable() {
    return false;
  }

  /** Any predicate is trivially satisfiable over an empty result. */
  @Override
  public boolean isSelectable() {
    return true;
  }

  /** Splitting an empty input is harmless, so it is permitted. */
  @Override
  public boolean isSplittable() {
    return true;
  }
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.Path; - -import java.util.Comparator; - -public class NumericPathComparator implements Comparator<Path> { - - @Override - public int compare(Path p1, Path p2) { - int num1 = Integer.parseInt(p1.getName()); - int num2 = Integer.parseInt(p2.getName()); - - return num1 - num2; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java deleted file mode 100644 index 2fae243..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ /dev/null @@ -1,772 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.protobuf.Message; -import io.netty.buffer.ByteBuf; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatumFactory; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.BitArray; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -public class RawFile { - private static final Log LOG = LogFactory.getLog(RawFile.class); - - public static class RawFileScanner extends FileScanner implements SeekableScanner { - private FileChannel channel; - private DataType[] columnTypes; - - private ByteBuffer buffer; - private ByteBuf buf; - private Tuple tuple; - - private int headerSize = 0; // Header size of a tuple - private BitArray nullFlags; - private static final int RECORD_SIZE = 4; - private boolean eos = false; - private long startOffset; - private long endOffset; - private FileInputStream fis; - private long recordCount; - 
private long totalReadBytes; - private long filePosition; - private boolean forceFillBuffer; - - public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { - super(conf, schema, meta, fragment); - } - - public void init() throws IOException { - File file; - try { - if (fragment.getPath().toUri().getScheme() != null) { - file = new File(fragment.getPath().toUri()); - } else { - file = new File(fragment.getPath().toString()); - } - } catch (IllegalArgumentException iae) { - throw new IOException(iae); - } - - fis = new FileInputStream(file); - channel = fis.getChannel(); - filePosition = startOffset = fragment.getStartKey(); - endOffset = fragment.getStartKey() + fragment.getEndKey(); - - if (LOG.isDebugEnabled()) { - LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size() - + ", fragment length :" + fragment.getEndKey()); - } - - buf = BufferPool.directBuffer(64 * StorageUnit.KB); - buffer = buf.nioBuffer(0, buf.capacity()); - - columnTypes = new DataType[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - columnTypes[i] = schema.getColumn(i).getDataType(); - } - - tuple = new VTuple(columnTypes.length); - nullFlags = new BitArray(schema.size()); - headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize - - // initial set position - if (fragment.getStartKey() > 0) { - channel.position(fragment.getStartKey()); - } - - forceFillBuffer = true; - super.init(); - } - - @Override - public long getNextOffset() throws IOException { - return filePosition - (forceFillBuffer ? 0 : buffer.remaining()); - } - - @Override - public void seek(long offset) throws IOException { - eos = false; - filePosition = channel.position(); - - // do not fill the buffer if the offset is already included in the buffer. 
- if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){ - buffer.position((int)(offset - (filePosition - buffer.limit()))); - } else { - if(offset < startOffset || offset > startOffset + fragment.getEndKey()){ - throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d", - startOffset, startOffset + fragment.getEndKey(), offset)); - } - channel.position(offset); - filePosition = offset; - buffer.clear(); - forceFillBuffer = true; - fillBuffer(); - } - } - - private boolean fillBuffer() throws IOException { - if(!forceFillBuffer) buffer.compact(); - - int bytesRead = channel.read(buffer); - forceFillBuffer = false; - if (bytesRead == -1) { - eos = true; - return false; - } else { - buffer.flip(); //The limit is set to the current filePosition and then the filePosition is set to zero - filePosition += bytesRead; - totalReadBytes += bytesRead; - return true; - } - } - - /** - * Decode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n An unsigned 32-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - * @return A signed 32-bit integer. - */ - public static int decodeZigZag32(final int n) { - return (n >>> 1) ^ -(n & 1); - } - - /** - * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n An unsigned 64-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - * @return A signed 64-bit integer. 
- */ - public static long decodeZigZag64(final long n) { - return (n >>> 1) ^ -(n & 1); - } - - - /** - * Read a raw Varint from the stream. If larger than 32 bits, discard the - * upper bits. - */ - public int readRawVarint32() throws IOException { - byte tmp = buffer.get(); - if (tmp >= 0) { - return tmp; - } - int result = tmp & 0x7f; - if ((tmp = buffer.get()) >= 0) { - result |= tmp << 7; - } else { - result |= (tmp & 0x7f) << 7; - if ((tmp = buffer.get()) >= 0) { - result |= tmp << 14; - } else { - result |= (tmp & 0x7f) << 14; - if ((tmp = buffer.get()) >= 0) { - result |= tmp << 21; - } else { - result |= (tmp & 0x7f) << 21; - result |= (tmp = buffer.get()) << 28; - if (tmp < 0) { - // Discard upper 32 bits. - for (int i = 0; i < 5; i++) { - if (buffer.get() >= 0) { - return result; - } - } - throw new IOException("Invalid Variable int32"); - } - } - } - } - return result; - } - - /** Read a raw Varint from the stream. */ - public long readRawVarint64() throws IOException { - int shift = 0; - long result = 0; - while (shift < 64) { - final byte b = buffer.get(); - result |= (long)(b & 0x7F) << shift; - if ((b & 0x80) == 0) { - return result; - } - shift += 7; - } - throw new IOException("Invalid Variable int64"); - } - - @Override - public Tuple next() throws IOException { - if(eos) return null; - - if (forceFillBuffer || buffer.remaining() < headerSize) { - if (!fillBuffer()) { - return null; - } - } - - // backup the buffer state - int bufferLimit = buffer.limit(); - int recordSize = buffer.getInt(); - int nullFlagSize = buffer.getShort(); - - buffer.limit(buffer.position() + nullFlagSize); - nullFlags.fromByteBuffer(buffer); - // restore the start of record contents - buffer.limit(bufferLimit); - if (buffer.remaining() < (recordSize - headerSize)) { - - //if the buffer reaches the writable size, the buffer increase the record size - reSizeBuffer(recordSize); - - if (!fillBuffer()) { - return null; - } - } - - for (int i = 0; i < columnTypes.length; i++) 
{ - // check if the i'th column is null - if (nullFlags.get(i)) { - tuple.put(i, DatumFactory.createNullDatum()); - continue; - } - - switch (columnTypes[i].getType()) { - case BOOLEAN : - tuple.put(i, DatumFactory.createBool(buffer.get())); - break; - - case BIT : - tuple.put(i, DatumFactory.createBit(buffer.get())); - break; - - case CHAR : - int realLen = readRawVarint32(); - byte[] buf = new byte[realLen]; - buffer.get(buf); - tuple.put(i, DatumFactory.createChar(buf)); - break; - - case INT2 : - tuple.put(i, DatumFactory.createInt2(buffer.getShort())); - break; - - case INT4 : - tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); - break; - - case INT8 : - tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); - break; - - case FLOAT4 : - tuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); - break; - - case FLOAT8 : - tuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); - break; - - case TEXT : { - int len = readRawVarint32(); - byte [] strBytes = new byte[len]; - buffer.get(strBytes); - tuple.put(i, DatumFactory.createText(strBytes)); - break; - } - - case BLOB : { - int len = readRawVarint32(); - byte [] rawBytes = new byte[len]; - buffer.get(rawBytes); - tuple.put(i, DatumFactory.createBlob(rawBytes)); - break; - } - - case PROTOBUF: { - int len = readRawVarint32(); - byte [] rawBytes = new byte[len]; - buffer.get(rawBytes); - - ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(rawBytes); - tuple.put(i, factory.createDatum(builder.build())); - break; - } - - case INET4 : - byte [] ipv4Bytes = new byte[4]; - buffer.get(ipv4Bytes); - tuple.put(i, DatumFactory.createInet4(ipv4Bytes)); - break; - - case DATE: { - int val = buffer.getInt(); - if (val < Integer.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); - } - break; - 
} - case TIME: - case TIMESTAMP: { - long val = buffer.getLong(); - if (val < Long.MIN_VALUE + 1) { - tuple.put(i, DatumFactory.createNullDatum()); - } else { - tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); - } - break; - } - case NULL_TYPE: - tuple.put(i, NullDatum.get()); - break; - - default: - } - } - - recordCount++; - - if(filePosition - buffer.remaining() >= endOffset){ - eos = true; - } - return new VTuple(tuple); - } - - private void reSizeBuffer(int writableBytes){ - if (buffer.capacity() - buffer.remaining() < writableBytes) { - buf.setIndex(buffer.position(), buffer.limit()); - buf.markReaderIndex(); - buf.discardSomeReadBytes(); - buf.ensureWritable(writableBytes); - buffer = buf.nioBuffer(0, buf.capacity()); - buffer.limit(buf.writerIndex()); - } - } - - @Override - public void reset() throws IOException { - // reset the buffer - buffer.clear(); - forceFillBuffer = true; - filePosition = fragment.getStartKey(); - channel.position(filePosition); - eos = false; - } - - @Override - public void close() throws IOException { - if(buf != null){ - buffer.clear(); - buffer = null; - - buf.release(); - buf = null; - } - - IOUtils.cleanup(LOG, channel, fis); - } - - @Override - public boolean isProjectable() { - return false; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public boolean isSplittable(){ - return false; - } - - @Override - public TableStats getInputStats() { - if(tableStats != null){ - tableStats.setNumRows(recordCount); - tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n) - tableStats.setNumBytes(fragment.getEndKey()); - } - return tableStats; - } - - @Override - public float getProgress() { - if(eos) { - return 1.0f; - } - - if (filePosition - startOffset == 0) { - return 0.0f; - } else { - return Math.min(1.0f, ((float) filePosition / endOffset)); - } - } - } - - public static class RawFileAppender extends FileAppender { - private FileChannel channel; - 
private RandomAccessFile randomAccessFile; - private DataType[] columnTypes; - - private ByteBuffer buffer; - private ByteBuf buf; - private BitArray nullFlags; - private int headerSize = 0; - private static final int RECORD_SIZE = 4; - private long pos; - - private TableStatistics stats; - - public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { - super(conf, schema, meta, path); - } - - public void init() throws IOException { - File file; - try { - if (path.toUri().getScheme() != null) { - file = new File(path.toUri()); - } else { - file = new File(path.toString()); - } - } catch (IllegalArgumentException iae) { - throw new IOException(iae); - } - - randomAccessFile = new RandomAccessFile(file, "rw"); - channel = randomAccessFile.getChannel(); - pos = 0; - - columnTypes = new DataType[schema.size()]; - for (int i = 0; i < schema.size(); i++) { - columnTypes[i] = schema.getColumn(i).getDataType(); - } - - buf = BufferPool.directBuffer(64 * StorageUnit.KB); - buffer = buf.nioBuffer(0, buf.capacity()); - - // comput the number of bytes, representing the null flags - - nullFlags = new BitArray(schema.size()); - headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - super.init(); - } - - @Override - public long getOffset() throws IOException { - return pos; - } - - private void flushBuffer() throws IOException { - buffer.flip(); - channel.write(buffer); - buffer.clear(); - } - - private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten) - throws IOException { - - // if the buffer reaches the limit, - // write the bytes from 0 to the previous record. 
- if (buffer.remaining() < sizeToBeWritten) { - - int limit = buffer.position(); - buffer.limit(recordOffset); - buffer.flip(); - channel.write(buffer); - buffer.position(recordOffset); - buffer.limit(limit); - buffer.compact(); - - return true; - } else { - return false; - } - } - - /** - * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n A signed 32-bit integer. - * @return An unsigned 32-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - */ - public static int encodeZigZag32(final int n) { - // Note: the right-shift must be arithmetic - return (n << 1) ^ (n >> 31); - } - - /** - * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers - * into values that can be efficiently encoded with varint. (Otherwise, - * negative values must be sign-extended to 64 bits to be varint encoded, - * thus always taking 10 bytes on the wire.) - * - * @param n A signed 64-bit integer. - * @return An unsigned 64-bit integer, stored in a signed int because - * Java has no explicit unsigned support. - */ - public static long encodeZigZag64(final long n) { - // Note: the right-shift must be arithmetic - return (n << 1) ^ (n >> 63); - } - - /** - * Encode and write a varint. {@code value} is treated as - * unsigned, so it won't be sign-extended if negative. - */ - public void writeRawVarint32(int value) throws IOException { - while (true) { - if ((value & ~0x7F) == 0) { - buffer.put((byte) value); - return; - } else { - buffer.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } - - /** - * Compute the number of bytes that would be needed to encode a varint. - * {@code value} is treated as unsigned, so it won't be sign-extended if - * negative. 
- */ - public static int computeRawVarint32Size(final int value) { - if ((value & (0xffffffff << 7)) == 0) return 1; - if ((value & (0xffffffff << 14)) == 0) return 2; - if ((value & (0xffffffff << 21)) == 0) return 3; - if ((value & (0xffffffff << 28)) == 0) return 4; - return 5; - } - - /** Encode and write a varint. */ - public void writeRawVarint64(long value) throws IOException { - while (true) { - if ((value & ~0x7FL) == 0) { - buffer.put((byte) value); - return; - } else { - buffer.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } - - @Override - public void addTuple(Tuple t) throws IOException { - - if (buffer.remaining() < headerSize) { - flushBuffer(); - } - - // skip the row header - int recordOffset = buffer.position(); - buffer.position(recordOffset + headerSize); - // reset the null flags - nullFlags.clear(); - for (int i = 0; i < schema.size(); i++) { - if (enabledStats) { - stats.analyzeField(i, t.get(i)); - } - - if (t.isNull(i)) { - nullFlags.set(i); - continue; - } - - // 8 is the maximum bytes size of all types - if (flushBufferAndReplace(recordOffset, 8)) { - recordOffset = 0; - } - - switch(columnTypes[i].getType()) { - case NULL_TYPE: - nullFlags.set(i); - continue; - - case BOOLEAN: - case BIT: - buffer.put(t.getByte(i)); - break; - - case INT2 : - buffer.putShort(t.getInt2(i)); - break; - - case INT4 : - writeRawVarint32(encodeZigZag32(t.getInt4(i))); - break; - - case INT8 : - writeRawVarint64(encodeZigZag64(t.getInt8(i))); - break; - - case FLOAT4 : - buffer.putFloat(t.getFloat4(i)); - break; - - case FLOAT8 : - buffer.putDouble(t.getFloat8(i)); - break; - - case CHAR: - case TEXT: { - byte [] strBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(strBytes.length); - buffer.put(strBytes); - break; - } - - case DATE: - buffer.putInt(t.getInt4(i)); - break; - - case TIME: - case TIMESTAMP: - 
buffer.putLong(t.getInt8(i)); - break; - - case BLOB : { - byte [] rawBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(rawBytes.length); - buffer.put(rawBytes); - break; - } - - case PROTOBUF: { - byte [] rawBytes = t.getBytes(i); - if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { - recordOffset = 0; - } - writeRawVarint32(rawBytes.length); - buffer.put(rawBytes); - break; - } - - case INET4 : - buffer.put(t.getBytes(i)); - break; - - default: - throw new IOException("Cannot support data type: " + columnTypes[i].getType()); - } - } - - // write a record header - int bufferPos = buffer.position(); - buffer.position(recordOffset); - buffer.putInt(bufferPos - recordOffset); - byte [] flags = nullFlags.toArray(); - buffer.putShort((short) flags.length); - buffer.put(flags); - - pos += bufferPos - recordOffset; - buffer.position(bufferPos); - - if (enabledStats) { - stats.incrementRow(); - } - } - - @Override - public void flush() throws IOException { - if(buffer != null){ - flushBuffer(); - } - } - - @Override - public void close() throws IOException { - flush(); - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); - } - - if(buf != null){ - buffer.clear(); - buffer = null; - - buf.release(); - buf = null; - } - - IOUtils.cleanup(LOG, channel, randomAccessFile); - } - - @Override - public TableStats getStats() { - if (enabledStats) { - stats.setNumBytes(pos); - return stats.getTableStat(); - } else { - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java deleted file mode 100644 index db36771..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java +++ /dev/null @@ -1,496 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.util.BitArray; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Arrays; - -public class RowFile { - public static final Log LOG = LogFactory.getLog(RowFile.class); - - private static final int SYNC_ESCAPE = -1; - private static final int SYNC_HASH_SIZE = 16; - private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE; - private final static int DEFAULT_BUFFER_SIZE = 65535; - public static int SYNC_INTERVAL; - - public static class RowFileScanner extends FileScanner { - private FileSystem fs; - private FSDataInputStream in; - private Tuple tuple; - - private byte[] sync = new byte[SYNC_HASH_SIZE]; - private byte[] checkSync = new byte[SYNC_HASH_SIZE]; - private long start, end; - - private ByteBuffer buffer; - private final int tupleHeaderSize; - private BitArray nullFlags; - private long bufferStartPos; - - public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - - SYNC_INTERVAL = 
conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname, - ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE; - - nullFlags = new BitArray(schema.size()); - tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8); - this.start = fragment.getStartKey(); - this.end = this.start + fragment.getEndKey(); - } - - public void init() throws IOException { - // set default page size. - fs = fragment.getPath().getFileSystem(conf); - in = fs.open(fragment.getPath()); - buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size()); - buffer.flip(); - - readHeader(); - - // find the correct position from the start - if (this.start > in.getPos()) { - long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0; - in.seek(realStart); - } - bufferStartPos = in.getPos(); - fillBuffer(); - - if (start != 0) { - // TODO: improve - boolean syncFound = false; - while (!syncFound) { - if (buffer.remaining() < SYNC_SIZE) { - fillBuffer(); - } - buffer.mark(); - syncFound = checkSync(); - if (!syncFound) { - buffer.reset(); - buffer.get(); // proceed one byte - } - } - bufferStartPos += buffer.position(); - buffer.compact(); - buffer.flip(); - } - - super.init(); - } - - private void readHeader() throws IOException { - SYNC_INTERVAL = in.readInt(); - StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE); - } - - /** - * Find the sync from the front of the buffer - * - * @return return true if it succeeds to find the sync. - * @throws IOException - */ - private boolean checkSync() throws IOException { - buffer.getInt(); // escape - buffer.get(checkSync, 0, SYNC_HASH_SIZE); // sync - return Arrays.equals(checkSync, sync); - } - - private int fillBuffer() throws IOException { - bufferStartPos += buffer.position(); - buffer.compact(); - int remain = buffer.remaining(); - int read = in.read(buffer); - if (read == -1) { - buffer.flip(); - return read; - } else { - int totalRead = read; - if (remain > totalRead) { - read = in.read(buffer); - totalRead += read > 0 ? 
read : 0; - } - buffer.flip(); - return totalRead; - } - } - - @Override - public Tuple next() throws IOException { - while (buffer.remaining() < SYNC_SIZE) { - if (fillBuffer() < 0) { - return null; - } - } - - buffer.mark(); - if (!checkSync()) { - buffer.reset(); - } else { - if (bufferStartPos + buffer.position() > end) { - return null; - } - } - - while (buffer.remaining() < tupleHeaderSize) { - if (fillBuffer() < 0) { - return null; - } - } - - int i; - tuple = new VTuple(schema.size()); - - int nullFlagSize = buffer.getShort(); - byte[] nullFlagBytes = new byte[nullFlagSize]; - buffer.get(nullFlagBytes, 0, nullFlagSize); - nullFlags = new BitArray(nullFlagBytes); - int tupleSize = buffer.getShort(); - - while (buffer.remaining() < (tupleSize)) { - if (fillBuffer() < 0) { - return null; - } - } - - Datum datum; - Column col; - for (i = 0; i < schema.size(); i++) { - if (!nullFlags.get(i)) { - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case BOOLEAN : - datum = DatumFactory.createBool(buffer.get()); - tuple.put(i, datum); - break; - - case BIT: - datum = DatumFactory.createBit(buffer.get()); - tuple.put(i, datum ); - break; - - case CHAR : - int realLen = buffer.getInt(); - byte[] buf = new byte[col.getDataType().getLength()]; - buffer.get(buf); - byte[] charBuf = Arrays.copyOf(buf, realLen); - tuple.put(i, DatumFactory.createChar(charBuf)); - break; - - case INT2 : - datum = DatumFactory.createInt2(buffer.getShort()); - tuple.put(i, datum ); - break; - - case INT4 : - datum = DatumFactory.createInt4(buffer.getInt()); - tuple.put(i, datum ); - break; - - case INT8 : - datum = DatumFactory.createInt8(buffer.getLong()); - tuple.put(i, datum ); - break; - - case FLOAT4 : - datum = DatumFactory.createFloat4(buffer.getFloat()); - tuple.put(i, datum); - break; - - case FLOAT8 : - datum = DatumFactory.createFloat8(buffer.getDouble()); - tuple.put(i, datum); - break; - - case TEXT: - short bytelen = buffer.getShort(); - byte[] strbytes = new 
byte[bytelen]; - buffer.get(strbytes, 0, bytelen); - datum = DatumFactory.createText(strbytes); - tuple.put(i, datum); - break; - - case BLOB: - short bytesLen = buffer.getShort(); - byte [] bytesBuf = new byte[bytesLen]; - buffer.get(bytesBuf); - datum = DatumFactory.createBlob(bytesBuf); - tuple.put(i, datum); - break; - - case INET4 : - byte[] ipv4 = new byte[4]; - buffer.get(ipv4, 0, 4); - datum = DatumFactory.createInet4(ipv4); - tuple.put(i, datum); - break; - - default: - break; - } - } else { - tuple.put(i, DatumFactory.createNullDatum()); - } - } - return tuple; - } - - @Override - public void reset() throws IOException { - init(); - } - - @Override - public void close() throws IOException { - if (in != null) { - in.close(); - } - } - - @Override - public boolean isProjectable() { - return false; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public boolean isSplittable(){ - return true; - } - } - - public static class RowFileAppender extends FileAppender { - private FSDataOutputStream out; - private long lastSyncPos; - private FileSystem fs; - private byte[] sync; - private ByteBuffer buffer; - - private BitArray nullFlags; - // statistics - private TableStatistics stats; - - public RowFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) - throws IOException { - super(conf, schema, meta, path); - } - - public void init() throws IOException { - SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname, - ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal); - fs = path.getFileSystem(conf); - - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - - sync = new byte[SYNC_HASH_SIZE]; - lastSyncPos = 0; - - out = fs.create(path); - - MessageDigest md; - try { - md = MessageDigest.getInstance("MD5"); - 
md.update((path.toString()+System.currentTimeMillis()).getBytes()); - sync = md.digest(); - } catch (NoSuchAlgorithmException e) { - LOG.error(e); - } - - writeHeader(); - - buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); - - nullFlags = new BitArray(schema.size()); - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - } - - private void writeHeader() throws IOException { - out.writeInt(SYNC_INTERVAL); - out.write(sync); - out.flush(); - lastSyncPos = out.getPos(); - } - - @Override - public void addTuple(Tuple t) throws IOException { - checkAndWriteSync(); - Column col; - - buffer.clear(); - nullFlags.clear(); - - for (int i = 0; i < schema.size(); i++) { - if (enabledStats) { - stats.analyzeField(i, t.get(i)); - } - - if (t.isNull(i)) { - nullFlags.set(i); - } else { - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case BOOLEAN: - buffer.put(t.get(i).asByte()); - break; - case BIT: - buffer.put(t.get(i).asByte()); - break; - case CHAR: - byte[] src = t.get(i).asByteArray(); - byte[] dst = Arrays.copyOf(src, col.getDataType().getLength()); - buffer.putInt(src.length); - buffer.put(dst); - break; - case TEXT: - byte [] strbytes = t.get(i).asByteArray(); - buffer.putShort((short)strbytes.length); - buffer.put(strbytes, 0, strbytes.length); - break; - case INT2: - buffer.putShort(t.get(i).asInt2()); - break; - case INT4: - buffer.putInt(t.get(i).asInt4()); - break; - case INT8: - buffer.putLong(t.get(i).asInt8()); - break; - case FLOAT4: - buffer.putFloat(t.get(i).asFloat4()); - break; - case FLOAT8: - buffer.putDouble(t.get(i).asFloat8()); - break; - case BLOB: - byte [] bytes = t.get(i).asByteArray(); - buffer.putShort((short)bytes.length); - buffer.put(bytes); - break; - case INET4: - buffer.put(t.get(i).asByteArray()); - break; - case INET6: - buffer.put(t.get(i).asByteArray()); - break; - case NULL_TYPE: - nullFlags.set(i); - break; - default: - break; - } - } - } - - byte[] bytes = nullFlags.toArray(); - 
out.writeShort(bytes.length); - out.write(bytes); - - bytes = buffer.array(); - int dataLen = buffer.position(); - out.writeShort(dataLen); - out.write(bytes, 0, dataLen); - - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - @Override - public long getOffset() throws IOException { - return out.getPos(); - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - if (out != null) { - if (enabledStats) { - stats.setNumBytes(out.getPos()); - } - sync(); - out.flush(); - out.close(); - } - } - - private void sync() throws IOException { - if (lastSyncPos != out.getPos()) { - out.writeInt(SYNC_ESCAPE); - out.write(sync); - lastSyncPos = out.getPos(); - } - } - - private void checkAndWriteSync() throws IOException { - if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) { - sync(); - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java deleted file mode 100644 index 24b6280..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java +++ /dev/null @@ -1,377 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.exception.UnknownDataTypeException; -import org.apache.tajo.tuple.offheap.RowWriter; -import org.apache.tajo.util.BitArray; - -import java.nio.ByteBuffer; - -public class RowStoreUtil { - public static int[] getTargetIds(Schema inSchema, Schema outSchema) { - int[] targetIds = new int[outSchema.size()]; - int i = 0; - for (Column target : outSchema.getColumns()) { - targetIds[i] = inSchema.getColumnId(target.getQualifiedName()); - i++; - } - - return targetIds; - } - - public static Tuple project(Tuple in, Tuple out, int[] targetIds) { - out.clear(); - for (int idx = 0; idx < targetIds.length; idx++) { - out.put(idx, in.get(targetIds[idx])); - } - return out; - } - - public static RowStoreEncoder createEncoder(Schema schema) { - return new RowStoreEncoder(schema); - } - - public static RowStoreDecoder createDecoder(Schema schema) { - return new RowStoreDecoder(schema); - } - - public static class RowStoreDecoder { - - private Schema schema; - private BitArray nullFlags; - private int headerSize; - - private RowStoreDecoder(Schema schema) { - this.schema = schema; - nullFlags = new BitArray(schema.size()); - headerSize = nullFlags.bytesLength(); - } - - - public Tuple 
toTuple(byte [] bytes) { - nullFlags.clear(); - ByteBuffer bb = ByteBuffer.wrap(bytes); - Tuple tuple = new VTuple(schema.size()); - Column col; - TajoDataTypes.DataType type; - - bb.limit(headerSize); - nullFlags.fromByteBuffer(bb); - bb.limit(bytes.length); - - for (int i =0; i < schema.size(); i++) { - if (nullFlags.get(i)) { - tuple.put(i, DatumFactory.createNullDatum()); - continue; - } - - col = schema.getColumn(i); - type = col.getDataType(); - switch (type.getType()) { - case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; - case BIT: - byte b = bb.get(); - tuple.put(i, DatumFactory.createBit(b)); - break; - - case CHAR: - byte c = bb.get(); - tuple.put(i, DatumFactory.createChar(c)); - break; - - case INT2: - short s = bb.getShort(); - tuple.put(i, DatumFactory.createInt2(s)); - break; - - case INT4: - case DATE: - int i_ = bb.getInt(); - tuple.put(i, DatumFactory.createFromInt4(type, i_)); - break; - - case INT8: - case TIME: - case TIMESTAMP: - long l = bb.getLong(); - tuple.put(i, DatumFactory.createFromInt8(type, l)); - break; - - case INTERVAL: - int month = bb.getInt(); - long milliseconds = bb.getLong(); - tuple.put(i, new IntervalDatum(month, milliseconds)); - break; - - case FLOAT4: - float f = bb.getFloat(); - tuple.put(i, DatumFactory.createFloat4(f)); - break; - - case FLOAT8: - double d = bb.getDouble(); - tuple.put(i, DatumFactory.createFloat8(d)); - break; - - case TEXT: - byte [] _string = new byte[bb.getInt()]; - bb.get(_string); - tuple.put(i, DatumFactory.createText(_string)); - break; - - case BLOB: - byte [] _bytes = new byte[bb.getInt()]; - bb.get(_bytes); - tuple.put(i, DatumFactory.createBlob(_bytes)); - break; - - case INET4: - byte [] _ipv4 = new byte[4]; - bb.get(_ipv4); - tuple.put(i, DatumFactory.createInet4(_ipv4)); - break; - case INET6: - // TODO - to be implemented - throw new UnsupportedException(type.getType().name()); - default: - throw new RuntimeException(new 
UnknownDataTypeException(type.getType().name())); - } - } - return tuple; - } - - public Schema getSchema() { - return schema; - } - } - - public static class RowStoreEncoder { - private Schema schema; - private BitArray nullFlags; - private int headerSize; - - private RowStoreEncoder(Schema schema) { - this.schema = schema; - nullFlags = new BitArray(schema.size()); - headerSize = nullFlags.bytesLength(); - } - - public byte[] toBytes(Tuple tuple) { - nullFlags.clear(); - int size = estimateTupleDataSize(tuple); - ByteBuffer bb = ByteBuffer.allocate(size + headerSize); - bb.position(headerSize); - Column col; - for (int i = 0; i < schema.size(); i++) { - if (tuple.isNull(i)) { - nullFlags.set(i); - continue; - } - - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case NULL_TYPE: - nullFlags.set(i); - break; - case BOOLEAN: - bb.put(tuple.get(i).asByte()); - break; - case BIT: - bb.put(tuple.get(i).asByte()); - break; - case CHAR: - bb.put(tuple.get(i).asByte()); - break; - case INT2: - bb.putShort(tuple.get(i).asInt2()); - break; - case INT4: - bb.putInt(tuple.get(i).asInt4()); - break; - case INT8: - bb.putLong(tuple.get(i).asInt8()); - break; - case FLOAT4: - bb.putFloat(tuple.get(i).asFloat4()); - break; - case FLOAT8: - bb.putDouble(tuple.get(i).asFloat8()); - break; - case TEXT: - byte[] _string = tuple.get(i).asByteArray(); - bb.putInt(_string.length); - bb.put(_string); - break; - case DATE: - bb.putInt(tuple.get(i).asInt4()); - break; - case TIME: - case TIMESTAMP: - bb.putLong(tuple.get(i).asInt8()); - break; - case INTERVAL: - IntervalDatum interval = (IntervalDatum) tuple.get(i); - bb.putInt(interval.getMonths()); - bb.putLong(interval.getMilliSeconds()); - break; - case BLOB: - byte[] bytes = tuple.get(i).asByteArray(); - bb.putInt(bytes.length); - bb.put(bytes); - break; - case INET4: - byte[] ipBytes = tuple.get(i).asByteArray(); - bb.put(ipBytes); - break; - case INET6: - bb.put(tuple.get(i).asByteArray()); - break; - default: 
- throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); - } - } - - byte[] flags = nullFlags.toArray(); - int finalPosition = bb.position(); - bb.position(0); - bb.put(flags); - - bb.position(finalPosition); - bb.flip(); - byte[] buf = new byte[bb.limit()]; - bb.get(buf); - return buf; - } - - // Note that, NULL values are treated separately - private int estimateTupleDataSize(Tuple tuple) { - int size = 0; - Column col; - - for (int i = 0; i < schema.size(); i++) { - if (tuple.isNull(i)) { - continue; - } - - col = schema.getColumn(i); - switch (col.getDataType().getType()) { - case BOOLEAN: - case BIT: - case CHAR: - size += 1; - break; - case INT2: - size += 2; - break; - case DATE: - case INT4: - case FLOAT4: - size += 4; - break; - case TIME: - case TIMESTAMP: - case INT8: - case FLOAT8: - size += 8; - break; - case INTERVAL: - size += 12; - break; - case TEXT: - case BLOB: - size += (4 + tuple.get(i).asByteArray().length); - break; - case INET4: - case INET6: - size += tuple.get(i).asByteArray().length; - break; - default: - throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); - } - } - - size += 100; // optimistic reservation - - return size; - } - - public Schema getSchema() { - return schema; - } - } - - public static void convert(Tuple tuple, RowWriter writer) { - writer.startRow(); - - for (int i = 0; i < writer.dataTypes().length; i++) { - if (tuple.isNull(i)) { - writer.skipField(); - continue; - } - switch (writer.dataTypes()[i].getType()) { - case BOOLEAN: - writer.putBool(tuple.getBool(i)); - break; - case INT1: - case INT2: - writer.putInt2(tuple.getInt2(i)); - break; - case INT4: - case DATE: - case INET4: - writer.putInt4(tuple.getInt4(i)); - break; - case INT8: - case TIMESTAMP: - case TIME: - writer.putInt8(tuple.getInt8(i)); - break; - case FLOAT4: - writer.putFloat4(tuple.getFloat4(i)); - break; - case FLOAT8: - writer.putFloat8(tuple.getFloat8(i)); - break; - 
case TEXT: - writer.putText(tuple.getBytes(i)); - break; - case INTERVAL: - writer.putInterval((IntervalDatum) tuple.getInterval(i)); - break; - case PROTOBUF: - writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); - break; - case NULL_TYPE: - writer.skipField(); - break; - default: - throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]); - } - } - writer.endRow(); - } -}
