http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java new file mode 100644 index 0000000..cb86f35 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java @@ -0,0 +1,773 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.protobuf.Message; +import io.netty.buffer.ByteBuf; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.DataType; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.BitArray; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class RawFile { + private static final Log LOG = LogFactory.getLog(RawFile.class); + + public static class RawFileScanner extends FileScanner implements SeekableScanner { + private FileChannel channel; + private DataType[] columnTypes; + + private ByteBuffer buffer; + private ByteBuf buf; + private Tuple tuple; + + private int headerSize = 0; // Header size of a tuple + private BitArray nullFlags; + private static final int RECORD_SIZE = 4; + private boolean eos = false; + private long startOffset; + private long endOffset; + private FileInputStream fis; + private long recordCount; + private long totalReadBytes; + private long filePosition; + private boolean forceFillBuffer; + + public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { + super(conf, schema, meta, fragment); + } + + public void init() throws IOException { + File file; + try { + if (fragment.getPath().toUri().getScheme() != null) { + file = 
new File(fragment.getPath().toUri()); + } else { + file = new File(fragment.getPath().toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + fis = new FileInputStream(file); + channel = fis.getChannel(); + filePosition = startOffset = fragment.getStartKey(); + endOffset = fragment.getStartKey() + fragment.getLength(); + + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", file size :" + channel.size() + + ", fragment length :" + fragment.getLength()); + } + + buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buffer = buf.nioBuffer(0, buf.capacity()); + + columnTypes = new DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + tuple = new VTuple(columnTypes.length); + nullFlags = new BitArray(schema.size()); + headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); // The middle 2 bytes is for NullFlagSize + + // initial set position + if (fragment.getStartKey() > 0) { + channel.position(fragment.getStartKey()); + } + + forceFillBuffer = true; + super.init(); + } + + @Override + public long getNextOffset() throws IOException { + return filePosition - (forceFillBuffer ? 0 : buffer.remaining()); + } + + @Override + public void seek(long offset) throws IOException { + eos = false; + filePosition = channel.position(); + + // do not fill the buffer if the offset is already included in the buffer. 
+ if(!forceFillBuffer && filePosition > offset && offset > filePosition - buffer.limit()){ + buffer.position((int)(offset - (filePosition - buffer.limit()))); + } else { + if(offset < startOffset || offset > startOffset + fragment.getLength()){ + throw new IndexOutOfBoundsException(String.format("range(%d, %d), offset: %d", + startOffset, startOffset + fragment.getLength(), offset)); + } + channel.position(offset); + filePosition = offset; + buffer.clear(); + forceFillBuffer = true; + fillBuffer(); + } + } + + private boolean fillBuffer() throws IOException { + if(!forceFillBuffer) buffer.compact(); + + int bytesRead = channel.read(buffer); + forceFillBuffer = false; + if (bytesRead == -1) { + eos = true; + return false; + } else { + buffer.flip(); //The limit is set to the current filePosition and then the filePosition is set to zero + filePosition += bytesRead; + totalReadBytes += bytesRead; + return true; + } + } + + /** + * Decode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n An unsigned 32-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + * @return A signed 32-bit integer. + */ + public static int decodeZigZag32(final int n) { + return (n >>> 1) ^ -(n & 1); + } + + /** + * Decode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n An unsigned 64-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + * @return A signed 64-bit integer. 
+ */ + public static long decodeZigZag64(final long n) { + return (n >>> 1) ^ -(n & 1); + } + + + /** + * Read a raw Varint from the stream. If larger than 32 bits, discard the + * upper bits. + */ + public int readRawVarint32() throws IOException { + byte tmp = buffer.get(); + if (tmp >= 0) { + return tmp; + } + int result = tmp & 0x7f; + if ((tmp = buffer.get()) >= 0) { + result |= tmp << 7; + } else { + result |= (tmp & 0x7f) << 7; + if ((tmp = buffer.get()) >= 0) { + result |= tmp << 14; + } else { + result |= (tmp & 0x7f) << 14; + if ((tmp = buffer.get()) >= 0) { + result |= tmp << 21; + } else { + result |= (tmp & 0x7f) << 21; + result |= (tmp = buffer.get()) << 28; + if (tmp < 0) { + // Discard upper 32 bits. + for (int i = 0; i < 5; i++) { + if (buffer.get() >= 0) { + return result; + } + } + throw new IOException("Invalid Variable int32"); + } + } + } + } + return result; + } + + /** Read a raw Varint from the stream. */ + public long readRawVarint64() throws IOException { + int shift = 0; + long result = 0; + while (shift < 64) { + final byte b = buffer.get(); + result |= (long)(b & 0x7F) << shift; + if ((b & 0x80) == 0) { + return result; + } + shift += 7; + } + throw new IOException("Invalid Variable int64"); + } + + @Override + public Tuple next() throws IOException { + if(eos) return null; + + if (forceFillBuffer || buffer.remaining() < headerSize) { + if (!fillBuffer()) { + return null; + } + } + + // backup the buffer state + int bufferLimit = buffer.limit(); + int recordSize = buffer.getInt(); + int nullFlagSize = buffer.getShort(); + + buffer.limit(buffer.position() + nullFlagSize); + nullFlags.fromByteBuffer(buffer); + // restore the start of record contents + buffer.limit(bufferLimit); + if (buffer.remaining() < (recordSize - headerSize)) { + + //if the buffer reaches the writable size, the buffer increase the record size + reSizeBuffer(recordSize); + + if (!fillBuffer()) { + return null; + } + } + + for (int i = 0; i < columnTypes.length; i++) 
{ + // check if the i'th column is null + if (nullFlags.get(i)) { + tuple.put(i, DatumFactory.createNullDatum()); + continue; + } + + switch (columnTypes[i].getType()) { + case BOOLEAN : + tuple.put(i, DatumFactory.createBool(buffer.get())); + break; + + case BIT : + tuple.put(i, DatumFactory.createBit(buffer.get())); + break; + + case CHAR : + int realLen = readRawVarint32(); + byte[] buf = new byte[realLen]; + buffer.get(buf); + tuple.put(i, DatumFactory.createChar(buf)); + break; + + case INT2 : + tuple.put(i, DatumFactory.createInt2(buffer.getShort())); + break; + + case INT4 : + tuple.put(i, DatumFactory.createInt4(decodeZigZag32(readRawVarint32()))); + break; + + case INT8 : + tuple.put(i, DatumFactory.createInt8(decodeZigZag64(readRawVarint64()))); + break; + + case FLOAT4 : + tuple.put(i, DatumFactory.createFloat4(buffer.getFloat())); + break; + + case FLOAT8 : + tuple.put(i, DatumFactory.createFloat8(buffer.getDouble())); + break; + + case TEXT : { + int len = readRawVarint32(); + byte [] strBytes = new byte[len]; + buffer.get(strBytes); + tuple.put(i, DatumFactory.createText(strBytes)); + break; + } + + case BLOB : { + int len = readRawVarint32(); + byte [] rawBytes = new byte[len]; + buffer.get(rawBytes); + tuple.put(i, DatumFactory.createBlob(rawBytes)); + break; + } + + case PROTOBUF: { + int len = readRawVarint32(); + byte [] rawBytes = new byte[len]; + buffer.get(rawBytes); + + ProtobufDatumFactory factory = ProtobufDatumFactory.get(columnTypes[i]); + Message.Builder builder = factory.newBuilder(); + builder.mergeFrom(rawBytes); + tuple.put(i, factory.createDatum(builder.build())); + break; + } + + case INET4 : + byte [] ipv4Bytes = new byte[4]; + buffer.get(ipv4Bytes); + tuple.put(i, DatumFactory.createInet4(ipv4Bytes)); + break; + + case DATE: { + int val = buffer.getInt(); + if (val < Integer.MIN_VALUE + 1) { + tuple.put(i, DatumFactory.createNullDatum()); + } else { + tuple.put(i, DatumFactory.createFromInt4(columnTypes[i], val)); + } + break; + 
} + case TIME: + case TIMESTAMP: { + long val = buffer.getLong(); + if (val < Long.MIN_VALUE + 1) { + tuple.put(i, DatumFactory.createNullDatum()); + } else { + tuple.put(i, DatumFactory.createFromInt8(columnTypes[i], val)); + } + break; + } + case NULL_TYPE: + tuple.put(i, NullDatum.get()); + break; + + default: + } + } + + recordCount++; + + if(filePosition - buffer.remaining() >= endOffset){ + eos = true; + } + return new VTuple(tuple); + } + + private void reSizeBuffer(int writableBytes){ + if (buffer.capacity() - buffer.remaining() < writableBytes) { + buf.setIndex(buffer.position(), buffer.limit()); + buf.markReaderIndex(); + buf.discardSomeReadBytes(); + buf.ensureWritable(writableBytes); + buffer = buf.nioBuffer(0, buf.capacity()); + buffer.limit(buf.writerIndex()); + } + } + + @Override + public void reset() throws IOException { + // reset the buffer + buffer.clear(); + forceFillBuffer = true; + filePosition = fragment.getStartKey(); + channel.position(filePosition); + eos = false; + } + + @Override + public void close() throws IOException { + if(buf != null){ + buffer.clear(); + buffer = null; + + buf.release(); + buf = null; + } + + IOUtils.cleanup(LOG, channel, fis); + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public boolean isSplittable(){ + return false; + } + + @Override + public TableStats getInputStats() { + if(tableStats != null){ + tableStats.setNumRows(recordCount); + tableStats.setReadBytes(totalReadBytes); // actual read bytes (scan + rescan * n) + tableStats.setNumBytes(fragment.getLength()); + } + return tableStats; + } + + @Override + public float getProgress() { + if(eos) { + return 1.0f; + } + + if (filePosition - startOffset == 0) { + return 0.0f; + } else { + return Math.min(1.0f, ((float) filePosition / endOffset)); + } + } + } + + public static class RawFileAppender extends FileAppender { + private FileChannel channel; + 
private RandomAccessFile randomAccessFile; + private DataType[] columnTypes; + + private ByteBuffer buffer; + private ByteBuf buf; + private BitArray nullFlags; + private int headerSize = 0; + private static final int RECORD_SIZE = 4; + private long pos; + + private TableStatistics stats; + + public RawFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + Schema schema, TableMeta meta, Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); + } + + public void init() throws IOException { + File file; + try { + if (path.toUri().getScheme() != null) { + file = new File(path.toUri()); + } else { + file = new File(path.toString()); + } + } catch (IllegalArgumentException iae) { + throw new IOException(iae); + } + + randomAccessFile = new RandomAccessFile(file, "rw"); + channel = randomAccessFile.getChannel(); + pos = 0; + + columnTypes = new DataType[schema.size()]; + for (int i = 0; i < schema.size(); i++) { + columnTypes[i] = schema.getColumn(i).getDataType(); + } + + buf = BufferPool.directBuffer(64 * StorageUnit.KB); + buffer = buf.nioBuffer(0, buf.capacity()); + + // comput the number of bytes, representing the null flags + + nullFlags = new BitArray(schema.size()); + headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength(); + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + super.init(); + } + + @Override + public long getOffset() throws IOException { + return pos; + } + + private void flushBuffer() throws IOException { + buffer.flip(); + channel.write(buffer); + buffer.clear(); + } + + private boolean flushBufferAndReplace(int recordOffset, int sizeToBeWritten) + throws IOException { + + // if the buffer reaches the limit, + // write the bytes from 0 to the previous record. 
+ if (buffer.remaining() < sizeToBeWritten) { + + int limit = buffer.position(); + buffer.limit(recordOffset); + buffer.flip(); + channel.write(buffer); + buffer.position(recordOffset); + buffer.limit(limit); + buffer.compact(); + + return true; + } else { + return false; + } + } + + /** + * Encode a ZigZag-encoded 32-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n A signed 32-bit integer. + * @return An unsigned 32-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + */ + public static int encodeZigZag32(final int n) { + // Note: the right-shift must be arithmetic + return (n << 1) ^ (n >> 31); + } + + /** + * Encode a ZigZag-encoded 64-bit value. ZigZag encodes signed integers + * into values that can be efficiently encoded with varint. (Otherwise, + * negative values must be sign-extended to 64 bits to be varint encoded, + * thus always taking 10 bytes on the wire.) + * + * @param n A signed 64-bit integer. + * @return An unsigned 64-bit integer, stored in a signed int because + * Java has no explicit unsigned support. + */ + public static long encodeZigZag64(final long n) { + // Note: the right-shift must be arithmetic + return (n << 1) ^ (n >> 63); + } + + /** + * Encode and write a varint. {@code value} is treated as + * unsigned, so it won't be sign-extended if negative. + */ + public void writeRawVarint32(int value) throws IOException { + while (true) { + if ((value & ~0x7F) == 0) { + buffer.put((byte) value); + return; + } else { + buffer.put((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + } + } + + /** + * Compute the number of bytes that would be needed to encode a varint. + * {@code value} is treated as unsigned, so it won't be sign-extended if + * negative. 
+ */ + public static int computeRawVarint32Size(final int value) { + if ((value & (0xffffffff << 7)) == 0) return 1; + if ((value & (0xffffffff << 14)) == 0) return 2; + if ((value & (0xffffffff << 21)) == 0) return 3; + if ((value & (0xffffffff << 28)) == 0) return 4; + return 5; + } + + /** Encode and write a varint. */ + public void writeRawVarint64(long value) throws IOException { + while (true) { + if ((value & ~0x7FL) == 0) { + buffer.put((byte) value); + return; + } else { + buffer.put((byte) ((value & 0x7F) | 0x80)); + value >>>= 7; + } + } + } + + @Override + public void addTuple(Tuple t) throws IOException { + + if (buffer.remaining() < headerSize) { + flushBuffer(); + } + + // skip the row header + int recordOffset = buffer.position(); + buffer.position(recordOffset + headerSize); + // reset the null flags + nullFlags.clear(); + for (int i = 0; i < schema.size(); i++) { + if (enabledStats) { + stats.analyzeField(i, t.get(i)); + } + + if (t.isNull(i)) { + nullFlags.set(i); + continue; + } + + // 8 is the maximum bytes size of all types + if (flushBufferAndReplace(recordOffset, 8)) { + recordOffset = 0; + } + + switch(columnTypes[i].getType()) { + case NULL_TYPE: + nullFlags.set(i); + continue; + + case BOOLEAN: + case BIT: + buffer.put(t.getByte(i)); + break; + + case INT2 : + buffer.putShort(t.getInt2(i)); + break; + + case INT4 : + writeRawVarint32(encodeZigZag32(t.getInt4(i))); + break; + + case INT8 : + writeRawVarint64(encodeZigZag64(t.getInt8(i))); + break; + + case FLOAT4 : + buffer.putFloat(t.getFloat4(i)); + break; + + case FLOAT8 : + buffer.putDouble(t.getFloat8(i)); + break; + + case CHAR: + case TEXT: { + byte [] strBytes = t.getBytes(i); + if (flushBufferAndReplace(recordOffset, strBytes.length + computeRawVarint32Size(strBytes.length))) { + recordOffset = 0; + } + writeRawVarint32(strBytes.length); + buffer.put(strBytes); + break; + } + + case DATE: + buffer.putInt(t.getInt4(i)); + break; + + case TIME: + case TIMESTAMP: + 
buffer.putLong(t.getInt8(i)); + break; + + case BLOB : { + byte [] rawBytes = t.getBytes(i); + if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { + recordOffset = 0; + } + writeRawVarint32(rawBytes.length); + buffer.put(rawBytes); + break; + } + + case PROTOBUF: { + byte [] rawBytes = t.getBytes(i); + if (flushBufferAndReplace(recordOffset, rawBytes.length + computeRawVarint32Size(rawBytes.length))) { + recordOffset = 0; + } + writeRawVarint32(rawBytes.length); + buffer.put(rawBytes); + break; + } + + case INET4 : + buffer.put(t.getBytes(i)); + break; + + default: + throw new IOException("Cannot support data type: " + columnTypes[i].getType()); + } + } + + // write a record header + int bufferPos = buffer.position(); + buffer.position(recordOffset); + buffer.putInt(bufferPos - recordOffset); + byte [] flags = nullFlags.toArray(); + buffer.putShort((short) flags.length); + buffer.put(flags); + + pos += bufferPos - recordOffset; + buffer.position(bufferPos); + + if (enabledStats) { + stats.incrementRow(); + } + } + + @Override + public void flush() throws IOException { + if(buffer != null){ + flushBuffer(); + } + } + + @Override + public void close() throws IOException { + flush(); + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + if (LOG.isDebugEnabled()) { + LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path); + } + + if(buf != null){ + buffer.clear(); + buffer = null; + + buf.release(); + buf = null; + } + + IOUtils.cleanup(LOG, channel, randomAccessFile); + } + + @Override + public TableStats getStats() { + if (enabledStats) { + stats.setNumBytes(pos); + return stats.getTableStat(); + } else { + return null; + } + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java new file mode 100644 index 0000000..8da6ada --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java @@ -0,0 +1,498 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BitArray;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

/**
 * A splittable row-oriented HDFS storage format.
 *
 * <p>The file begins with a header (sync interval + a 16-byte MD5 sync
 * marker). Sync markers (SYNC_ESCAPE int + the 16 marker bytes) are written
 * periodically so a scanner can align itself at an arbitrary split offset.
 * Each record is: 2-byte null-flag length, null-flag bytes, 2-byte data
 * length, then the column values.
 */
public class RowFile {
  public static final Log LOG = LogFactory.getLog(RowFile.class);

  private static final int SYNC_ESCAPE = -1;
  private static final int SYNC_HASH_SIZE = 16;
  private static final int SYNC_SIZE = 4 + SYNC_HASH_SIZE;
  private final static int DEFAULT_BUFFER_SIZE = 65535;
  public static int SYNC_INTERVAL;

  /** Scanner that aligns on sync markers and decodes records sequentially. */
  public static class RowFileScanner extends FileScanner {
    private FileSystem fs;
    private FSDataInputStream in;
    private Tuple tuple;

    private byte[] sync = new byte[SYNC_HASH_SIZE];       // marker read from the file header
    private byte[] checkSync = new byte[SYNC_HASH_SIZE];  // scratch buffer for comparisons
    private long start, end;                              // fragment bounds in the file

    private ByteBuffer buffer;
    private final int tupleHeaderSize;
    private BitArray nullFlags;
    private long bufferStartPos;   // file offset corresponding to buffer position 0

    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
        throws IOException {
      super(conf, schema, meta, fragment);

      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal) * SYNC_SIZE;

      nullFlags = new BitArray(schema.size());
      tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
      this.start = this.fragment.getStartKey();
      this.end = this.start + this.fragment.getLength();
    }

    /**
     * Opens the fragment, reads the file header, and — when starting
     * mid-file — scans forward byte by byte until a sync marker is found.
     */
    public void init() throws IOException {
      // set default page size.
      fs = fragment.getPath().getFileSystem(conf);
      in = fs.open(fragment.getPath());
      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE * schema.size());
      buffer.flip();

      readHeader();

      // find the correct position from the start
      if (this.start > in.getPos()) {
        long realStart = start > SYNC_SIZE ? (start-SYNC_SIZE) : 0;
        in.seek(realStart);
      }
      bufferStartPos = in.getPos();
      fillBuffer();

      if (start != 0) {
        // TODO: improve
        boolean syncFound = false;
        while (!syncFound) {
          if (buffer.remaining() < SYNC_SIZE) {
            fillBuffer();
          }
          buffer.mark();
          syncFound = checkSync();
          if (!syncFound) {
            buffer.reset();
            buffer.get(); // proceed one byte
          }
        }
        bufferStartPos += buffer.position();
        buffer.compact();
        buffer.flip();
      }

      super.init();
    }

    /** Reads the file header: the sync interval and the 16-byte sync marker. */
    private void readHeader() throws IOException {
      SYNC_INTERVAL = in.readInt();
      StorageUtil.readFully(in, this.sync, 0, SYNC_HASH_SIZE);
    }

    /**
     * Find the sync from the front of the buffer
     *
     * @return return true if it succeeds to find the sync.
     * @throws java.io.IOException
     */
    private boolean checkSync() throws IOException {
      buffer.getInt();                           // escape
      buffer.get(checkSync, 0, SYNC_HASH_SIZE);  // sync
      return Arrays.equals(checkSync, sync);
    }

    /**
     * Refills the buffer from the stream, keeping unconsumed bytes.
     * Issues a second read when the first did not fill the free space.
     *
     * @return total bytes read, or -1 at end of stream
     */
    private int fillBuffer() throws IOException {
      bufferStartPos += buffer.position();
      buffer.compact();
      int remain = buffer.remaining();
      int read = in.read(buffer);
      if (read == -1) {
        buffer.flip();
        return read;
      } else {
        int totalRead = read;
        if (remain > totalRead) {
          read = in.read(buffer);
          totalRead += read > 0 ? read : 0;
        }
        buffer.flip();
        return totalRead;
      }
    }

    /**
     * Decodes the next record into a new tuple, skipping a sync marker if
     * one precedes it. Returns null at end of data or past the fragment end.
     */
    @Override
    public Tuple next() throws IOException {
      while (buffer.remaining() < SYNC_SIZE) {
        if (fillBuffer() < 0) {
          return null;
        }
      }

      buffer.mark();
      if (!checkSync()) {
        buffer.reset();
      } else {
        if (bufferStartPos + buffer.position() > end) {
          return null;
        }
      }

      while (buffer.remaining() < tupleHeaderSize) {
        if (fillBuffer() < 0) {
          return null;
        }
      }

      int i;
      tuple = new VTuple(schema.size());

      int nullFlagSize = buffer.getShort();
      byte[] nullFlagBytes = new byte[nullFlagSize];
      buffer.get(nullFlagBytes, 0, nullFlagSize);
      nullFlags = new BitArray(nullFlagBytes);
      int tupleSize = buffer.getShort();

      while (buffer.remaining() < (tupleSize)) {
        if (fillBuffer() < 0) {
          return null;
        }
      }

      Datum datum;
      Column col;
      for (i = 0; i < schema.size(); i++) {
        if (!nullFlags.get(i)) {
          col = schema.getColumn(i);
          switch (col.getDataType().getType()) {
            case BOOLEAN :
              datum = DatumFactory.createBool(buffer.get());
              tuple.put(i, datum);
              break;

            case BIT:
              datum = DatumFactory.createBit(buffer.get());
              tuple.put(i, datum );
              break;

            case CHAR :
              // stored as: int real length + fixed-width padded bytes
              int realLen = buffer.getInt();
              byte[] buf = new byte[col.getDataType().getLength()];
              buffer.get(buf);
              byte[] charBuf = Arrays.copyOf(buf, realLen);
              tuple.put(i, DatumFactory.createChar(charBuf));
              break;

            case INT2 :
              datum = DatumFactory.createInt2(buffer.getShort());
              tuple.put(i, datum );
              break;

            case INT4 :
              datum = DatumFactory.createInt4(buffer.getInt());
              tuple.put(i, datum );
              break;

            case INT8 :
              datum = DatumFactory.createInt8(buffer.getLong());
              tuple.put(i, datum );
              break;

            case FLOAT4 :
              datum = DatumFactory.createFloat4(buffer.getFloat());
              tuple.put(i, datum);
              break;

            case FLOAT8 :
              datum = DatumFactory.createFloat8(buffer.getDouble());
              tuple.put(i, datum);
              break;

            case TEXT:
              short bytelen = buffer.getShort();
              byte[] strbytes = new byte[bytelen];
              buffer.get(strbytes, 0, bytelen);
              datum = DatumFactory.createText(strbytes);
              tuple.put(i, datum);
              break;

            case BLOB:
              short bytesLen = buffer.getShort();
              byte [] bytesBuf = new byte[bytesLen];
              buffer.get(bytesBuf);
              datum = DatumFactory.createBlob(bytesBuf);
              tuple.put(i, datum);
              break;

            case INET4 :
              byte[] ipv4 = new byte[4];
              buffer.get(ipv4, 0, 4);
              datum = DatumFactory.createInet4(ipv4);
              tuple.put(i, datum);
              break;

            default:
              break;
          }
        } else {
          tuple.put(i, DatumFactory.createNullDatum());
        }
      }
      return tuple;
    }

    @Override
    public void reset() throws IOException {
      init();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
      }
    }

    @Override
    public boolean isProjectable() {
      return false;
    }

    @Override
    public boolean isSelectable() {
      return false;
    }

    @Override
    public boolean isSplittable(){
      return true;
    }
  }

  /** Appender that writes records with periodic sync markers for splitting. */
  public static class RowFileAppender extends FileAppender {
    private FSDataOutputStream out;
    private long lastSyncPos;   // stream position of the last sync marker written
    private FileSystem fs;
    private byte[] sync;        // MD5(path + creation time): this file's sync marker
    private ByteBuffer buffer;  // staging area for one record's column values

    private BitArray nullFlags;
    // statistics
    private TableStatistics stats;

    public RowFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
                           final Schema schema, final TableMeta meta, final Path workDir)
        throws IOException {
      super(conf, taskAttemptId, schema, meta, workDir);
    }

    /**
     * Creates the output file (failing if it exists), generates the sync
     * marker, and writes the file header.
     */
    public void init() throws IOException {
      // NOTE(review): unlike RawFileAppender.init(), this does not call
      // super.init() — confirm whether that is intentional.
      SYNC_INTERVAL = conf.getInt(ConfVars.ROWFILE_SYNC_INTERVAL.varname,
          ConfVars.ROWFILE_SYNC_INTERVAL.defaultIntVal);
      fs = path.getFileSystem(conf);

      if (!fs.exists(path.getParent())) {
        throw new FileNotFoundException(path.toString());
      }

      if (fs.exists(path)) {
        throw new AlreadyExistsStorageException(path);
      }

      sync = new byte[SYNC_HASH_SIZE];
      lastSyncPos = 0;

      out = fs.create(path);

      MessageDigest md;
      try {
        md = MessageDigest.getInstance("MD5");
        md.update((path.toString()+System.currentTimeMillis()).getBytes());
        sync = md.digest();
      } catch (NoSuchAlgorithmException e) {
        LOG.error(e);
      }

      writeHeader();

      buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);

      nullFlags = new BitArray(schema.size());

      if (enabledStats) {
        this.stats = new TableStatistics(this.schema);
      }
    }

    /** Writes the sync interval and sync marker at the head of the file. */
    private void writeHeader() throws IOException {
      out.writeInt(SYNC_INTERVAL);
      out.write(sync);
      out.flush();
      lastSyncPos = out.getPos();
    }

    /**
     * Appends one tuple: stages column values in the buffer, then writes
     * null flags, data length, and the staged bytes to the stream. A sync
     * marker is emitted first when the sync interval has elapsed.
     */
    @Override
    public void addTuple(Tuple t) throws IOException {
      checkAndWriteSync();
      Column col;

      buffer.clear();
      nullFlags.clear();

      for (int i = 0; i < schema.size(); i++) {
        if (enabledStats) {
          stats.analyzeField(i, t.get(i));
        }

        if (t.isNull(i)) {
          nullFlags.set(i);
        } else {
          col = schema.getColumn(i);
          switch (col.getDataType().getType()) {
            case BOOLEAN:
              buffer.put(t.get(i).asByte());
              break;
            case BIT:
              buffer.put(t.get(i).asByte());
              break;
            case CHAR:
              // stored as: int real length + bytes padded to the declared width
              byte[] src = t.get(i).asByteArray();
              byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
              buffer.putInt(src.length);
              buffer.put(dst);
              break;
            case TEXT:
              byte [] strbytes = t.get(i).asByteArray();
              buffer.putShort((short)strbytes.length);
              buffer.put(strbytes, 0, strbytes.length);
              break;
            case INT2:
              buffer.putShort(t.get(i).asInt2());
              break;
            case INT4:
              buffer.putInt(t.get(i).asInt4());
              break;
            case INT8:
              buffer.putLong(t.get(i).asInt8());
              break;
            case FLOAT4:
              buffer.putFloat(t.get(i).asFloat4());
              break;
            case FLOAT8:
              buffer.putDouble(t.get(i).asFloat8());
              break;
            case BLOB:
              byte [] bytes = t.get(i).asByteArray();
              buffer.putShort((short)bytes.length);
              buffer.put(bytes);
              break;
            case INET4:
              buffer.put(t.get(i).asByteArray());
              break;
            case INET6:
              buffer.put(t.get(i).asByteArray());
              break;
            case NULL_TYPE:
              nullFlags.set(i);
              break;
            default:
              break;
          }
        }
      }

      byte[] bytes = nullFlags.toArray();
      out.writeShort(bytes.length);
      out.write(bytes);

      bytes = buffer.array();
      int dataLen = buffer.position();
      out.writeShort(dataLen);
      out.write(bytes, 0, dataLen);

      // Statistical section
      if (enabledStats) {
        stats.incrementRow();
      }
    }

    @Override
    public long getOffset() throws IOException {
      return out.getPos();
    }

    @Override
    public void flush() throws IOException {
      out.flush();
    }

    /** Writes a final sync marker, records stats, and closes the stream. */
    @Override
    public void close() throws IOException {
      if (out != null) {
        if (enabledStats) {
          stats.setNumBytes(out.getPos());
        }
        sync();
        out.flush();
        out.close();
      }
    }

    /** Emits a sync marker unless one was just written at this position. */
    private void sync() throws IOException {
      if (lastSyncPos != out.getPos()) {
        out.writeInt(SYNC_ESCAPE);
        out.write(sync);
        lastSyncPos = out.getPos();
      }
    }

    /** Emits a sync marker when SYNC_INTERVAL bytes have passed since the last one. */
    private void checkAndWriteSync() throws IOException {
      if (out.getPos() >= lastSyncPos + SYNC_INTERVAL) {
        sync();
      }
    }

    @Override
    public TableStats getStats() {
      if (enabledStats) {
        return stats.getTableStat();
      } else {
        return null;
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java new file mode 100644 index 0000000..3579674 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/SplitLineReader.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.tajo.storage;

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.InputStream;

/**
 * A {@link LineReader} used when reading a single split of a file.
 * The base implementation simply delegates to {@link LineReader};
 * subclasses may signal that one extra record past the split boundary
 * must be consumed.
 */
public class SplitLineReader extends LineReader {

  /**
   * Creates a reader with an explicit record delimiter.
   *
   * @param in the underlying input stream
   * @param recordDelimiterBytes delimiter bytes, or null for default newlines
   */
  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
    super(in, recordDelimiterBytes);
  }

  /**
   * Creates a reader whose buffer size is taken from the configuration.
   *
   * @param in the underlying input stream
   * @param conf configuration supplying the buffer size
   * @param recordDelimiterBytes delimiter bytes, or null for default newlines
   */
  public SplitLineReader(InputStream in, Configuration conf,
                         byte[] recordDelimiterBytes) throws IOException {
    super(in, conf, recordDelimiterBytes);
  }

  /**
   * @return false; this base reader never needs an additional record after
   *         the split boundary (subclasses may override)
   */
  public boolean needAdditionalRecordAfterSplit() {
    return false;
  }
}
package org.apache.tajo.storage.avro;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.FileAppender;
import org.apache.tajo.storage.TableStatistics;
import org.apache.tajo.storage.Tuple;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * FileAppender for writing to Avro files.
 */
public class AvroAppender extends FileAppender {
  private TableStatistics stats;
  private Schema avroSchema;
  private List<Schema.Field> avroFields;
  private DataFileWriter<GenericRecord> dataFileWriter;

  /**
   * Creates a new AvroAppender.
   *
   * @param conf Configuration properties.
   * @param taskAttemptId The task attempt id.
   * @param schema The table schema.
   * @param meta The table metadata.
   * @param workDir The path of the Avro file to write to.
   */
  public AvroAppender(Configuration conf,
                      QueryUnitAttemptId taskAttemptId,
                      org.apache.tajo.catalog.Schema schema,
                      TableMeta meta, Path workDir) throws IOException {
    super(conf, taskAttemptId, schema, meta, workDir);
  }

  /**
   * Initializes the Appender.
   *
   * @throws FileNotFoundException if the parent directory does not exist
   */
  public void init() throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    if (!fs.exists(path.getParent())) {
      throw new FileNotFoundException(path.toString());
    }

    // Resolve the Avro schema before creating the output file so that a
    // bad/missing schema neither leaks an open stream nor leaves an empty
    // file behind (the original code called fs.create() first).
    avroSchema = AvroUtil.getAvroSchema(meta, conf);
    avroFields = avroSchema.getFields();

    FSDataOutputStream outputStream = fs.create(path);
    boolean success = false;
    try {
      DatumWriter<GenericRecord> datumWriter =
          new GenericDatumWriter<GenericRecord>(avroSchema);
      dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
      dataFileWriter.create(avroSchema, outputStream);
      success = true;
    } finally {
      if (!success) {
        outputStream.close();  // avoid stream leak on failed initialization
      }
    }

    if (enabledStats) {
      this.stats = new TableStatistics(schema);
    }
    super.init();
  }

  /**
   * Gets the current offset. Tracking offsets is currently not implemented,
   * so this method always returns 0.
   *
   * @return 0
   */
  @Override
  public long getOffset() throws IOException {
    return 0;
  }

  /**
   * Converts the i-th field of the tuple to the Java object Avro expects for
   * the given primitive type, or null for a null datum.
   */
  private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
    if (tuple.get(i) instanceof NullDatum) {
      return null;
    }
    switch (avroType) {
      case NULL:
        return null;
      case BOOLEAN:
        return tuple.getBool(i);
      case INT:
        return tuple.getInt4(i);
      case LONG:
        return tuple.getInt8(i);
      case FLOAT:
        return tuple.getFloat4(i);
      case DOUBLE:
        return tuple.getFloat8(i);
      case BYTES:
      case FIXED:
        return ByteBuffer.wrap(tuple.getBytes(i));
      case STRING:
        return tuple.getText(i);
      default:
        throw new RuntimeException("Unknown primitive type.");
    }
  }

  /**
   * Write a Tuple to the Avro file.
   *
   * @param tuple The Tuple to write.
   */
  @Override
  public void addTuple(Tuple tuple) throws IOException {
    GenericRecord record = new GenericData.Record(avroSchema);
    for (int i = 0; i < schema.size(); ++i) {
      Column column = schema.getColumn(i);
      if (enabledStats) {
        stats.analyzeField(i, tuple.get(i));
      }
      Object value;
      Schema.Field avroField = avroFields.get(i);
      Schema.Type avroType = avroField.schema().getType();
      switch (avroType) {
        case NULL:
        case BOOLEAN:
        case INT:
        case LONG:
        case FLOAT:
        case DOUBLE:
        case BYTES:
        case STRING:
        case FIXED:
          value = getPrimitive(tuple, i, avroType);
          break;
        case RECORD:
          throw new RuntimeException("Avro RECORD not supported.");
        case ENUM:
          throw new RuntimeException("Avro ENUM not supported.");
        case MAP:
          throw new RuntimeException("Avro MAP not supported.");
        case UNION:
          // Only the common [NULL, primitive] (nullable) union is supported.
          List<Schema> schemas = avroField.schema().getTypes();
          if (schemas.size() != 2) {
            throw new RuntimeException("Avro UNION not supported.");
          }
          if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
            value = getPrimitive(tuple, i, schemas.get(1).getType());
          } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
            value = getPrimitive(tuple, i, schemas.get(0).getType());
          } else {
            throw new RuntimeException("Avro UNION not supported.");
          }
          break;
        default:
          throw new RuntimeException("Unknown type: " + avroType);
      }
      record.put(i, value);
    }
    dataFileWriter.append(record);

    if (enabledStats) {
      stats.incrementRow();
    }
  }

  /**
   * Flushes the current state of the file.
   */
  @Override
  public void flush() throws IOException {
    if (dataFileWriter != null) {  // init() may have failed before assignment
      dataFileWriter.flush();
    }
  }

  /**
   * Closes the Appender. Safe to call even if init() failed.
   */
  @Override
  public void close() throws IOException {
    if (dataFileWriter != null) {
      dataFileWriter.close();
    }
  }

  /**
   * If table statistics is enabled, retrieve the table statistics.
   *
   * @return Table statistics if enabled or null otherwise.
   */
  @Override
  public TableStats getStats() {
    if (enabledStats) {
      return stats.getTableStat();
    } else {
      return null;
    }
  }
}
package org.apache.tajo.storage.avro;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.*;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.fragment.Fragment;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * FileScanner for reading Avro files.
 */
