http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java deleted file mode 100644 index 5ddc3fb..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ /dev/null @@ -1,586 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.*; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; -import org.apache.tajo.util.BytesUtils; - -import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; - -public class CSVFile { - - public static final byte LF = '\n'; - public static int EOF = -1; - - private static final Log LOG = LogFactory.getLog(CSVFile.class); - - public static class CSVAppender extends FileAppender { - private final TableMeta meta; - private final Schema schema; - private final int columnNum; - private final FileSystem fs; - private FSDataOutputStream fos; - private DataOutputStream outputStream; - private CompressionOutputStream deflateFilter; - private char delimiter; - private TableStatistics stats = null; - private Compressor compressor; - private CompressionCodecFactory codecFactory; - private CompressionCodec codec; - private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; - private int bufferedBytes = 0; - private long pos = 0; 
- private boolean isShuffle; - - private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); - private SerializerDeserializer serde; - - public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException { - super(conf, schema, meta, path); - this.fs = path.getFileSystem(conf); - this.meta = meta; - this.schema = schema; - this.delimiter = StringEscapeUtils.unescapeJava( - this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - - this.columnNum = schema.size(); - - String nullCharacters = StringEscapeUtils.unescapeJava( - this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT)); - - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - @Override - public void init() throws IOException { - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - - //determine the intermediate file type - String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname, - TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal); - if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) { - isShuffle = true; - } else { - isShuffle = false; - } - - if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { - String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); - codecFactory = new CompressionCodecFactory(conf); - codec = codecFactory.getCodecByClassName(codecName); - compressor = CodecPool.getCompressor(codec); - if(compressor != null) compressor.reset(); //builtin gzip is null - - String extension = codec.getDefaultExtension(); - compressedPath = path.suffix(extension); - - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); - deflateFilter = 
codec.createOutputStream(fos, compressor); - outputStream = new DataOutputStream(deflateFilter); - - } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); - outputStream = new DataOutputStream(new BufferedOutputStream(fos)); - } - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - try { - //It will be remove, because we will add custom serde in textfile - String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, - TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - - os.reset(); - pos = fos.getPos(); - bufferedBytes = 0; - super.init(); - } - - - @Override - public void addTuple(Tuple tuple) throws IOException { - Datum datum; - int rowBytes = 0; - - for (int i = 0; i < columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars); - - if(columnNum - 1 > i){ - os.write((byte) delimiter); - rowBytes += 1; - } - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. 
- stats.analyzeField(i, datum); - } - } - os.write(LF); - rowBytes += 1; - - pos += rowBytes; - bufferedBytes += rowBytes; - if(bufferedBytes > BUFFER_SIZE){ - flushBuffer(); - } - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - private void flushBuffer() throws IOException { - if(os.getLength() > 0) { - os.writeTo(outputStream); - os.reset(); - bufferedBytes = 0; - } - } - @Override - public long getOffset() throws IOException { - return pos; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); - } - - @Override - public void close() throws IOException { - - try { - flush(); - - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - - if(deflateFilter != null) { - deflateFilter.finish(); - deflateFilter.resetState(); - deflateFilter = null; - } - - os.close(); - } finally { - IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - - public boolean isCompress() { - return compressor != null; - } - - public String getExtension() { - return codec != null ? 
codec.getDefaultExtension() : ""; - } - } - - public static class CSVScanner extends FileScanner implements SeekableScanner { - public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - factory = new CompressionCodecFactory(conf); - codec = factory.getCodec(fragment.getPath()); - if (codec == null || codec instanceof SplittableCompressionCodec) { - splittable = true; - } - - //Delimiter - this.delimiter = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_DELIMITER, - meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0); - - String nullCharacters = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_NULL, - meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT))); - - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - private final static int DEFAULT_PAGE_SIZE = 256 * 1024; - private char delimiter; - private FileSystem fs; - private FSDataInputStream fis; - private InputStream is; //decompressd stream - private CompressionCodecFactory factory; - private CompressionCodec codec; - private Decompressor decompressor; - private Seekable filePosition; - private boolean splittable = false; - private long startOffset, end, pos; - private int currentIdx = 0, validIdx = 0, recordCount = 0; - private int[] targetColumnIndexes; - private boolean eof = false; - private final byte[] nullChars; - private SplitLineReader reader; - private ArrayList<Long> fileOffsets; - private ArrayList<Integer> rowLengthList; - private ArrayList<Integer> startOffsets; - private NonSyncByteArrayOutputStream buffer; - private SerializerDeserializer serde; - - @Override - public void init() throws IOException { - fileOffsets = new ArrayList<Long>(); - rowLengthList = new 
ArrayList<Integer>(); - startOffsets = new ArrayList<Integer>(); - buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); - - // FileFragment information - if(fs == null) { - fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath()); - } - if(fis == null) fis = fs.open(fragment.getPath()); - - recordCount = 0; - pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getEndKey(); - - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - if (codec instanceof SplittableCompressionCodec) { - SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream( - fis, decompressor, startOffset, end, - SplittableCompressionCodec.READ_MODE.BYBLOCK); - - reader = new CompressedSplitLineReader(cIn, conf, null); - startOffset = cIn.getAdjustedStart(); - end = cIn.getAdjustedEnd(); - filePosition = cIn; - is = cIn; - } else { - is = new DataInputStream(codec.createInputStream(fis, decompressor)); - reader = new SplitLineReader(is, null); - filePosition = fis; - } - } else { - fis.seek(startOffset); - filePosition = fis; - is = fis; - reader = new SplitLineReader(is, null); - } - - if (targets == null) { - targets = schema.toArray(); - } - - targetColumnIndexes = new int[targets.length]; - for (int i = 0; i < targets.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - - try { - //FIXME - String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, - TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - - super.init(); - Arrays.sort(targetColumnIndexes); - if (LOG.isDebugEnabled()) { - LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end + - "," + fs.getFileStatus(fragment.getPath()).getLen()); - } - - if (startOffset != 0) { - pos += reader.readLine(new Text(), 
0, maxBytesToConsume(pos)); - } - eof = false; - page(); - } - - private int maxBytesToConsume(long pos) { - return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos); - } - - private long fragmentable() throws IOException { - return end - getFilePosition(); - } - - private long getFilePosition() throws IOException { - long retVal; - if (isCompress()) { - retVal = filePosition.getPos(); - } else { - retVal = pos; - } - return retVal; - } - - private void page() throws IOException { -// // Index initialization - currentIdx = 0; - validIdx = 0; - int currentBufferPos = 0; - int bufferedSize = 0; - - buffer.reset(); - startOffsets.clear(); - rowLengthList.clear(); - fileOffsets.clear(); - - if(eof) { - return; - } - - while (DEFAULT_PAGE_SIZE >= bufferedSize){ - - int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE); - - if(ret == 0){ - break; - } else { - fileOffsets.add(pos); - pos += ret; - startOffsets.add(currentBufferPos); - currentBufferPos += rowLengthList.get(rowLengthList.size() - 1); - bufferedSize += ret; - validIdx++; - recordCount++; - } - - if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){ - eof = true; - break; - } - } - if (tableStats != null) { - tableStats.setReadBytes(pos - startOffset); - tableStats.setNumRows(recordCount); - } - } - - @Override - public float getProgress() { - try { - if(eof) { - return 1.0f; - } - long filePos = getFilePosition(); - if (startOffset == filePos) { - return 0.0f; - } else { - long readBytes = filePos - startOffset; - long remainingBytes = Math.max(end - filePos, 0); - return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return 0.0f; - } - } - - @Override - public Tuple next() throws IOException { - try { - if (currentIdx == validIdx) { - if (eof) { - return null; - } else { - page(); - - if(currentIdx == validIdx){ - return null; - 
} - } - } - - long offset = -1; - if(!isCompress()){ - offset = fileOffsets.get(currentIdx); - } - - byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), - rowLengthList.get(currentIdx), delimiter, targetColumnIndexes); - currentIdx++; - return new LazyTuple(schema, cells, offset, nullChars, serde); - } catch (Throwable t) { - LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t); - LOG.error("Tuple list current index: " + currentIdx, t); - throw new IOException(t); - } - } - - private boolean isCompress() { - return codec != null; - } - - @Override - public void reset() throws IOException { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - - init(); - } - - @Override - public void close() throws IOException { - try { - if (tableStats != null) { - tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - } - - IOUtils.cleanup(LOG, reader, is, fis); - fs = null; - is = null; - fis = null; - if (LOG.isDebugEnabled()) { - LOG.debug("CSVScanner processed record:" + recordCount); - } - } finally { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - } - } - - @Override - public boolean isProjectable() { - return true; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public void seek(long offset) throws IOException { - if(isCompress()) throw new UnsupportedException(); - - int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset); - - if (tupleIndex > -1) { - this.currentIdx = tupleIndex; - } else if (isSplittable() && end >= offset || startOffset <= offset) { - eof = false; - fis.seek(offset); - pos = offset; - reader.reset(); - this.currentIdx = 0; - this.validIdx = 0; - // pageBuffer(); - } 
else { - throw new IOException("invalid offset " + - " < start : " + startOffset + " , " + - " end : " + end + " , " + - " filePos : " + filePosition.getPos() + " , " + - " input offset : " + offset + " >"); - } - } - - @Override - public long getNextOffset() throws IOException { - if(isCompress()) throw new UnsupportedException(); - - if (this.currentIdx == this.validIdx) { - if (fragmentable() <= 0) { - return -1; - } else { - page(); - if(currentIdx == validIdx) return -1; - } - } - return fileOffsets.get(currentIdx); - } - - @Override - public boolean isSplittable(){ - return splittable; - } - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java deleted file mode 100644 index 4f58e68..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.SplitCompressionInputStream; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; - -/** - * Line reader for compressed splits - * - * Reading records from a compressed split is tricky, as the - * LineRecordReader is using the reported compressed input stream - * position directly to determine when a split has ended. 
In addition the - * compressed input stream is usually faking the actual byte position, often - * updating it only after the first compressed block after the split is - * accessed. - * - * Depending upon where the last compressed block of the split ends relative - * to the record delimiters it can be easy to accidentally drop the last - * record or duplicate the last record between this split and the next. - * - * Split end scenarios: - * - * 1) Last block of split ends in the middle of a record - * Nothing special that needs to be done here, since the compressed input - * stream will report a position after the split end once the record - * is fully read. The consumer of the next split will discard the - * partial record at the start of the split normally, and no data is lost - * or duplicated between the splits. - * - * 2) Last block of split ends in the middle of a delimiter - * The line reader will continue to consume bytes into the next block to - * locate the end of the delimiter. If a custom delimiter is being used - * then the next record must be read by this split or it will be dropped. - * The consumer of the next split will not recognize the partial - * delimiter at the beginning of its split and will discard it along with - * the next record. - * - * However for the default delimiter processing there is a special case - * because CR, LF, and CRLF are all valid record delimiters. If the - * block ends with a CR then the reader must peek at the next byte to see - * if it is an LF and therefore part of the same record delimiter. - * Peeking at the next byte is an access to the next block and triggers - * the stream to report the end of the split. There are two cases based - * on the next byte: - * - * A) The next byte is LF - * The split needs to end after the current record is returned. The - * consumer of the next split will discard the first record, which - * is degenerate since LF is itself a delimiter, and start consuming - * records after that byte. 
If the current split tries to read - * another record then the record will be duplicated between splits. - * - * B) The next byte is not LF - * The current record will be returned but the stream will report - * the split has ended due to the peek into the next block. If the - * next record is not read then it will be lost, as the consumer of - * the next split will discard it before processing subsequent - * records. Therefore the next record beyond the reported split end - * must be consumed by this split to avoid data loss. - * - * 3) Last block of split ends at the beginning of a delimiter - * This is equivalent to case 1, as the reader will consume bytes into - * the next block and trigger the end of the split. No further records - * should be read as the consumer of the next split will discard the - * (degenerate) record at the beginning of its split. - * - * 4) Last block of split ends at the end of a delimiter - * Nothing special needs to be done here. The reader will not start - * examining the bytes into the next block until the next record is read, - * so the stream will not report the end of the split just yet. Once the - * next record is read then the next block will be accessed and the - * stream will indicate the end of the split. The consumer of the next - * split will correctly discard the first record of its split, and no - * data is lost or duplicated. - * - * If the default delimiter is used and the block ends at a CR then this - * is treated as case 2 since the reader does not yet know without - * looking at subsequent bytes whether the delimiter has ended. - * - * NOTE: It is assumed that compressed input streams *never* return bytes from - * multiple compressed blocks from a single read. Failure to do so will - * violate the buffering performed by this class, as it will access - * bytes into the next block after the split before returning all of the - * records from the previous block. 
- */ - -public class CompressedSplitLineReader extends SplitLineReader { - SplitCompressionInputStream scin; - private boolean usingCRLF; - private boolean needAdditionalRecord = false; - private boolean finished = false; - - public CompressedSplitLineReader(SplitCompressionInputStream in, - Configuration conf, - byte[] recordDelimiterBytes) - throws IOException { - super(in, conf, recordDelimiterBytes); - scin = in; - usingCRLF = (recordDelimiterBytes == null); - } - - @Override - protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter) - throws IOException { - int bytesRead = in.read(buffer); - - // If the split ended in the middle of a record delimiter then we need - // to read one additional record, as the consumer of the next split will - // not recognize the partial delimiter as a record. - // However if using the default delimiter and the next character is a - // linefeed then next split will treat it as a delimiter all by itself - // and the additional record read should not be performed. 
- if (inDelimiter && bytesRead > 0) { - if (usingCRLF) { - needAdditionalRecord = (buffer[0] != '\n'); - } else { - needAdditionalRecord = true; - } - } - return bytesRead; - } - - @Override - public int readLine(Text str, int maxLineLength, int maxBytesToConsume) - throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength - , int maxBytesToConsume) throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public boolean needAdditionalRecordAfterSplit() { - return !finished && needAdditionalRecord; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java deleted file mode 100644 index 8841a31..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -public class DataLocation { - private String host; - private int volumeId; - - public DataLocation(String host, int volumeId) { - this.host = host; - this.volumeId = volumeId; - } - - public String getHost() { - return host; - } - - public int getVolumeId() { - return volumeId; - } - - @Override - public String toString() { - return "DataLocation{" + - "host=" + host + - ", volumeId=" + volumeId + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java deleted file mode 100644 index 2396349..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskDeviceInfo.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import java.util.ArrayList; -import java.util.List; - -public class DiskDeviceInfo { - private int id; - private String name; - - private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>(); - - public DiskDeviceInfo(int id) { - this.id = id; - } - - public int getId() { - return id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public String toString() { - return id + "," + name; - } - - public void addMountPath(DiskMountInfo diskMountInfo) { - mountInfos.add(diskMountInfo); - } - - public List<DiskMountInfo> getMountInfos() { - return mountInfos; - } - - public void setMountInfos(List<DiskMountInfo> mountInfos) { - this.mountInfos = mountInfos; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java deleted file mode 100644 index 22f18ba..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -public class DiskInfo { - private int id; - private String partitionName; - private String mountPath; - - private long capacity; - private long used; - - public DiskInfo(int id, String partitionName) { - this.id = id; - this.partitionName = partitionName; - } - - public int getId() { - return id; - } - - public void setId(int id) { - this.id = id; - } - - public String getPartitionName() { - return partitionName; - } - - public void setPartitionName(String partitionName) { - this.partitionName = partitionName; - } - - public String getMountPath() { - return mountPath; - } - - public void setMountPath(String mountPath) { - this.mountPath = mountPath; - } - - public long getCapacity() { - return capacity; - } - - public void setCapacity(long capacity) { - this.capacity = capacity; - } - - public long getUsed() { - return used; - } - - public void setUsed(long used) { - this.used = used; - } - - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java deleted file mode 100644 index aadb0e7..0000000 --- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; - -public class DiskMountInfo implements Comparable<DiskMountInfo> { - private String mountPath; - - private long capacity; - private long used; - - private int deviceId; - - public DiskMountInfo(int deviceId, String mountPath) { - this.mountPath = mountPath; - } - - public String getMountPath() { - return mountPath; - } - - public void setMountPath(String mountPath) { - this.mountPath = mountPath; - } - - public long getCapacity() { - return capacity; - } - - public void setCapacity(long capacity) { - this.capacity = capacity; - } - - public long getUsed() { - return used; - } - - public void setUsed(long used) { - this.used = used; - } - - public int getDeviceId() { - return deviceId; - } - - @Override - public boolean equals(Object obj){ - if (!(obj instanceof DiskMountInfo)) return false; - - if (compareTo((DiskMountInfo) obj) == 0) return true; - else return false; - } - - @Override - public int hashCode(){ - return Objects.hashCode(mountPath); - } - - @Override - public int 
compareTo(DiskMountInfo other) { - String path1 = mountPath; - String path2 = other.mountPath; - - int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ; - int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ; - - if(path1Depth > path2Depth) { - return -1; - } else if(path1Depth < path2Depth) { - return 1; - } else { - int path1Length = path1.length(); - int path2Length = path2.length(); - - if(path1Length < path2Length) { - return 1; - } else if(path1Length > path2Length) { - return -1; - } else { - return path1.compareTo(path2); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java deleted file mode 100644 index 2d68870..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; -import org.apache.hadoop.hdfs.server.common.Util; - -import java.io.*; -import java.net.URI; -import java.util.*; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; - -public class DiskUtil { - - static String UNIX_DISK_DEVICE_PATH = "/proc/partitions"; - - public enum OSType { - OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC - } - - static private OSType getOSType() { - String osName = System.getProperty("os.name"); - if (osName.contains("Windows") - && (osName.contains("XP") || osName.contains("2003") - || osName.contains("Vista") - || osName.contains("Windows_7") - || osName.contains("Windows 7") || osName - .contains("Windows7"))) { - return OSType.OS_TYPE_WINXP; - } else if (osName.contains("SunOS") || osName.contains("Solaris")) { - return OSType.OS_TYPE_SOLARIS; - } else if (osName.contains("Mac")) { - return OSType.OS_TYPE_MAC; - } else { - return OSType.OS_TYPE_UNIX; - } - } - - public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException { - List<DiskDeviceInfo> deviceInfos; - - if(getOSType() == OSType.OS_TYPE_UNIX) { - deviceInfos = getUnixDiskDeviceInfos(); - setDeviceMountInfo(deviceInfos); - } else { - deviceInfos = getDefaultDiskDeviceInfos(); - } - - return deviceInfos; - } - - private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() { - List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>(); - - File file = new File(UNIX_DISK_DEVICE_PATH); - if(!file.exists()) { - System.out.println("No partition file:" + file.getAbsolutePath()); - return getDefaultDiskDeviceInfos(); - } - - BufferedReader reader = null; - try { - reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH))); - String line = null; - - int count = 0; - Set<String> deviceNames = new TreeSet<String>(); - while((line = reader.readLine()) != null) { - 
if(count > 0 && !line.trim().isEmpty()) { - String[] tokens = line.trim().split(" +"); - if(tokens.length == 4) { - String deviceName = getDiskDeviceName(tokens[3]); - deviceNames.add(deviceName); - } - } - count++; - } - - int id = 0; - for(String eachDeviceName: deviceNames) { - DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++); - diskDeviceInfo.setName(eachDeviceName); - - //TODO set addtional info - // /sys/block/sda/queue - infos.add(diskDeviceInfo); - } - } catch (Exception e) { - e.printStackTrace(); - } finally { - if(reader != null) { - try { - reader.close(); - } catch (IOException e) { - } - } - } - - return infos; - } - - private static String getDiskDeviceName(String partitionName) { - byte[] bytes = partitionName.getBytes(); - - byte[] result = new byte[bytes.length]; - int length = 0; - for(int i = 0; i < bytes.length; i++, length++) { - if(bytes[i] >= '0' && bytes[i] <= '9') { - break; - } else { - result[i] = bytes[i]; - } - } - - return new String(result, 0, length); - } - - public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() { - DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0); - diskDeviceInfo.setName("default"); - - List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>(); - - infos.add(diskDeviceInfo); - - return infos; - } - - - private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException { - Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>(); - for(DiskDeviceInfo eachDevice: deviceInfos) { - deviceMap.put(eachDevice.getName(), eachDevice); - } - - BufferedReader mountOutput = null; - try { - Process mountProcess = Runtime.getRuntime().exec("mount"); - mountOutput = new BufferedReader(new InputStreamReader( - mountProcess.getInputStream())); - while (true) { - String line = mountOutput.readLine(); - if (line == null) { - break; - } - - int indexStart = line.indexOf(" on /"); - int indexEnd = line.indexOf(" ", indexStart + 4); - - String deviceName = 
line.substring(0, indexStart).trim(); - String[] deviceNameTokens = deviceName.split("/"); - if(deviceNameTokens.length == 3) { - if("dev".equals(deviceNameTokens[1])) { - String realDeviceName = getDiskDeviceName(deviceNameTokens[2]); - String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath(); - - DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName); - if(diskDeviceInfo != null) { - diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath)); - } - } - } - } - } catch (IOException e) { - throw e; - } finally { - if (mountOutput != null) { - mountOutput.close(); - } - } - } - - public static int getDataNodeStorageSize(){ - return getStorageDirs().size(); - } - - public static List<URI> getStorageDirs(){ - Configuration conf = new HdfsConfiguration(); - Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); - return Util.stringCollectionAsURIs(dirNames); - } - - public static void main(String[] args) throws Exception { - System.out.println("/dev/sde1".split("/").length); - for(String eachToken: "/dev/sde1".split("/")) { - System.out.println(eachToken); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java deleted file mode 100644 index 0b3755d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.text.TextLineParsingError; - -import java.io.IOException; -import java.io.OutputStream; - - -public interface FieldSerializerDeserializer { - - public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException; - - public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) - throws IOException, TextLineParsingError; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java deleted file mode 100644 index 04278e9..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; - -import java.io.IOException; - -public abstract class FileAppender implements Appender { - protected boolean inited = false; - - protected final Configuration conf; - protected final TableMeta meta; - protected final Schema schema; - protected final Path path; - - protected boolean enabledStats; - - public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) { - this.conf = conf; - this.meta = meta; - this.schema = schema; - this.path = path; - } - - public void init() throws IOException { - if (inited) { - throw new IllegalStateException("FileAppender is already initialized."); - } - inited = true; - } - - public void enableStats() { - if (inited) { - throw new IllegalStateException("Should enable this option before init()"); - } - - this.enabledStats = true; - } - - public long getEstimatedOutputSize() throws IOException { - return getOffset(); - } - - public abstract long getOffset() throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java ---------------------------------------------------------------------- diff --git 
a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java deleted file mode 100644 index f15c4c9..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.ColumnStats; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.IOException; - -public abstract class FileScanner implements Scanner { - private static final Log LOG = LogFactory.getLog(FileScanner.class); - - protected boolean inited = false; - protected final Configuration conf; - protected final TableMeta meta; - protected final Schema schema; - protected final FileFragment fragment; - protected final int columnNum; - - protected Column [] targets; - - protected float progress; - - protected TableStats tableStats; - - public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) { - this.conf = conf; - this.meta = meta; - this.schema = schema; - this.fragment = fragment; - this.tableStats = new TableStats(); - this.columnNum = this.schema.size(); - } - - public void init() throws IOException { - inited = true; - progress = 0.0f; - - if (fragment != null) { - tableStats.setNumBytes(fragment.getEndKey()); - tableStats.setNumBlocks(1); - } - - if (schema != null) { - for(Column eachColumn: schema.getColumns()) { - ColumnStats columnStats = new ColumnStats(eachColumn); - tableStats.addColumnStat(columnStats); - } - } - } - - @Override - public Schema getSchema() { - return schema; - } - - @Override - public void setTarget(Column[] targets) { - if (inited) { - throw new IllegalStateException("Should be called before init()"); - } - this.targets = targets; - } - - public void 
setSearchCondition(Object expr) { - if (inited) { - throw new IllegalStateException("Should be called before init()"); - } - } - - public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException { - String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME); - FileSystem fs; - if(tajoUser != null) { - try { - fs = FileSystem.get(path.toUri(), tajoConf, tajoUser); - } catch (InterruptedException e) { - LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]"); - fs = FileSystem.get(path.toUri(), tajoConf); - } - } else { - fs = FileSystem.get(path.toUri(), tajoConf); - } - - return fs; - } - - @Override - public float getProgress() { - return progress; - } - - @Override - public TableStats getInputStats() { - return tableStats; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java deleted file mode 100644 index 8b7e2e0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.storage; - -import com.google.common.base.Preconditions; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.exception.UnsupportedException; - -/** - * An instance of FrameTuple is an immutable tuple. - * It contains two tuples and pretends to be one instance of Tuple for - * join qual evaluatations. - */ -public class FrameTuple implements Tuple, Cloneable { - private int size; - private int leftSize; - - private Tuple left; - private Tuple right; - - public FrameTuple() {} - - public FrameTuple(Tuple left, Tuple right) { - set(left, right); - } - - public void set(Tuple left, Tuple right) { - this.size = left.size() + right.size(); - this.left = left; - this.leftSize = left.size(); - this.right = right; - } - - @Override - public int size() { - return size; - } - - @Override - public boolean contains(int fieldId) { - Preconditions.checkArgument(fieldId < size, - "Out of field access: " + fieldId); - - if (fieldId < leftSize) { - return left.contains(fieldId); - } else { - return right.contains(fieldId - leftSize); - } - } - - @Override - public boolean isNull(int fieldid) { - return get(fieldid).isNull(); - } - - @Override - public boolean isNotNull(int fieldid) { - return !isNull(fieldid); - } - - @Override - public void clear() { - throw new UnsupportedException(); - } - - @Override - public void put(int fieldId, Datum value) { - throw new UnsupportedException(); - } - - @Override - public void put(int 
fieldId, Datum[] values) { - throw new UnsupportedException(); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException(); - } - - @Override - public void setOffset(long offset) { - throw new UnsupportedException(); - } - - @Override - public long getOffset() { - throw new UnsupportedException(); - } - - @Override - public void put(Datum [] values) { - throw new UnsupportedException(); - } - - @Override - public Datum get(int fieldId) { - Preconditions.checkArgument(fieldId < size, - "Out of field access: " + fieldId); - - if (fieldId < leftSize) { - return left.get(fieldId); - } else { - return right.get(fieldId - leftSize); - } - } - - @Override - public boolean getBool(int fieldId) { - return get(fieldId).asBool(); - } - - @Override - public byte getByte(int fieldId) { - return get(fieldId).asByte(); - } - - @Override - public char getChar(int fieldId) { - return get(fieldId).asChar(); - } - - @Override - public byte [] getBytes(int fieldId) { - return get(fieldId).asByteArray(); - } - - @Override - public short getInt2(int fieldId) { - return get(fieldId).asInt2(); - } - - @Override - public int getInt4(int fieldId) { - return get(fieldId).asInt4(); - } - - @Override - public long getInt8(int fieldId) { - return get(fieldId).asInt8(); - } - - @Override - public float getFloat4(int fieldId) { - return get(fieldId).asFloat4(); - } - - @Override - public double getFloat8(int fieldId) { - return get(fieldId).asFloat8(); - } - - @Override - public String getText(int fieldId) { - return get(fieldId).asChars(); - } - - @Override - public ProtobufDatum getProtobufDatum(int fieldId) { - return (ProtobufDatum) get(fieldId); - } - - @Override - public IntervalDatum getInterval(int fieldId) { - return (IntervalDatum) get(fieldId); - } - - @Override - public char [] getUnicodeChars(int fieldId) { - return get(fieldId).asUnicodeChars(); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - FrameTuple frameTuple 
= (FrameTuple) super.clone(); - frameTuple.set(this.left.clone(), this.right.clone()); - return frameTuple; - } - - @Override - public Datum[] getValues(){ - throw new UnsupportedException(); - } - - public String toString() { - boolean first = true; - StringBuilder str = new StringBuilder(); - str.append("("); - for(int i=0; i < size(); i++) { - if(contains(i)) { - if(first) { - first = false; - } else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(get(i)); - } - } - str.append(")"); - return str.toString(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java deleted file mode 100644 index 40cad32..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.util.Pair; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -public class HashShuffleAppender implements Appender { - private static Log LOG = LogFactory.getLog(HashShuffleAppender.class); - - private FileAppender appender; - private AtomicBoolean closed = new AtomicBoolean(false); - private int partId; - - private TableStats tableStats; - - //<taskId,<page start offset,<task start, task end>>> - private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes; - - //page start offset, length - private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); - - private Pair<Long, Integer> currentPage; - - private int pageSize; //MB - - private int rowNumInPage; - - private int totalRows; - - private long offset; - - private ExecutionBlockId ebId; - - public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) { - this.ebId = ebId; - this.partId = partId; - this.appender = appender; - this.pageSize = pageSize; - } - - @Override - public void init() throws IOException { - currentPage = new Pair(0L, 0); - taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>(); - rowNumInPage = 0; - } - - /** - * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition. - * After writing if a current page exceeds pageSize, pageOffset will be added. 
- * @param taskId - * @param tuples - * @return written bytes - * @throws IOException - */ - public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException { - synchronized(appender) { - if (closed.get()) { - return 0; - } - long currentPos = appender.getOffset(); - - for (Tuple eachTuple: tuples) { - appender.addTuple(eachTuple); - } - long posAfterWritten = appender.getOffset(); - - int writtenBytes = (int)(posAfterWritten - currentPos); - - int nextRowNum = rowNumInPage + tuples.size(); - List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId); - if (taskIndexes == null) { - taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); - taskTupleIndexes.put(taskId, taskIndexes); - } - taskIndexes.add( - new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum))); - rowNumInPage = nextRowNum; - - if (posAfterWritten - currentPage.getFirst() > pageSize) { - nextPage(posAfterWritten); - rowNumInPage = 0; - } - - totalRows += tuples.size(); - return writtenBytes; - } - } - - public long getOffset() throws IOException { - if (closed.get()) { - return offset; - } else { - return appender.getOffset(); - } - } - - private void nextPage(long pos) { - currentPage.setSecond((int) (pos - currentPage.getFirst())); - pages.add(currentPage); - currentPage = new Pair(pos, 0); - } - - @Override - public void addTuple(Tuple t) throws IOException { - throw new IOException("Not support addTuple, use addTuples()"); - } - - @Override - public void flush() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); - } - } - - @Override - public long getEstimatedOutputSize() throws IOException { - return pageSize * pages.size(); - } - - @Override - public void close() throws IOException { - synchronized(appender) { - if (closed.get()) { - return; - } - appender.flush(); - offset = appender.getOffset(); - if (offset > currentPage.getFirst()) { - 
nextPage(offset); - } - appender.close(); - if (LOG.isDebugEnabled()) { - if (!pages.isEmpty()) { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size() - + ", lastPage=" + pages.get(pages.size() - 1)); - } else { - LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()); - } - } - closed.set(true); - tableStats = appender.getStats(); - } - } - - @Override - public void enableStats() { - } - - @Override - public TableStats getStats() { - synchronized(appender) { - return appender.getStats(); - } - } - - public List<Pair<Long, Integer>> getPages() { - return pages; - } - - public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() { - return taskTupleIndexes; - } - - public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() { - List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>(); - - for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) { - merged.addAll(eachFailureIndex); - } - - return merged; - } - - public void taskFinished(QueryUnitAttemptId taskId) { - taskTupleIndexes.remove(taskId); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java deleted file mode 100644 index 84d81d5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.util.Pair; -import org.apache.tajo.storage.StorageManager; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class HashShuffleAppenderManager { - private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class); - - private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap = - new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>(); - private TajoConf systemConf; - private FileSystem defaultFS; - private FileSystem localFS; - private LocalDirAllocator lDirAllocator; - private int pageSize; - - public 
HashShuffleAppenderManager(TajoConf systemConf) throws IOException { - this.systemConf = systemConf; - - // initialize LocalDirAllocator - lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - - // initialize DFS and LocalFileSystems - defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf); - localFS = FileSystem.getLocal(systemConf); - pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024; - } - - public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId, - TableMeta meta, Schema outSchema) throws IOException { - synchronized (appenderMap) { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId); - - if (partitionAppenderMap == null) { - partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>(); - appenderMap.put(ebId, partitionAppenderMap); - } - - PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId); - if (partitionAppenderMeta == null) { - Path dataFile = getDataFile(ebId, partId); - FileSystem fs = dataFile.getFileSystem(systemConf); - if (fs.exists(dataFile)) { - FileStatus status = fs.getFileStatus(dataFile); - LOG.info("File " + dataFile + " already exists, size=" + status.getLen()); - } - - if (!fs.exists(dataFile.getParent())) { - fs.mkdirs(dataFile.getParent()); - } - FileAppender appender = (FileAppender) StorageManager.getStorageManager( - tajoConf).getAppender(meta, outSchema, dataFile); - appender.enableStats(); - appender.init(); - - partitionAppenderMeta = new PartitionAppenderMeta(); - partitionAppenderMeta.partId = partId; - partitionAppenderMeta.dataFile = dataFile; - partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender); - partitionAppenderMeta.appender.init(); - partitionAppenderMap.put(partId, partitionAppenderMeta); - - LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile); - } - - return 
partitionAppenderMeta.appender; - } - } - - public static int getPartParentId(int partId, TajoConf tajoConf) { - return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS); - } - - private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException { - try { - // the base dir for an output dir - String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; - Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf)); - //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")"); - - // If EB has many partition, too many shuffle file are in single directory. - return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - } - - public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null; - synchronized (appenderMap) { - partitionAppenderMap = appenderMap.remove(ebId); - } - - if (partitionAppenderMap == null) { - LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle"); - return null; - } - - // Send Intermediate data to QueryMaster. 
- List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>(); - for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) { - try { - eachMeta.appender.close(); - HashShuffleIntermediate intermediate = - new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(), - eachMeta.appender.getPages(), - eachMeta.appender.getMergedTupleIndexes()); - intermEntries.add(intermediate); - } catch (IOException e) { - LOG.error(e.getMessage(), e); - throw e; - } - } - - LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size()); - - return intermEntries; - } - - public void finalizeTask(QueryUnitAttemptId taskId) { - synchronized (appenderMap) { - Map<Integer, PartitionAppenderMeta> partitionAppenderMap = - appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId()); - if (partitionAppenderMap == null) { - return; - } - - for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) { - eachAppender.appender.taskFinished(taskId); - } - } - } - - public static class HashShuffleIntermediate { - private int partId; - - private long volume; - - //[<page start offset,<task start, task end>>] - private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes; - - //[<page start offset, length>] - private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); - - public HashShuffleIntermediate(int partId, long volume, - List<Pair<Long, Integer>> pages, - Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) { - this.partId = partId; - this.volume = volume; - this.failureTskTupleIndexes = failureTskTupleIndexes; - this.pages = pages; - } - - public int getPartId() { - return partId; - } - - public long getVolume() { - return volume; - } - - public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() { - return failureTskTupleIndexes; - } - - public List<Pair<Long, Integer>> getPages() { - return pages; - } - } - - static 
class PartitionAppenderMeta { - int partId; - HashShuffleAppender appender; - Path dataFile; - - public int getPartId() { - return partId; - } - - public HashShuffleAppender getAppender() { - return appender; - } - - public Path getDataFile() { - return dataFile; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java deleted file mode 100644 index bfbe478..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
*/

package org.apache.tajo.storage;

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.IntervalDatum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatum;
import org.apache.tajo.exception.UnsupportedException;

import java.util.Arrays;

/**
 * A {@link Tuple} that keeps each field as raw serialized bytes until the field is first read.
 *
 * <p>A field is held either as raw bytes in {@code textBytes} (not yet decoded) or as a decoded
 * {@link Datum} in {@code values}. {@link #get(int)} decodes a field on demand with the configured
 * {@link SerializerDeserializer}, caches the result in {@code values}, and releases the raw bytes.
 *
 * <p>This class does no synchronization, and {@link #get(int)} mutates internal state even though
 * it is a getter.
 */
public class LazyTuple implements Tuple, Cloneable {
  private long offset;
  /** Decoded field values; a {@code null} slot has not been decoded yet or has no source bytes. */
  private Datum[] values;
  /**
   * Raw serialized bytes per field; a {@code null} slot has nothing left to decode. May be shorter
   * than {@link #values}, e.g. when a row yielded fewer fields than the schema has columns.
   */
  private byte[][] textBytes;
  private Schema schema;
  /** Passed to the serde on every decode; defaults to {@link NullDatum}'s text bytes. */
  private byte[] nullBytes;
  private SerializerDeserializer serializeDeserialize;

  /**
   * Creates a lazy tuple that decodes fields with a {@link TextSerializerDeserializer} and passes
   * {@link NullDatum}'s text bytes as the null marker.
   *
   * @param schema schema describing the fields; its size fixes the width of this tuple
   * @param textBytes raw bytes per field ({@code null} entries have nothing to decode)
   * @param offset offset to associate with this tuple
   */
  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
  }

  /**
   * Creates a lazy tuple with an explicit null marker and serde.
   *
   * @param schema schema describing the fields; its size fixes the width of this tuple
   * @param textBytes raw bytes per field ({@code null} entries have nothing to decode)
   * @param offset offset to associate with this tuple
   * @param nullBytes null marker handed to {@code serde} on every decode
   * @param serde deserializer used to turn raw bytes into {@link Datum}s
   */
  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
    this.schema = schema;
    this.textBytes = textBytes;
    this.values = new Datum[schema.size()];
    this.offset = offset;
    this.nullBytes = nullBytes;
    this.serializeDeserialize = serde;
  }

  /**
   * Copy constructor. Every field of {@code tuple} is decoded as a side effect (via
   * {@link #getValues()}), and the copy holds only those decoded values, with no raw bytes.
   *
   * @param tuple the tuple to copy
   */
  public LazyTuple(LazyTuple tuple) {
    this.values = tuple.getValues();
    this.offset = tuple.offset;
    this.schema = tuple.schema;
    this.textBytes = new byte[size()][];
    this.nullBytes = tuple.nullBytes;
    this.serializeDeserialize = tuple.serializeDeserialize;
  }

  /** Returns the raw bytes held for a field, or {@code null} if none (including past the end). */
  private byte[] rawBytes(int fieldId) {
    return fieldId < textBytes.length ? textBytes[fieldId] : null;
  }

  /** Drops any raw bytes held for a field; they are stale once a value is stored directly. */
  private void discardRawBytes(int fieldId) {
    if (fieldId < textBytes.length) {
      textBytes[fieldId] = null;
    }
  }

  @Override
  public int size() {
    return values.length;
  }

  /** Returns true if the field has either undecoded raw bytes or a decoded value. */
  @Override
  public boolean contains(int fieldid) {
    return rawBytes(fieldid) != null || values[fieldid] != null;
  }

  @Override
  public boolean isNull(int fieldid) {
    return get(fieldid).isNull();
  }

  @Override
  public boolean isNotNull(int fieldid) {
    return !isNull(fieldid);
  }

  /** Drops every decoded value and every raw field. */
  @Override
  public void clear() {
    Arrays.fill(values, null);
    Arrays.fill(textBytes, null);
  }

  //////////////////////////////////////////////////////
  // Setter
  //////////////////////////////////////////////////////

  /** Stores a decoded value for one field, discarding that field's raw bytes. */
  @Override
  public void put(int fieldId, Datum value) {
    values[fieldId] = value;
    discardRawBytes(fieldId);
  }

  /**
   * Stores {@code values} into consecutive fields starting at {@code fieldId}. Only the written
   * fields lose their raw bytes; other fields keep theirs and can still be decoded lazily.
   */
  @Override
  public void put(int fieldId, Datum[] values) {
    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
      this.values[i] = values[j];
      discardRawBytes(i);
    }
  }

  /** Stores the fields of {@code tuple} into consecutive fields starting at {@code fieldId}. */
  @Override
  public void put(int fieldId, Tuple tuple) {
    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
      values[i] = tuple.get(j);
      discardRawBytes(i);
    }
  }

  /**
   * Replaces every field with the first {@link #size()} elements of {@code values} and discards
   * all raw bytes.
   */
  @Override
  public void put(Datum[] values) {
    System.arraycopy(values, 0, this.values, 0, size());
    this.textBytes = new byte[size()][];
  }

  //////////////////////////////////////////////////////
  // Getter
  //////////////////////////////////////////////////////

  /**
   * Returns the value of a field, decoding and caching it on first access.
   *
   * <p>A field index past the end of the raw bytes yields {@link NullDatum}. Raw bytes that fail
   * to decode also yield {@link NullDatum}. A field with neither raw bytes nor a value (e.g. a
   * non-projected column) yields {@code null}.
   */
  @Override
  public Datum get(int fieldId) {
    if (values[fieldId] != null) {
      return values[fieldId];
    }
    if (textBytes.length <= fieldId) {
      // Split error: the row had fewer fields than the schema
      // (col : 3, separator: ',', row text: "a,").
      values[fieldId] = NullDatum.get();
    } else if (textBytes[fieldId] != null) {
      try {
        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
      } catch (Exception e) {
        // Best effort: undecodable content reads as NULL instead of failing the whole row.
        values[fieldId] = NullDatum.get();
      }
      textBytes[fieldId] = null;
    }
    // Otherwise: non-projected field, values[fieldId] stays null.
    return values[fieldId];
  }

  @Override
  public void setOffset(long offset) {
    this.offset = offset;
  }

  @Override
  public long getOffset() {
    return this.offset;
  }

  @Override
  public boolean getBool(int fieldId) {
    return get(fieldId).asBool();
  }

  @Override
  public byte getByte(int fieldId) {
    return get(fieldId).asByte();
  }

  @Override
  public char getChar(int fieldId) {
    return get(fieldId).asChar();
  }

  @Override
  public byte [] getBytes(int fieldId) {
    return get(fieldId).asByteArray();
  }

  @Override
  public short getInt2(int fieldId) {
    return get(fieldId).asInt2();
  }

  @Override
  public int getInt4(int fieldId) {
    return get(fieldId).asInt4();
  }

  @Override
  public long getInt8(int fieldId) {
    return get(fieldId).asInt8();
  }

  @Override
  public float getFloat4(int fieldId) {
    return get(fieldId).asFloat4();
  }

  @Override
  public double getFloat8(int fieldId) {
    return get(fieldId).asFloat8();
  }

  @Override
  public String getText(int fieldId) {
    return get(fieldId).asChars();
  }

  /** Not supported by this tuple type; always throws {@link UnsupportedException}. */
  @Override
  public ProtobufDatum getProtobufDatum(int fieldId) {
    throw new UnsupportedException();
  }

  @Override
  public IntervalDatum getInterval(int fieldId) {
    return (IntervalDatum) get(fieldId);
  }

  @Override
  public char[] getUnicodeChars(int fieldId) {
    return get(fieldId).asUnicodeChars();
  }

  /** Renders non-null fields as {@code (index=>value, ...)}, decoding every field. */
  public String toString() {
    boolean first = true;
    StringBuilder str = new StringBuilder();
    str.append("(");
    Datum d;
    for (int i = 0; i < values.length; i++) {
      d = get(i);
      if (d != null) {
        if (first) {
          first = false;
        } else {
          str.append(", ");
        }
        str.append(i)
            .append("=>")
            .append(d);
      }
    }
    str.append(")");
    return str.toString();
  }

  /**
   * Hashes the decoded field values. Every field is decoded first (via {@link #getValues()})
   * so the hash is consistent with {@link #equals(Object)}, which also compares decoded values.
   * Hashing the partially decoded {@code values} array would give equal tuples different hash
   * codes.
   */
  @Override
  public int hashCode() {
    return Arrays.hashCode(getValues());
  }

  /** Returns a new array holding every field's value, decoding any field not yet decoded. */
  @Override
  public Datum[] getValues() {
    Datum[] datums = new Datum[values.length];
    for (int i = 0; i < values.length; i++) {
      datums[i] = get(i);
    }
    return datums;
  }

  /**
   * Returns a copy whose values array is a fresh array of fully decoded values. The
   * {@link Datum} instances themselves are shared with this tuple.
   */
  @Override
  public Tuple clone() throws CloneNotSupportedException {
    LazyTuple lazyTuple = (LazyTuple) super.clone();

    lazyTuple.values = getValues(); //shallow copy
    lazyTuple.textBytes = new byte[size()][];
    return lazyTuple;
  }

  /** Two tuples are equal when their fully decoded values are element-wise equal. */
  @Override
  public boolean equals(Object obj) {
    if (obj instanceof Tuple) {
      Tuple other = (Tuple) obj;
      return Arrays.equals(getValues(), other.getValues());
    }
    return false;
  }
}
