http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java new file mode 100644 index 0000000..bfbe478 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; + +import java.util.Arrays; + +public class LazyTuple implements Tuple, Cloneable { + private long offset; + private Datum[] values; + private byte[][] textBytes; + private Schema schema; + private byte[] nullBytes; + private SerializerDeserializer serializeDeserialize; + + public LazyTuple(Schema schema, byte[][] textBytes, long offset) { + this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer()); + } + + public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) { + this.schema = schema; + this.textBytes = textBytes; + this.values = new Datum[schema.size()]; + this.offset = offset; + this.nullBytes = nullBytes; + this.serializeDeserialize = serde; + } + + public LazyTuple(LazyTuple tuple) { + this.values = tuple.getValues(); + this.offset = tuple.offset; + this.schema = tuple.schema; + this.textBytes = new byte[size()][]; + this.nullBytes = tuple.nullBytes; + this.serializeDeserialize = tuple.serializeDeserialize; + } + + @Override + public int size() { + return values.length; + } + + @Override + public boolean contains(int fieldid) { + return textBytes[fieldid] != null || values[fieldid] != null; + } + + @Override + public boolean isNull(int fieldid) { + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); + } + + @Override + public void clear() { + for (int i = 0; i < values.length; i++) { + values[i] = null; + textBytes[i] = null; + } + } + + ////////////////////////////////////////////////////// + // Setter + ////////////////////////////////////////////////////// + @Override + public void put(int 
fieldId, Datum value) { + values[fieldId] = value; + textBytes[fieldId] = null; + } + + @Override + public void put(int fieldId, Datum[] values) { + for (int i = fieldId, j = 0; j < values.length; i++, j++) { + this.values[i] = values[j]; + } + this.textBytes = new byte[values.length][]; + } + + @Override + public void put(int fieldId, Tuple tuple) { + for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) { + values[i] = tuple.get(j); + textBytes[i] = null; + } + } + + @Override + public void put(Datum[] values) { + System.arraycopy(values, 0, this.values, 0, size()); + this.textBytes = new byte[values.length][]; + } + + ////////////////////////////////////////////////////// + // Getter + ////////////////////////////////////////////////////// + @Override + public Datum get(int fieldId) { + if (values[fieldId] != null) + return values[fieldId]; + else if (textBytes.length <= fieldId) { + values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,") + } else if (textBytes[fieldId] != null) { + try { + values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId), + textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes); + } catch (Exception e) { + values[fieldId] = NullDatum.get(); + } + textBytes[fieldId] = null; + } else { + //non-projection + } + return values[fieldId]; + } + + @Override + public void setOffset(long offset) { + this.offset = offset; + } + + @Override + public long getOffset() { + return this.offset; + } + + @Override + public boolean getBool(int fieldId) { + return get(fieldId).asBool(); + } + + @Override + public byte getByte(int fieldId) { + return get(fieldId).asByte(); + } + + @Override + public char getChar(int fieldId) { + return get(fieldId).asChar(); + } + + @Override + public byte [] getBytes(int fieldId) { + return get(fieldId).asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return get(fieldId).asInt2(); + } + + @Override + public int getInt4(int fieldId) { + 
return get(fieldId).asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return get(fieldId).asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return get(fieldId).asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return get(fieldId).asFloat8(); + } + + @Override + public String getText(int fieldId) { + return get(fieldId).asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + throw new UnsupportedException(); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override + public char[] getUnicodeChars(int fieldId) { + return get(fieldId).asUnicodeChars(); + } + + public String toString() { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + Datum d; + for (int i = 0; i < values.length; i++) { + d = get(i); + if (d != null) { + if (first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(d); + } + } + str.append(")"); + return str.toString(); + } + + @Override + public int hashCode() { + return Arrays.hashCode(values); + } + + @Override + public Datum[] getValues() { + Datum[] datums = new Datum[values.length]; + for (int i = 0; i < values.length; i++) { + datums[i] = get(i); + } + return datums; + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + LazyTuple lazyTuple = (LazyTuple) super.clone(); + + lazyTuple.values = getValues(); //shallow copy + lazyTuple.textBytes = new byte[size()][]; + return lazyTuple; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Tuple) { + Tuple other = (Tuple) obj; + return Arrays.equals(getValues(), other.getValues()); + } + return false; + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java new file mode 100644 index 0000000..f19b61f --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MemoryUtil.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.datum.*; +import org.apache.tajo.util.ClassSize; + +public class MemoryUtil { + + /** Overhead for an NullDatum */ + public static final long NULL_DATUM; + + /** Overhead for an BoolDatum */ + public static final long BOOL_DATUM; + + /** Overhead for an CharDatum */ + public static final long CHAR_DATUM; + + /** Overhead for an BitDatum */ + public static final long BIT_DATUM; + + /** Overhead for an Int2Datum */ + public static final long INT2_DATUM; + + /** Overhead for an Int4Datum */ + public static final long INT4_DATUM; + + /** Overhead for an Int8Datum */ + public static final long INT8_DATUM; + + /** Overhead for an Float4Datum */ + public static final long FLOAT4_DATUM; + + /** Overhead for an Float8Datum */ + public static final long FLOAT8_DATUM; + + /** Overhead for an TextDatum */ + public static final long TEXT_DATUM; + + /** Overhead for an BlobDatum */ + public static final long BLOB_DATUM; + + /** Overhead for an DateDatum */ + public static final long DATE_DATUM; + + /** Overhead for an TimeDatum */ + public static final long TIME_DATUM; + + /** Overhead for an TimestampDatum */ + public static final long TIMESTAMP_DATUM; + + static { + NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false); + + CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false); + + BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false); + + BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false); + + INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false); + + INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false); + + INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false); + + FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false); + + FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false); + + TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false); + + BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false); + + DATE_DATUM = 
ClassSize.estimateBase(DateDatum.class, false); + + TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false); + + TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false); + } + + public static long calculateMemorySize(Tuple tuple) { + long total = ClassSize.OBJECT; + for (Datum datum : tuple.getValues()) { + switch (datum.type()) { + + case NULL_TYPE: + total += NULL_DATUM; + break; + + case BOOLEAN: + total += BOOL_DATUM; + break; + + case BIT: + total += BIT_DATUM; + break; + + case CHAR: + total += CHAR_DATUM + datum.size(); + break; + + case INT1: + case INT2: + total += INT2_DATUM; + break; + + case INT4: + total += INT4_DATUM; + break; + + case INT8: + total += INT8_DATUM; + break; + + case FLOAT4: + total += FLOAT4_DATUM; + break; + + case FLOAT8: + total += FLOAT4_DATUM; + break; + + case TEXT: + total += TEXT_DATUM + datum.size(); + break; + + case DATE: + total += DATE_DATUM; + break; + + case TIME: + total += TIME_DATUM; + break; + + case TIMESTAMP: + total += TIMESTAMP_DATUM; + break; + + default: + break; + } + } + + return total; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java new file mode 100644 index 0000000..66b3667 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class MergeScanner implements Scanner { + private Configuration conf; + private TableMeta meta; + private Schema schema; + private List<Fragment> fragments; + private Iterator<Fragment> iterator; + private Fragment currentFragment; + private Scanner currentScanner; + private Tuple tuple; + private boolean projectable = false; + private boolean selectable = false; + private Schema target; + private float progress; + protected TableStats tableStats; + + public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList) + throws IOException { + this(conf, schema, meta, rawFragmentList, schema); + } + + public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList, + Schema target) + throws IOException { + this.conf = conf; + this.schema = schema; + this.meta = meta; + this.target = target; + + 
this.fragments = new ArrayList<Fragment>(); + + long numBytes = 0; + for (Fragment eachFileFragment: rawFragmentList) { + long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment); + if (fragmentLength > 0) { + numBytes += fragmentLength; + fragments.add(eachFileFragment); + } + } + + // it should keep the input order. Otherwise, it causes wrong result of sort queries. + this.reset(); + + if (currentScanner != null) { + this.projectable = currentScanner.isProjectable(); + this.selectable = currentScanner.isSelectable(); + } + + tableStats = new TableStats(); + + tableStats.setNumBytes(numBytes); + tableStats.setNumBlocks(fragments.size()); + + for(Column eachColumn: schema.getColumns()) { + ColumnStats columnStats = new ColumnStats(eachColumn); + tableStats.addColumnStat(columnStats); + } + } + + @Override + public void init() throws IOException { + progress = 0.0f; + } + + @Override + public Tuple next() throws IOException { + if (currentScanner != null) + tuple = currentScanner.next(); + + if (tuple != null) { + return tuple; + } else { + if (currentScanner != null) { + currentScanner.close(); + TableStats scannerTableStsts = currentScanner.getInputStats(); + if (scannerTableStsts != null) { + tableStats.setReadBytes(tableStats.getReadBytes() + scannerTableStsts.getReadBytes()); + tableStats.setNumRows(tableStats.getNumRows() + scannerTableStsts.getNumRows()); + } + } + currentScanner = getNextScanner(); + if (currentScanner != null) { + tuple = currentScanner.next(); + } + } + return tuple; + } + + @Override + public void reset() throws IOException { + this.iterator = fragments.iterator(); + if (currentScanner != null) { + currentScanner.close(); + } + this.currentScanner = getNextScanner(); + } + + private Scanner getNextScanner() throws IOException { + if (iterator.hasNext()) { + currentFragment = iterator.next(); + currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema, + 
currentFragment, target); + currentScanner.init(); + return currentScanner; + } else { + return null; + } + } + + @Override + public void close() throws IOException { + if(currentScanner != null) { + currentScanner.close(); + currentScanner = null; + } + iterator = null; + progress = 1.0f; + } + + @Override + public boolean isProjectable() { + return projectable; + } + + @Override + public void setTarget(Column[] targets) { + this.target = new Schema(targets); + } + + @Override + public boolean isSelectable() { + return selectable; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public Schema getSchema() { + return schema; + } + + @Override + public boolean isSplittable(){ + return false; + } + + @Override + public float getProgress() { + if (currentScanner != null && iterator != null && tableStats.getNumBytes() > 0) { + TableStats scannerTableStsts = currentScanner.getInputStats(); + long currentScannerReadBytes = 0; + if (scannerTableStsts != null) { + currentScannerReadBytes = scannerTableStsts.getReadBytes(); + } + + return (float)(tableStats.getReadBytes() + currentScannerReadBytes) / (float)tableStats.getNumBytes(); + } else { + return progress; + } + } + + @Override + public TableStats getInputStats() { + return tableStats; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java new file mode 100644 index 0000000..4272228 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NullScanner.java @@ -0,0 +1,109 @@ +package org.apache.tajo.storage; /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hadoop.conf.Configuration; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.fragment.Fragment; + +import java.io.IOException; + +public class NullScanner implements Scanner { + protected final Configuration conf; + protected final TableMeta meta; + protected final Schema schema; + protected final Fragment fragment; + protected final int columnNum; + protected Column [] targets; + protected float progress; + protected TableStats tableStats; + + public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) { + this.conf = conf; + this.meta = meta; + this.schema = schema; + this.fragment = fragment; + this.tableStats = new TableStats(); + this.columnNum = this.schema.size(); + } + + @Override + public void init() throws IOException { + progress = 0.0f; + tableStats.setNumBytes(0); + tableStats.setNumBlocks(0); + } + + @Override + public Tuple next() throws IOException { + progress = 1.0f; + return null; + } + + @Override + public void reset() throws IOException { + progress = 0.0f; + } + + @Override + public void close() throws IOException { + progress 
= 1.0f; + } + + @Override + public boolean isProjectable() { + return false; + } + + @Override + public void setTarget(Column[] targets) { + this.targets = targets; + } + + @Override + public boolean isSelectable() { + return true; + } + + @Override + public void setSearchCondition(Object expr) { + + } + + @Override + public boolean isSplittable() { + return true; + } + + @Override + public float getProgress() { + return progress; + } + + @Override + public TableStats getInputStats() { + return tableStats; + } + + @Override + public Schema getSchema() { + return schema; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java new file mode 100644 index 0000000..94d13ee --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/NumericPathComparator.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.Path; + +import java.util.Comparator; + +public class NumericPathComparator implements Comparator<Path> { + + @Override + public int compare(Path p1, Path p2) { + int num1 = Integer.parseInt(p1.getName()); + int num2 = Integer.parseInt(p2.getName()); + + return num1 - num2; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java new file mode 100644 index 0000000..24b6280 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.exception.UnknownDataTypeException; +import org.apache.tajo.tuple.offheap.RowWriter; +import org.apache.tajo.util.BitArray; + +import java.nio.ByteBuffer; + +public class RowStoreUtil { + public static int[] getTargetIds(Schema inSchema, Schema outSchema) { + int[] targetIds = new int[outSchema.size()]; + int i = 0; + for (Column target : outSchema.getColumns()) { + targetIds[i] = inSchema.getColumnId(target.getQualifiedName()); + i++; + } + + return targetIds; + } + + public static Tuple project(Tuple in, Tuple out, int[] targetIds) { + out.clear(); + for (int idx = 0; idx < targetIds.length; idx++) { + out.put(idx, in.get(targetIds[idx])); + } + return out; + } + + public static RowStoreEncoder createEncoder(Schema schema) { + return new RowStoreEncoder(schema); + } + + public static RowStoreDecoder createDecoder(Schema schema) { + return new RowStoreDecoder(schema); + } + + public static class RowStoreDecoder { + + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreDecoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + + public Tuple toTuple(byte [] bytes) { + nullFlags.clear(); + ByteBuffer bb = ByteBuffer.wrap(bytes); + Tuple tuple = new VTuple(schema.size()); + Column col; + TajoDataTypes.DataType type; + + bb.limit(headerSize); + nullFlags.fromByteBuffer(bb); + bb.limit(bytes.length); + + for (int i =0; i < schema.size(); i++) { + if (nullFlags.get(i)) { + tuple.put(i, DatumFactory.createNullDatum()); + continue; + } + + col = 
schema.getColumn(i); + type = col.getDataType(); + switch (type.getType()) { + case BOOLEAN: tuple.put(i, DatumFactory.createBool(bb.get())); break; + case BIT: + byte b = bb.get(); + tuple.put(i, DatumFactory.createBit(b)); + break; + + case CHAR: + byte c = bb.get(); + tuple.put(i, DatumFactory.createChar(c)); + break; + + case INT2: + short s = bb.getShort(); + tuple.put(i, DatumFactory.createInt2(s)); + break; + + case INT4: + case DATE: + int i_ = bb.getInt(); + tuple.put(i, DatumFactory.createFromInt4(type, i_)); + break; + + case INT8: + case TIME: + case TIMESTAMP: + long l = bb.getLong(); + tuple.put(i, DatumFactory.createFromInt8(type, l)); + break; + + case INTERVAL: + int month = bb.getInt(); + long milliseconds = bb.getLong(); + tuple.put(i, new IntervalDatum(month, milliseconds)); + break; + + case FLOAT4: + float f = bb.getFloat(); + tuple.put(i, DatumFactory.createFloat4(f)); + break; + + case FLOAT8: + double d = bb.getDouble(); + tuple.put(i, DatumFactory.createFloat8(d)); + break; + + case TEXT: + byte [] _string = new byte[bb.getInt()]; + bb.get(_string); + tuple.put(i, DatumFactory.createText(_string)); + break; + + case BLOB: + byte [] _bytes = new byte[bb.getInt()]; + bb.get(_bytes); + tuple.put(i, DatumFactory.createBlob(_bytes)); + break; + + case INET4: + byte [] _ipv4 = new byte[4]; + bb.get(_ipv4); + tuple.put(i, DatumFactory.createInet4(_ipv4)); + break; + case INET6: + // TODO - to be implemented + throw new UnsupportedException(type.getType().name()); + default: + throw new RuntimeException(new UnknownDataTypeException(type.getType().name())); + } + } + return tuple; + } + + public Schema getSchema() { + return schema; + } + } + + public static class RowStoreEncoder { + private Schema schema; + private BitArray nullFlags; + private int headerSize; + + private RowStoreEncoder(Schema schema) { + this.schema = schema; + nullFlags = new BitArray(schema.size()); + headerSize = nullFlags.bytesLength(); + } + + public byte[] toBytes(Tuple 
tuple) { + nullFlags.clear(); + int size = estimateTupleDataSize(tuple); + ByteBuffer bb = ByteBuffer.allocate(size + headerSize); + bb.position(headerSize); + Column col; + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + nullFlags.set(i); + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case NULL_TYPE: + nullFlags.set(i); + break; + case BOOLEAN: + bb.put(tuple.get(i).asByte()); + break; + case BIT: + bb.put(tuple.get(i).asByte()); + break; + case CHAR: + bb.put(tuple.get(i).asByte()); + break; + case INT2: + bb.putShort(tuple.get(i).asInt2()); + break; + case INT4: + bb.putInt(tuple.get(i).asInt4()); + break; + case INT8: + bb.putLong(tuple.get(i).asInt8()); + break; + case FLOAT4: + bb.putFloat(tuple.get(i).asFloat4()); + break; + case FLOAT8: + bb.putDouble(tuple.get(i).asFloat8()); + break; + case TEXT: + byte[] _string = tuple.get(i).asByteArray(); + bb.putInt(_string.length); + bb.put(_string); + break; + case DATE: + bb.putInt(tuple.get(i).asInt4()); + break; + case TIME: + case TIMESTAMP: + bb.putLong(tuple.get(i).asInt8()); + break; + case INTERVAL: + IntervalDatum interval = (IntervalDatum) tuple.get(i); + bb.putInt(interval.getMonths()); + bb.putLong(interval.getMilliSeconds()); + break; + case BLOB: + byte[] bytes = tuple.get(i).asByteArray(); + bb.putInt(bytes.length); + bb.put(bytes); + break; + case INET4: + byte[] ipBytes = tuple.get(i).asByteArray(); + bb.put(ipBytes); + break; + case INET6: + bb.put(tuple.get(i).asByteArray()); + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + byte[] flags = nullFlags.toArray(); + int finalPosition = bb.position(); + bb.position(0); + bb.put(flags); + + bb.position(finalPosition); + bb.flip(); + byte[] buf = new byte[bb.limit()]; + bb.get(buf); + return buf; + } + + // Note that, NULL values are treated separately + private int estimateTupleDataSize(Tuple tuple) { + int size 
= 0; + Column col; + + for (int i = 0; i < schema.size(); i++) { + if (tuple.isNull(i)) { + continue; + } + + col = schema.getColumn(i); + switch (col.getDataType().getType()) { + case BOOLEAN: + case BIT: + case CHAR: + size += 1; + break; + case INT2: + size += 2; + break; + case DATE: + case INT4: + case FLOAT4: + size += 4; + break; + case TIME: + case TIMESTAMP: + case INT8: + case FLOAT8: + size += 8; + break; + case INTERVAL: + size += 12; + break; + case TEXT: + case BLOB: + size += (4 + tuple.get(i).asByteArray().length); + break; + case INET4: + case INET6: + size += tuple.get(i).asByteArray().length; + break; + default: + throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name())); + } + } + + size += 100; // optimistic reservation + + return size; + } + + public Schema getSchema() { + return schema; + } + } + + public static void convert(Tuple tuple, RowWriter writer) { + writer.startRow(); + + for (int i = 0; i < writer.dataTypes().length; i++) { + if (tuple.isNull(i)) { + writer.skipField(); + continue; + } + switch (writer.dataTypes()[i].getType()) { + case BOOLEAN: + writer.putBool(tuple.getBool(i)); + break; + case INT1: + case INT2: + writer.putInt2(tuple.getInt2(i)); + break; + case INT4: + case DATE: + case INET4: + writer.putInt4(tuple.getInt4(i)); + break; + case INT8: + case TIMESTAMP: + case TIME: + writer.putInt8(tuple.getInt8(i)); + break; + case FLOAT4: + writer.putFloat4(tuple.getFloat4(i)); + break; + case FLOAT8: + writer.putFloat8(tuple.getFloat8(i)); + break; + case TEXT: + writer.putText(tuple.getBytes(i)); + break; + case INTERVAL: + writer.putInterval((IntervalDatum) tuple.getInterval(i)); + break; + case PROTOBUF: + writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i)); + break; + case NULL_TYPE: + writer.skipField(); + break; + default: + throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]); + } + } + writer.endRow(); + } +} 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.SchemaObject;
import org.apache.tajo.catalog.statistics.TableStats;

import java.io.Closeable;
import java.io.IOException;

/**
 * Scanner Interface
 */
public interface Scanner extends SchemaObject, Closeable {

  /**
   * Prepares the scanner for reading. Must be called before the first {@link #next()}.
   *
   * @throws java.io.IOException if internal I/O error occurs during init method
   */
  void init() throws IOException;

  /**
   * It returns one tuple at each call.
   *
   * @return retrieve null if the scanner has no more tuples.
   * Otherwise it returns one tuple.
   *
   * @throws java.io.IOException if internal I/O error occurs during next method
   */
  Tuple next() throws IOException;

  /**
   * Reset the cursor. After executed, the scanner
   * will retrieve the first tuple.
   *
   * @throws java.io.IOException if internal I/O error occurs during reset method
   */
  void reset() throws IOException;

  /**
   * Close scanner
   *
   * @throws java.io.IOException if internal I/O error occurs during close method
   */
  void close() throws IOException;


  /**
   * It returns if the projection is executed in the underlying scanner layer.
   *
   * @return true if this scanner can project the given columns.
   */
  boolean isProjectable();

  /**
   * Set target columns
   * @param targets columns to be projected
   */
  void setTarget(Column[] targets);

  /**
   * It returns if the selection is executed in the underlying scanner layer.
   *
   * @return true if this scanner can filter tuples against a given condition.
   */
  boolean isSelectable();

  /**
   * Set a search condition
   * @param expr to be searched
   *
   * TODO - to be changed Object type
   */
  void setSearchCondition(Object expr);

  /**
   * It returns if the file is splittable.
   *
   * @return true if this scanner can split a file.
   */
  boolean isSplittable();

  /**
   * How much of the input has the Scanner consumed
   * @return progress from <code>0.0</code> to <code>1.0</code>.
   */
  float getProgress();

  /**
   * Returns the input statistics gathered while scanning.
   */
  TableStats getInputStats();
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import java.io.IOException;

/**
 * A {@link Scanner} that additionally supports random access by byte offset.
 */
public interface SeekableScanner extends Scanner {

  /**
   * Returns the byte offset at which the next tuple will be read.
   *
   * @throws java.io.IOException if internal I/O error occurs
   */
  long getNextOffset() throws IOException;

  /**
   * Moves the read cursor to the given byte offset.
   *
   * @param offset the absolute byte offset to seek to
   * @throws java.io.IOException if internal I/O error occurs
   */
  void seek(long offset) throws IOException;
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Converts a single column value between its {@link Datum} form and a byte
 * representation.
 *
 * @deprecated marked deprecated at this commit; NOTE(review): the intended
 * replacement API is not visible from here — confirm before adding new
 * implementations.
 */
@Deprecated
public interface SerializerDeserializer {

  /**
   * Writes the given datum of the given column to the output stream.
   *
   * @param col the column describing the datum's type
   * @param datum the value to serialize
   * @param out the destination stream
   * @param nullCharacters the byte sequence that represents NULL
   * @return the number of bytes written
   * @throws java.io.IOException if the stream cannot be written
   */
  int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;

  /**
   * Reads a datum of the given column from a byte range.
   *
   * @param col the column describing the expected type
   * @param bytes the source buffer
   * @param offset the start position within {@code bytes}
   * @param length the number of bytes to read
   * @param nullCharacters the byte sequence that represents NULL
   * @return the deserialized value
   * @throws java.io.IOException if the bytes cannot be decoded
   */
  Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;

}
 */

package org.apache.tajo.storage;

import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.plan.rewrite.RewriteRule;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.TUtil;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.text.NumberFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * StorageManager manages the functions of storing and reading data.
 * StorageManager is an abstract class.
 * To support storages such as HDFS and HBase, a specific StorageManager should be implemented by inheriting this class.
 *
 */
public abstract class StorageManager {
  private final Log LOG = LogFactory.getLog(StorageManager.class);

  // Constructor parameter list every Scanner implementation is expected to
  // declare; used by newScannerInstance() for reflective instantiation.
  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
      Configuration.class,
      Schema.class,
      TableMeta.class,
      Fragment.class
  };

  // Constructor parameter list every Appender implementation is expected to
  // declare; used by newAppenderInstance() for reflective instantiation.
  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
      Configuration.class,
      QueryUnitAttemptId.class,
      Schema.class,
      TableMeta.class,
      Path.class
  };

  protected TajoConf conf;       // system configuration; assigned by init()
  protected StoreType storeType; // the storage type this manager serves

  /**
   * Cache of StorageManager.
   * Key is manager key(warehouse path) + store type
   */
  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();

  /**
   * Cache of scanner handlers for each storage type.
   */
  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends Scanner>>();

  /**
   * Cache of appender handlers for each storage type.
   */
  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
      = new ConcurrentHashMap<String, Class<? extends Appender>>();

  /**
   * Cache of constructors for each class. Pins the classes so they
   * can't be garbage collected until ReflectionUtils can be collected.
   */
  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
      new ConcurrentHashMap<Class<?>, Constructor<?>>();

  public StorageManager(StoreType storeType) {
    this.storeType = storeType;
  }

  /**
   * Initialize storage manager.
   * @throws java.io.IOException
   */
  protected abstract void storageInit() throws IOException;

  /**
   * This method is called after executing "CREATE TABLE" statement.
   * If a storage is a file based storage, a storage manager may create directory.
   *
   * @param tableDesc Table description which is created.
   * @param ifNotExists Creates the table only when the table does not exist.
   * @throws java.io.IOException
   */
  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;

  /**
   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
   * which is the option to delete all the data.
   *
   * @param tableDesc The description of the table whose data is purged.
   * @throws java.io.IOException
   */
  public abstract void purgeTable(TableDesc tableDesc) throws IOException;

  /**
   * Returns the splits that will serve as input for the scan tasks. The
   * number of splits matches the number of regions in a table.
   * @param fragmentId The table name or previous ExecutionBlockId
   * @param tableDesc The table description for the target data.
   * @param scanNode The logical node for scanning.
   * @return The list of input fragments.
   * @throws java.io.IOException
   */
  public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
                                           ScanNode scanNode) throws IOException;

  /**
   * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'.
   * The result list should be small. If there are many fragments for scanning, TajoMaster uses the paging navigation.
   * @param tableDesc The table description for the target data.
   * @param currentPage The current page number within the entire list.
   * @param numFragments The number of fragments in the result.
   * @return The list of input fragments.
   * @throws java.io.IOException
   */
  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
      throws IOException;

  /**
   * It returns the storage property.
   * @return The storage property
   */
  public abstract StorageProperty getStorageProperty();

  /**
   * Release storage manager resource
   */
  public abstract void closeStorageManager();

  /**
   * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
   * In general Repartitioner determines the partition range using previous output statistics data.
   * In the special cases, such as HBase, Repartitioner uses the result of this method.
   *
   * @param queryContext The current query context which contains query properties.
   * @param tableDesc The table description for the target data.
   * @param inputSchema The input schema
   * @param sortSpecs The sort specification that contains the sort column and sort order.
   * @param dataRange The total range of the sort data.
   * @return The list of sort ranges.
   * @throws java.io.IOException
   */
  public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                                   Schema inputSchema, SortSpec[] sortSpecs,
                                                   TupleRange dataRange) throws IOException;

  /**
   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
   * In general Tajo creates the target table after finishing the final sub-query of CTAS.
   * But in the special cases, such as HBase, an INSERT or CTAS query uses the target table information.
   * That kind of storage should implement the logic related to creating a table in this method.
   *
   * @param node The child node of the root node.
   * @throws java.io.IOException
   */
  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;

  /**
   * It is called when the query failed.
   * Each storage manager should implement to be processed when the query fails in this method.
   *
   * @param node The child node of the root node.
   * @throws java.io.IOException
   */
  public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;

  /**
   * Returns the current storage type.
   * @return the store type this manager serves (e.g. CSV, HBASE)
   */
  public StoreType getStoreType() {
    return storeType;
  }

  /**
   * Initialize StorageManager instance. It should be called before using.
+ * + * @param tajoConf + * @throws java.io.IOException + */ + public void init(TajoConf tajoConf) throws IOException { + this.conf = tajoConf; + storageInit(); + } + + /** + * Close StorageManager + * @throws java.io.IOException + */ + public void close() throws IOException { + synchronized(storageManagers) { + for (StorageManager eachStorageManager: storageManagers.values()) { + eachStorageManager.closeStorageManager(); + } + } + } + + /** + * Returns the splits that will serve as input for the scan tasks. The + * number of splits matches the number of regions in a table. + * + * @param fragmentId The table name or previous ExecutionBlockId + * @param tableDesc The table description for the target data. + * @return The list of input fragments. + * @throws java.io.IOException + */ + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException { + return getSplits(fragmentId, tableDesc, null); + } + + /** + * Returns FileStorageManager instance. + * + * @param tajoConf Tajo system property. + * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { + return getFileStorageManager(tajoConf, null); + } + + /** + * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in tajoConf with warehousePath parameter. + * + * @param tajoConf Tajo system property. + * @param warehousePath The warehouse directory to be set in the tajoConf. + * @return + * @throws java.io.IOException + */ + public static StorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { + URI uri; + TajoConf copiedConf = new TajoConf(tajoConf); + if (warehousePath != null) { + copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString()); + } + uri = TajoConf.getWarehouseDir(copiedConf).toUri(); + String key = "file".equals(uri.getScheme()) ? 
"file" : uri.toString(); + return getStorageManager(copiedConf, StoreType.CSV, key); + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { + if ("HBASE".equals(storeType)) { + return getStorageManager(tajoConf, StoreType.HBASE); + } else { + return getStorageManager(tajoConf, StoreType.CSV); + } + } + + /** + * Returns the proper StorageManager instance according to the storeType. + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @return + * @throws java.io.IOException + */ + public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { + return getStorageManager(tajoConf, storeType, null); + } + + /** + * Returns the proper StorageManager instance according to the storeType + * + * @param tajoConf Tajo system property. + * @param storeType Storage type + * @param managerKey Key that can identify each storage manager(may be a path) + * @return + * @throws java.io.IOException + */ + public static synchronized StorageManager getStorageManager ( + TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { + synchronized (storageManagers) { + String storeKey = storeType + managerKey; + StorageManager manager = storageManagers.get(storeKey); + if (manager == null) { + String typeName = "hdfs"; + + switch (storeType) { + case HBASE: + typeName = "hbase"; + break; + default: + typeName = "hdfs"; + } + + Class<? extends StorageManager> storageManagerClass = + tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class); + + if (storageManagerClass == null) { + throw new IOException("Unknown Storage Type: " + typeName); + } + + try { + Constructor<? 
extends StorageManager> constructor = + (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass); + if (constructor == null) { + constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{StoreType.class}); + constructor.setAccessible(true); + CONSTRUCTOR_CACHE.put(storageManagerClass, constructor); + } + manager = constructor.newInstance(new Object[]{storeType}); + } catch (Exception e) { + throw new RuntimeException(e); + } + manager.init(tajoConf); + storageManagers.put(storeKey, manager); + } + + return manager; + } + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target Columns which are selected. + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { + return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { + return getScanner(meta, schema, fragment, schema); + } + + /** + * Returns Scanner instance. + * + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + if (fragment.isEmpty()) { + Scanner scanner = new NullScanner(conf, schema, meta, fragment); + scanner.setTarget(target.toArray()); + + return scanner; + } + + Scanner scanner; + + Class<? 
extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); + scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); + if (scanner.isProjectable()) { + scanner.setTarget(target.toArray()); + } + + return scanner; + } + + /** + * Returns Scanner instance. + * + * @param conf The system property + * @param meta The table meta + * @param schema The input schema + * @param fragment The fragment for scanning + * @param target The output schema + * @return Scanner instance + * @throws java.io.IOException + */ + public static synchronized SeekableScanner getSeekableScanner( + TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { + return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); + } + + /** + * Returns Appender instance. + * @param queryContext Query property. + * @param taskAttemptId Task id. + * @param meta Table meta data. + * @param schema Output schema. + * @param workDir Working directory + * @return Appender instance + * @throws java.io.IOException + */ + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + Appender appender; + + Class<? extends Appender> appenderClass; + + String handlerName = meta.getStoreType().name().toLowerCase(); + appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); + if (appenderClass == null) { + appenderClass = conf.getClass( + String.format("tajo.storage.appender-handler.%s.class", + meta.getStoreType().name().toLowerCase()), null, Appender.class); + APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); + } + + if (appenderClass == null) { + throw new IOException("Unknown Storage Type: " + meta.getStoreType()); + } + + appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); + + return appender; + } + + /** + * Creates a scanner instance. 
+ * + * @param theClass Concrete class of scanner + * @param conf System property + * @param schema Input schema + * @param meta Table meta data + * @param fragment The fragment for scanning + * @param <T> + * @return The scanner instance + */ + public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, + Fragment fragment) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Creates a scanner instance. + * + * @param theClass Concrete class of scanner + * @param conf System property + * @param taskAttemptId Task id + * @param meta Table meta data + * @param schema Input schema + * @param workDir Working directory + * @param <T> + * @return The scanner instance + */ + public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, + TableMeta meta, Schema schema, Path workDir) { + T result; + try { + Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); + if (meth == null) { + meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); + meth.setAccessible(true); + CONSTRUCTOR_CACHE.put(theClass, meth); + } + result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); + } catch (Exception e) { + throw new RuntimeException(e); + } + + return result; + } + + /** + * Return the Scanner class for the StoreType that is defined in storage-default.xml. + * + * @param storeType store type + * @return The Scanner class + * @throws java.io.IOException + */ + public Class<? 
extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { + String handlerName = storeType.name().toLowerCase(); + Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); + if (scannerClass == null) { + scannerClass = conf.getClass( + String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class); + SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); + } + + if (scannerClass == null) { + throw new IOException("Unknown Storage Type: " + storeType.name()); + } + + return scannerClass; + } + + /** + * Return length of the fragment. + * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. + * + * @param conf Tajo system property + * @param fragment Fragment + * @return + */ + public static long getFragmentLength(TajoConf conf, Fragment fragment) { + if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { + return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); + } else { + return fragment.getLength(); + } + } + + /** + * It is called after making logical plan. Storage manager should verify the schema for inserting. + * + * @param tableDesc The table description of insert target. + * @param outSchema The output schema of select query for inserting. + * @throws java.io.IOException + */ + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + // nothing to do + } + + /** + * Returns the list of storage specified rewrite rules. + * This values are used by LogicalOptimizer. + * + * @param queryContext The query property + * @param tableDesc The description of the target table. + * @return The list of storage specified rewrite rules + * @throws java.io.IOException + */ + public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + return null; + } + + /** + * Finalizes result data. 
   * Tajo stores result data in the staging directory.
   * If the query fails, clean up the staging directory.
   * Otherwise the query is successful, move to the final directory from the staging directory.
   *
   * @param queryContext The query property
   * @param finalEbId The final execution block id
   * @param plan The query plan
   * @param schema The final output schema
   * @param tableDesc The description of the target table
   * @return Saved path
   * @throws java.io.IOException
   */
  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
                               LogicalPlan plan, Schema schema,
                               TableDesc tableDesc) throws IOException {
    // Delegates to the 6-arg overload; result files are renamed with the
    // max sequence number by default (changeFileSeq = true).
    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true);
  }

  /**
   * Finalizes result data. Tajo stores result data in the staging directory.
   * If the query fails, clean up the staging directory.
   * Otherwise the query is successful, move to the final directory from the staging directory.
   *
   * @param queryContext The query property
   * @param finalEbId The final execution block id
   * @param plan The query plan
   * @param schema The final output schema
   * @param tableDesc The description of the target table
   * @param changeFileSeq If true change result file name with max sequence.
   * @return Saved path
   * @throws java.io.IOException
   */
  protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
                                  LogicalPlan plan, Schema schema,
                                  TableDesc tableDesc, boolean changeFileSeq) throws IOException {
    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
    Path finalOutputDir;
    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
      FileSystem fs = stagingResultDir.getFileSystem(conf);

      if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO

        // It moves the original table into the temporary location.
        // Then it moves the new result table into the original table location.
        // Upon failed, it recovers the original table if possible.
        boolean movedToOldTable = false;
        boolean committed = false;
        Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);

        if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
          // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
          // renaming directory.
          Map<Path, Path> renameDirs = TUtil.newHashMap();
          // This is a map for recovering existing partition directory. A key is current directory and a value is
          // temporary directory to back up.
          Map<Path, Path> recoveryDirs = TUtil.newHashMap();

          try {
            if (!fs.exists(finalOutputDir)) {
              fs.mkdirs(finalOutputDir);
            }

            visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
                renameDirs, oldTableDir);

            // Rename target partition directories
            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
              // Backup existing data files for recovering
              if (fs.exists(entry.getValue())) {
                String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
                    oldTableDir.toString());
                Path recoveryPath = new Path(recoveryPathString);
                fs.rename(entry.getValue(), recoveryPath);
                // NOTE(review): return value of this exists() is ignored — looks like a leftover check.
                fs.exists(recoveryPath);
                recoveryDirs.put(entry.getValue(), recoveryPath);
              }
              // Delete existing directory
              fs.delete(entry.getValue(), true);
              // Rename staging directory to final output directory
              fs.rename(entry.getKey(), entry.getValue());
            }

          } catch (IOException ioe) {
            // Remove created dirs
            for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
              fs.delete(entry.getValue(), true);
            }

            // Recovery renamed dirs
            // NOTE(review): this deletes the backup (entry.getValue()) and then renames that
            // just-deleted path onto the original location — presumably the intent was
            // delete(entry.getKey()) followed by rename(value -> key). Confirm before relying
            // on this recovery path.
            for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
              fs.delete(entry.getValue(), true);
              fs.rename(entry.getValue(), entry.getKey());
            }
            // NOTE(review): rethrowing with only the message drops the original stack trace.
            throw new IOException(ioe.getMessage());
          }
        } else {
          try {
            if (fs.exists(finalOutputDir)) {
              fs.rename(finalOutputDir, oldTableDir);
              movedToOldTable = fs.exists(oldTableDir);
            } else { // if the parent does not exist, make its parent directory.
              fs.mkdirs(finalOutputDir.getParent());
            }

            fs.rename(stagingResultDir, finalOutputDir);
            committed = fs.exists(finalOutputDir);
          } catch (IOException ioe) {
            // recover the old table
            if (movedToOldTable && !committed) {
              fs.rename(oldTableDir, finalOutputDir);
            }
          }
        }
      } else {
        String queryType = queryContext.get(QueryVars.COMMAND_TYPE);

        if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table

          NumberFormat fmt = NumberFormat.getInstance();
          fmt.setGroupingUsed(false);
          fmt.setMinimumIntegerDigits(3);

          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
              if (eachFile.isFile()) {
                LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
                continue;
              }
              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
            }
          } else {
            int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
            for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
              if (eachFile.getPath().getName().startsWith("_")) {
                continue;
              }
              moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
            }
          }
          // checking all file moved and remove empty dir
          verifyAllFileMoved(fs, stagingResultDir);
          FileStatus[] files = fs.listStatus(stagingResultDir);
          if (files != null && files.length != 0) {
            for (FileStatus eachFile: files) {
              LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
            }
          }
        } else { // CREATE TABLE AS SELECT (CTAS)
          fs.rename(stagingResultDir, finalOutputDir);
          LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
        }
      }
    } else {
      // No explicit output table path: the result stays in the staging result directory.
      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
    }

    return finalOutputDir;
  }

  /**
   * Attach the sequence number to the output file name and then move the file
into the final result path. + * + * @param fs FileSystem + * @param stagingResultDir The staging result dir + * @param fileStatus The file status + * @param finalOutputPath Final output path + * @param nf Number format + * @param fileSeq The sequence number + * @throws java.io.IOException + */ + private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, + FileStatus fileStatus, Path finalOutputPath, + NumberFormat nf, + int fileSeq, boolean changeFileSeq) throws IOException { + if (fileStatus.isDirectory()) { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (!fs.exists(finalSubPath)) { + fs.mkdirs(finalSubPath); + } + int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); + for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { + if (eachFile.getPath().getName().startsWith("_")) { + continue; + } + moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); + } + } else { + throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); + } + } else { + String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); + if (subPath != null) { + Path finalSubPath = new Path(finalOutputPath, subPath); + if (changeFileSeq) { + finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); + } + if (!fs.exists(finalSubPath.getParent())) { + fs.mkdirs(finalSubPath.getParent()); + } + if (fs.exists(finalSubPath)) { + throw new IOException("Already exists data file:" + finalSubPath); + } + boolean success = fs.rename(fileStatus.getPath(), finalSubPath); + if (success) { + LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + "]"); + } else { + LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + + "to final output[" + finalSubPath + 
"]"); + } + } + } + } + + /** + * Removes the path of the parent. + * @param parentPath + * @param childPath + * @return + */ + private String extractSubPath(Path parentPath, Path childPath) { + String parentPathStr = parentPath.toUri().getPath(); + String childPathStr = childPath.toUri().getPath(); + + if (parentPathStr.length() > childPathStr.length()) { + return null; + } + + int index = childPathStr.indexOf(parentPathStr); + if (index != 0) { + return null; + } + + return childPathStr.substring(parentPathStr.length() + 1); + } + + /** + * Attach the sequence number to a path. + * + * @param path Path + * @param seq sequence number + * @param nf Number format + * @return New path attached with sequence number + * @throws java.io.IOException + */ + private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { + String[] tokens = path.getName().split("-"); + if (tokens.length != 4) { + throw new IOException("Wrong result file name:" + path); + } + return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); + } + + /** + * Make sure all files are moved. + * @param fs FileSystem + * @param stagingPath The stagind directory + * @return + * @throws java.io.IOException + */ + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + if (files != null && files.length != 0) { + for (FileStatus eachFile: files) { + if (eachFile.isFile()) { + LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); + return false; + } else { + if (verifyAllFileMoved(fs, eachFile.getPath())) { + fs.delete(eachFile.getPath(), false); + } else { + return false; + } + } + } + } + + return true; + } + + /** + * This method sets a rename map which includes renamed staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. 
+ * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws java.io.IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!isLeafDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java new file mode 100644 index 0000000..6816d08 
/**
 * Describes capabilities of a storage handler.
 *
 * <p>A plain mutable value holder: it records whether the storage supports
 * {@code INSERT INTO} an existing table, and whether inserted rows must be
 * sorted. Both flags default to {@code false}. Not thread-safe.
 */