public class AvroScanner extends FileScanner {
  private Schema avroSchema;
  private List<Schema.Field> avroFields;
  private DataFileReader<GenericRecord> dataFileReader;
  private int[] projectionMap;

  /**
   * Creates a new AvroScanner.
   *
   * @param conf Configuration properties.
   * @param schema The table schema.
   * @param meta The table metadata.
   * @param fragment The fragment to scan.
   */
  public AvroScanner(Configuration conf,
                     final org.apache.tajo.catalog.Schema schema,
                     final TableMeta meta, final Fragment fragment) {
    super(conf, schema, meta, fragment);
  }

  /**
   * Initializes the AvroScanner: resolves the projection, parses the Avro
   * schema, and opens the data file reader.
   */
  @Override
  public void init() throws IOException {
    if (targets == null) {
      targets = schema.toArray();
    }
    prepareProjection(targets);

    avroSchema = AvroUtil.getAvroSchema(meta, conf);
    avroFields = avroSchema.getFields();

    DatumReader<GenericRecord> datumReader =
        new GenericDatumReader<GenericRecord>(avroSchema);
    SeekableInput input = new FsInput(fragment.getPath(), conf);
    dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
    super.init();
  }

  /** Maps each target column to its index in the table schema. */
  private void prepareProjection(Column[] targets) {
    projectionMap = new int[targets.length];
    for (int i = 0; i < targets.length; ++i) {
      projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
    }
  }

  /** Converts an Avro string value (Utf8 or String) to a Java String. */
  private static String fromAvroString(Object value) {
    if (value instanceof Utf8) {
      Utf8 utf8 = (Utf8) value;
      return utf8.toString();
    }
    return value.toString();
  }

  /**
   * For a nullable union [NULL, T] (in either order), returns T; for any
   * other schema, returns the schema unchanged.
   */
  private static Schema getNonNull(Schema schema) {
    if (!schema.getType().equals(Schema.Type.UNION)) {
      return schema;
    }
    List<Schema> schemas = schema.getTypes();
    if (schemas.size() != 2) {
      return schema;
    }
    if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
      return schemas.get(1);
    } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
      return schemas.get(0);
    } else {
      return schema;
    }
  }

  /** Converts an Avro int to the narrower Tajo type when required. */
  private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
    int intValue = (Integer) value;
    switch (tajoType) {
      case BIT:
        return DatumFactory.createBit((byte) (intValue & 0xff));
      case INT2:
        return DatumFactory.createInt2((short) intValue);
      default:
        return DatumFactory.createInt4(intValue);
    }
  }

  /** Converts an Avro BYTES value to the appropriate Tajo datum. */
  private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
                             DataType dataType) {
    // Copy only the readable window of the buffer. Avro may hand back a
    // ByteBuffer whose backing array is larger than the value, so the
    // original capacity()-sized copy could include stale bytes. Reading
    // through a duplicate also leaves the source buffer's position intact.
    ByteBuffer duplicate = ((ByteBuffer) value).duplicate();
    byte[] bytes = new byte[duplicate.remaining()];
    duplicate.get(bytes, 0, bytes.length);
    switch (tajoType) {
      case INET4:
        return DatumFactory.createInet4(bytes);
      case PROTOBUF:
        try {
          ProtobufDatumFactory factory =
              ProtobufDatumFactory.get(dataType.getCode());
          Message.Builder builder = factory.newBuilder();
          builder.mergeFrom(bytes);
          return factory.createDatum(builder);
        } catch (InvalidProtocolBufferException e) {
          throw new RuntimeException(e);
        }
      default:
        return new BlobDatum(bytes);
    }
  }

  /** Converts an Avro STRING value to a CHAR or TEXT datum. */
  private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
    switch (tajoType) {
      case CHAR:
        return DatumFactory.createChar(fromAvroString(value));
      default:
        return DatumFactory.createText(fromAvroString(value));
    }
  }

  /**
   * Reads the next Tuple from the Avro file.
   *
   * @return The next Tuple from the Avro file or null if end of file is
   * reached.
   */
  @Override
  public Tuple next() throws IOException {
    if (!dataFileReader.hasNext()) {
      return null;
    }

    Tuple tuple = new VTuple(schema.size());
    GenericRecord record = dataFileReader.next();
    for (int i = 0; i < projectionMap.length; ++i) {
      int columnIndex = projectionMap[i];
      Object value = record.get(columnIndex);
      if (value == null) {
        tuple.put(columnIndex, NullDatum.get());
        continue;
      }

      // Get the Avro type (unwrapping a nullable union if present).
      Schema.Field avroField = avroFields.get(columnIndex);
      Schema nonNullAvroSchema = getNonNull(avroField.schema());
      Schema.Type avroType = nonNullAvroSchema.getType();

      // Get the corresponding Tajo type.
      Column column = schema.getColumn(columnIndex);
      DataType dataType = column.getDataType();
      TajoDataTypes.Type tajoType = dataType.getType();
      switch (avroType) {
        case NULL:
          tuple.put(columnIndex, NullDatum.get());
          break;
        case BOOLEAN:
          tuple.put(columnIndex, DatumFactory.createBool((Boolean) value));
          break;
        case INT:
          tuple.put(columnIndex, convertInt(value, tajoType));
          break;
        case LONG:
          tuple.put(columnIndex, DatumFactory.createInt8((Long) value));
          break;
        case FLOAT:
          tuple.put(columnIndex, DatumFactory.createFloat4((Float) value));
          break;
        case DOUBLE:
          tuple.put(columnIndex, DatumFactory.createFloat8((Double) value));
          break;
        case BYTES:
          tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
          break;
        case STRING:
          tuple.put(columnIndex, convertString(value, tajoType));
          break;
        case RECORD:
          throw new RuntimeException("Avro RECORD not supported.");
        case ENUM:
          throw new RuntimeException("Avro ENUM not supported.");
        case MAP:
          throw new RuntimeException("Avro MAP not supported.");
        case UNION:
          // A non-nullable union survived getNonNull(); unsupported.
          throw new RuntimeException("Avro UNION not supported.");
        case FIXED:
          tuple.put(columnIndex, new BlobDatum(((GenericFixed) value).bytes()));
          break;
        default:
          throw new RuntimeException("Unknown type.");
      }
    }
    return tuple;
  }

  /**
   * Resets the scanner. Currently a no-op; re-scanning is not supported.
   */
  @Override
  public void reset() throws IOException {
  }

  /**
   * Closes the scanner.
   */
  @Override
  public void close() throws IOException {
    if (dataFileReader != null) {
      dataFileReader.close();
    }
  }

  /**
   * Returns whether this scanner is projectable.
   *
   * @return true
   */
  @Override
  public boolean isProjectable() {
    return true;
  }

  /**
   * Returns whether this scanner is selectable.
   *
   * @return false
   */
  @Override
  public boolean isSelectable() {
    return false;
  }

  /**
   * Returns whether this scanner is splittable.
   *
   * @return false
   */
  @Override
  public boolean isSplittable() {
    return false;
  }
}
package org.apache.tajo.storage.avro;