public class StorageProperty {
  // True when the storage type can accept INSERT INTO an existing table.
  private boolean supportsInsertInto;
  // True when the storage type requires inserted data to be sorted.
  private boolean sortedInsert;

  /** @return whether this storage supports INSERT INTO an existing table */
  public boolean isSupportsInsertInto() {
    return supportsInsertInto;
  }

  /** @param supportsInsertInto whether INSERT INTO an existing table is supported */
  public void setSupportsInsertInto(boolean supportsInsertInto) {
    this.supportsInsertInto = supportsInsertInto;
  }

  /** @return whether inserts into this storage must be sorted */
  public boolean isSortedInsert() {
    return sortedInsert;
  }

  /** @param sortedInsert whether inserts into this storage must be sorted */
  public void setSortedInsert(boolean sortedInsert) {
    this.sortedInsert = sortedInsert;
  }
}
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.util.FileUtil; +import sun.nio.ch.DirectBuffer; + +import java.io.DataInput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +public class StorageUtil extends StorageConstants { + public static int getRowByteSize(Schema schema) { + int sum = 0; + for(Column col : schema.getColumns()) { + sum += StorageUtil.getColByteSize(col); + } + + return sum; + } + + public static int getColByteSize(Column col) { + switch (col.getDataType().getType()) { + case BOOLEAN: + return 1; + case CHAR: + 
return 1; + case BIT: + return 1; + case INT2: + return 2; + case INT4: + return 4; + case INT8: + return 8; + case FLOAT4: + return 4; + case FLOAT8: + return 8; + case INET4: + return 4; + case INET6: + return 32; + case TEXT: + return 256; + case BLOB: + return 256; + case DATE: + return 4; + case TIME: + return 8; + case TIMESTAMP: + return 8; + default: + return 0; + } + } + + public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException { + FileSystem fs = tableroot.getFileSystem(conf); + FSDataOutputStream out = fs.create(new Path(tableroot, ".meta")); + FileUtil.writeProto(out, meta.getProto()); + out.flush(); + out.close(); + } + + public static Path concatPath(String parent, String...childs) { + return concatPath(new Path(parent), childs); + } + + public static Path concatPath(Path parent, String...childs) { + StringBuilder sb = new StringBuilder(); + + for(int i=0; i < childs.length; i++) { + sb.append(childs[i]); + if(i < childs.length - 1) + sb.append("/"); + } + + return new Path(parent, sb.toString()); + } + + static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; + static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*"; + + /** + * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*". + * + * This method finds the maximum sequence number from existing data files through the above patterns. + * If it cannot find any matched file or the maximum number, it will return -1. 
+ * + * @param fs + * @param path + * @param recursive + * @return The maximum sequence number + * @throws java.io.IOException + */ + public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException { + if (!fs.isDirectory(path)) { + return -1; + } + + FileStatus[] files = fs.listStatus(path); + + if (files == null || files.length == 0) { + return -1; + } + + int maxValue = -1; + List<Path> fileNamePatternMatchedList = new ArrayList<Path>(); + + for (FileStatus eachFile: files) { + // In the case of partition table, return largest value within all partition dirs. + if (eachFile.isDirectory() && recursive) { + int value = getMaxFileSequence(fs, eachFile.getPath(), recursive); + if (value > maxValue) { + maxValue = value; + } + } else { + if (eachFile.getPath().getName().matches(fileNamePatternV08) || + eachFile.getPath().getName().matches(fileNamePatternV09)) { + fileNamePatternMatchedList.add(eachFile.getPath()); + } + } + } + + if (fileNamePatternMatchedList.isEmpty()) { + return maxValue; + } + Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); + String pathName = lastFile.getName(); + + // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> + // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> + String[] pathTokens = pathName.split("-"); + if (pathTokens.length == 3) { + return -1; + } else if(pathTokens.length == 4) { + return Integer.parseInt(pathTokens[3]); + } else { + return -1; + } + } + + public static void closeBuffer(ByteBuffer buffer) { + if (buffer != null) { + if (buffer.isDirect()) { + ((DirectBuffer) buffer).cleaner().clean(); + } else { + buffer.clear(); + } + } + } + + public static int readFully(InputStream is, byte[] buffer, int offset, int length) + throws IOException { + int nread = 0; + while (nread < length) { + int nbytes = is.read(buffer, offset + nread, length - nread); + if (nbytes < 0) { + return nread > 0 ? 
nread : nbytes; + } + nread += nbytes; + } + return nread; + } + + /** + * Similar to readFully(). Skips bytes in a loop. + * @param in The DataInput to skip bytes from + * @param len number of bytes to skip. + * @throws java.io.IOException if it could not skip requested number of bytes + * for any reason (including EOF) + */ + public static void skipFully(DataInput in, int len) throws IOException { + int amt = len; + while (amt > 0) { + long ret = in.skipBytes(amt); + if (ret == 0) { + // skip may return 0 even if we're not at EOF. Luckily, we can + // use the read() method to figure out if we're at the end. + int b = in.readByte(); + if (b == -1) { + throw new EOFException( "Premature EOF from inputStream after " + + "skipping " + (len - amt) + " byte(s)."); + } + ret = 1; + } + amt -= ret; + } + } +}