import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.StorageConstants;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

/**
 * Utilities for resolving the Avro schema of a table, either from a schema
 * literal in the table metadata or from a URL (HTTP or filesystem).
 */
public class AvroUtil {

  /**
   * Resolves the Avro schema for a table. A schema literal takes precedence
   * over a schema URL.
   *
   * @param meta the table metadata carrying the schema options
   * @param conf configuration used to access the filesystem
   * @throws RuntimeException if neither schema option is present
   */
  public static Schema getAvroSchema(TableMeta meta, Configuration conf)
      throws IOException {

    boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
    boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
    if (!isSchemaLiteral && !isSchemaUrl) {
      throw new RuntimeException("No Avro schema for table.");
    }
    if (isSchemaLiteral) {
      String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
      return new Schema.Parser().parse(schema);
    }

    String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
    // Only treat genuine http(s) URLs as remote. The previous prefix test
    // "startsWith(\"http\")" misrouted relative filesystem paths that merely
    // begin with "http" (e.g. "httpdocs/schema.avsc").
    String lowerCased = schemaURL.toLowerCase();
    if (lowerCased.startsWith("http://") || lowerCased.startsWith("https://")) {
      return getAvroSchemaFromHttp(schemaURL);
    } else {
      return getAvroSchemaFromFileSystem(schemaURL, conf);
    }
  }

  /**
   * Fetches and parses an Avro schema over HTTP(S).
   *
   * @param schemaURL the http(s) URL of the schema document
   */
  public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
    InputStream inputStream = new URL(schemaURL).openStream();

    try {
      return new Schema.Parser().parse(inputStream);
    } finally {
      IOUtils.closeStream(inputStream);
    }
  }

  /**
   * Reads and parses an Avro schema from a Hadoop filesystem path.
   *
   * @param schemaURL the filesystem path of the schema document
   * @param conf configuration used to resolve the filesystem
   */
  public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf)
      throws IOException {
    Path schemaPath = new Path(schemaURL);
    FileSystem fs = schemaPath.getFileSystem(conf);
    FSDataInputStream inputStream = fs.open(schemaPath);

    try {
      return new Schema.Parser().parse(inputStream);
    } finally {
      IOUtils.closeStream(inputStream);
    }
  }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java new file mode 100644 index 0000000..40d1545 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/package-info.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * <p> + * Provides read and write support for Avro files. 
Avro schemas are + * converted to Tajo schemas according to the following mapping of Avro + * and Tajo types: + * </p> + * + * <table> + * <tr> + * <th>Avro type</th> + * <th>Tajo type</th> + * </tr> + * <tr> + * <td>NULL</td> + * <td>NULL_TYPE</td> + * </tr> + * <tr> + * <td>BOOLEAN</td> + * <td>BOOLEAN</td> + * </tr> + * <tr> + * <td>INT</td> + * <td>INT4</td> + * </tr> + * <tr> + * <td>LONG</td> + * <td>INT8</td> + * </tr> + * <tr> + * <td>FLOAT</td> + * <td>FLOAT4</td> + * </tr> + * <tr> + * <td>DOUBLE</td> + * <td>FLOAT8</td> + * </tr> + * <tr> + * <td>BYTES</td> + * <td>BLOB</td> + * </tr> + * <tr> + * <td>STRING</td> + * <td>TEXT</td> + * </tr> + * <tr> + * <td>FIXED</td> + * <td>BLOB</td> + * </tr> + * <tr> + * <td>RECORD</td> + * <td>Not currently supported</td> + * </tr> + * <tr> + * <td>ENUM</td> + * <td>Not currently supported.</td> + * </tr> + * <tr> + * <td>MAP</td> + * <td>Not currently supported.</td> + * </tr> + * <tr> + * <td>UNION</td> + * <td>Not currently supported.</td> + * </tr> + * </table> + */ + +package org.apache.tajo.storage.avro; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java new file mode 100644 index 0000000..ac413ca --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.tajo.storage.fragment;

import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.storage.StorageFragmentProtos.*;
import org.apache.tajo.util.TUtil;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;

/**
 * A contiguous byte range of a file, together with the datanodes hosting it.
 * Fragments are the unit of work distribution for file-based scans.
 */
public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
  @Expose private String tableName; // required
  @Expose private Path uri; // required
  @Expose public Long startOffset; // required
  @Expose public Long length; // required

  private String[] hosts; // Datanode hostnames
  @Expose private int[] diskIds;

  /** Deserializes a fragment from its raw protobuf bytes. */
  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
    builder.mergeFrom(raw);
    // Build the proto exactly once (the original built it twice, discarding
    // the first result).
    init(builder.build());
  }

  /** Creates a fragment covering one HDFS block. */
  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
      throws IOException {
    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
        blockLocation.getHosts(), null);
  }

  public FileFragment(String tableName, Path uri, long start, long length,
                      String[] hosts, int[] diskIds) {
    this.set(tableName, uri, start, length, hosts, diskIds);
  }

  // Non splittable
  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
    this.set(tableName, uri, start, length, hosts, null);
  }

  public FileFragment(String fragmentId, Path path, long start, long length) {
    this.set(fragmentId, path, start, length, null, null);
  }

  public FileFragment(FileFragmentProto proto) {
    init(proto);
  }

  private void init(FileFragmentProto proto) {
    int[] diskIds = new int[proto.getDiskIdsList().size()];
    int i = 0;
    for (Integer eachValue : proto.getDiskIdsList()) {
      diskIds[i++] = eachValue;
    }
    this.set(proto.getId(), new Path(proto.getPath()),
        proto.getStartOffset(), proto.getLength(),
        proto.getHostsList().toArray(new String[]{}),
        diskIds);
  }

  private void set(String tableName, Path path, long start,
                   long length, String[] hosts, int[] diskIds) {
    this.tableName = tableName;
    this.uri = path;
    this.startOffset = start;
    this.length = length;
    this.hosts = hosts;
    this.diskIds = diskIds;
  }

  /**
   * Get the list of hosts (hostname) hosting this block
   */
  public String[] getHosts() {
    if (hosts == null) {
      this.hosts = new String[0];
    }
    return hosts;
  }

  /**
   * Get the list of Disk Ids
   * Unknown disk is -1. Others 0 ~ N
   */
  public int[] getDiskIds() {
    if (diskIds == null) {
      this.diskIds = new int[getHosts().length];
      Arrays.fill(this.diskIds, -1);
    }
    return diskIds;
  }

  public void setDiskIds(int[] diskIds) {
    this.diskIds = diskIds;
  }

  @Override
  public String getTableName() {
    return this.tableName;
  }

  public Path getPath() {
    return this.uri;
  }

  public void setPath(Path path) {
    this.uri = path;
  }

  public Long getStartKey() {
    return this.startOffset;
  }

  @Override
  public String getKey() {
    return this.uri.toString();
  }

  @Override
  public long getLength() {
    return this.length;
  }

  @Override
  public boolean isEmpty() {
    return this.length <= 0;
  }

  /**
   * Orders fragments by path, then by start offset. The offset ranges of
   * tablets within one path <b>MUST NOT</b> be overlapped.
   *
   * @param t the fragment to compare against
   */
  @Override
  public int compareTo(FileFragment t) {
    // The previous implementation returned -1 whenever the paths differed
    // (regardless of argument order), violating the compareTo antisymmetry
    // contract, and compared offsets by subtraction (overflow-prone).
    int pathCmp = getPath().toString().compareTo(t.getPath().toString());
    if (pathCmp != 0) {
      return pathCmp;
    }
    return this.getStartKey().compareTo(t.getStartKey());
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof FileFragment) {
      FileFragment t = (FileFragment) o;
      if (getPath().equals(t.getPath())
          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
          && TUtil.checkEquals(t.getLength(), this.getLength())) {
        return true;
      }
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(tableName, uri, startOffset, length);
  }

  public Object clone() throws CloneNotSupportedException {
    // NOTE(review): the diskIds/hosts arrays are shared between the clone and
    // the original (shallow copy) — confirm callers never mutate them.
    FileFragment frag = (FileFragment) super.clone();
    frag.tableName = tableName;
    frag.uri = uri;
    frag.diskIds = diskIds;
    frag.hosts = hosts;

    return frag;
  }

  @Override
  public String toString() {
    // Fixed: the path value previously had a closing quote but no opening
    // quote, producing malformed output.
    return "\"fragment\": {\"id\": \"" + tableName + "\", \"path\": \""
        + getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
        + getLength() + "}";
  }

  public FragmentProto getProto() {
    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
    builder.setId(this.tableName);
    builder.setStartOffset(this.startOffset);
    builder.setLength(this.length);
    builder.setPath(this.uri.toString());
    if (diskIds != null) {
      List<Integer> idList = new ArrayList<Integer>();
      for (int eachId : diskIds) {
        idList.add(eachId);
      }
      builder.addAllDiskIds(idList);
    }

    if (hosts != null) {
      builder.addAllHosts(TUtil.newList(hosts));
    }

    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
    fragmentBuilder.setId(this.tableName);
    fragmentBuilder.setStoreType(StoreType.CSV.name());
    fragmentBuilder.setContents(builder.buildPartial().toByteString());
    return fragmentBuilder.build();
  }
}
package org.apache.tajo.storage.index;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.TupleComparator;

import java.io.IOException;

/**
 * Factory for a particular index implementation: produces matched
 * writer/reader pairs over the same key schema and ordering.
 */
public interface IndexMethod {

  /**
   * Creates a writer for building an index file.
   *
   * @param fileName path of the index file to create
   * @param level index level
   * @param keySchema schema of the index keys
   * @param comparator ordering of the keys
   */
  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
                             TupleComparator comparator) throws IOException;

  /**
   * Creates a reader over an existing index file.
   *
   * @param fileName path of the index file to open
   * @param keySchema schema of the index keys
   * @param comparator ordering of the keys
   */
  IndexReader getIndexReader(final Path fileName, Schema keySchema,
                             TupleComparator comparator) throws IOException;
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.index; + +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public interface IndexReader { + + /** + * Find the offset corresponding to key which is equal to a given key. + * + * @param key + * @return + * @throws java.io.IOException + */ + public long find(Tuple key) throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java new file mode 100644 index 0000000..04738f8 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/IndexWriter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.tajo.storage.index; + +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public abstract class IndexWriter { + + public abstract void write(Tuple key, long offset) throws IOException; + + public abstract void close() throws IOException; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java new file mode 100644 index 0000000..0c07b4a --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.tajo.storage.index; + +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; + +public interface OrderIndexReader extends IndexReader { + /** + * Find the offset corresponding to key which is equal to or greater than + * a given key. + * + * @param key to find + * @return + * @throws java.io.IOException + */ + public long find(Tuple key, boolean nextKey) throws IOException; + + /** + * Return the next offset from the latest find or next offset + * @return + * @throws java.io.IOException + */ + public long next() throws IOException; +}
