http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java deleted file mode 100644 index c43ba38..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java +++ /dev/null @@ -1,577 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.*; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.ProtoUtil; -import org.junit.Test; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import static org.apache.tajo.common.TajoDataTypes.Type; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestOffHeapRowBlock { - private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class); - public static String UNICODE_FIELD_PREFIX = "abc_가나다_"; - public static Schema schema; - - static { - schema = new Schema(); - schema.addColumn("col0", Type.BOOLEAN); - schema.addColumn("col1", Type.INT2); - schema.addColumn("col2", Type.INT4); - schema.addColumn("col3", Type.INT8); - schema.addColumn("col4", Type.FLOAT4); - schema.addColumn("col5", Type.FLOAT8); - schema.addColumn("col6", Type.TEXT); - schema.addColumn("col7", Type.TIMESTAMP); - schema.addColumn("col8", Type.DATE); - schema.addColumn("col9", Type.TIME); - schema.addColumn("col10", Type.INTERVAL); - schema.addColumn("col11", Type.INET4); - schema.addColumn("col12", - CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName())); - } - - private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) { - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), 
true) + " bytes allocated " - + (endTime - startTime) + " msec"); - } - - @Test - public void testPutAndReadValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - ZeroCopyTuple tuple = new ZeroCopyTuple(); - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testNullityValidation() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - - reader.reset(); - int j = 0; - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec"); - - long readStart = System.currentTimeMillis(); 
- tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - validateNullity(j, tuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - @Test - public void testEmptyRow() { - int rowNum = 1000; - - long allocStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10); - long allocEnd = System.currentTimeMillis(); - explainRowBlockAllocation(rowBlock, allocStart, allocEnd); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - rowBlock.getWriter().startRow(); - // empty columns - rowBlock.getWriter().endRow(); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing tooks " + (writeEnd - writeStart) + " msec"); - - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - rowBlock.release(); - - assertEquals(rowNum, j); - assertEquals(rowNum, rowBlock.rows()); - } - - @Test - public void testSortBenchmark() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - - List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList(); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - reader.reset(); - while(reader.next(tuple)) { - unSafeTuples.add(tuple); - tuple = new ZeroCopyTuple(); - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4)); - BaseTupleComparator comparator = new 
BaseTupleComparator(schema, new SortSpec[] {sortSpec}); - - long sortStart = System.currentTimeMillis(); - Collections.sort(unSafeTuples, comparator); - long sortEnd = System.currentTimeMillis(); - LOG.info("sorting took " + (sortEnd - sortStart) + " msec"); - rowBlock.release(); - } - - @Test - public void testVTuplePutAndGetBenchmark() { - int rowNum = 1000; - - List<VTuple> rowBlock = Lists.newArrayList(); - long writeStart = System.currentTimeMillis(); - VTuple tuple; - for (int i = 0; i < rowNum; i++) { - tuple = new VTuple(schema.size()); - fillVTuple(i, tuple); - rowBlock.add(tuple); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - long readStart = System.currentTimeMillis(); - int j = 0; - for (VTuple t : rowBlock) { - validateTupleResult(j, t); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - int count = 0; - for (int l = 0; l < rowBlock.size(); l++) { - for(int m = 0; m < schema.size(); m++ ) { - if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) { - count ++; - } - } - } - // For preventing unnecessary code elimination optimization. 
- LOG.info("The number of INT4 values is " + count + "."); - } - - @Test - public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100); - - long writeStart = System.currentTimeMillis(); - VTuple tuple = new VTuple(schema.size()); - for (int i = 0; i < rowNum; i++) { - fillVTuple(i, tuple); - - RowStoreUtil.convert(tuple, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("Writing takes " + (writeEnd - writeStart) + " msec"); - - validateResults(rowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfRowBlock() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - validateResults(restoredRowBlock); - rowBlock.release(); - } - - @Test - public void testSerDerOfZeroCopyTuple() { - int rowNum = 1000; - - OffHeapRowBlock rowBlock = createRowBlock(rowNum); - - ByteBuffer bb = rowBlock.nioBuffer(); - OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb); - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock); - - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - ZeroCopyTuple copyTuple = new ZeroCopyTuple(); - int j = 0; - reader.reset(); - while(reader.next(tuple)) { - ByteBuffer copy = tuple.nioBuffer(); - copyTuple.set(copy, SchemaUtil.toDataTypes(schema)); - - validateTupleResult(j, copyTuple); - - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("reading takes " + (readEnd - readStart) + " msec"); - - rowBlock.release(); - } - - public static OffHeapRowBlock createRowBlock(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - 
LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRow(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing takes " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static OffHeapRowBlock createRowBlockWithNull(int rowNum) { - long allocateStart = System.currentTimeMillis(); - OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8); - long allocatedEnd = System.currentTimeMillis(); - LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " bytes allocated " - + (allocatedEnd - allocateStart) + " msec"); - - long writeStart = System.currentTimeMillis(); - for (int i = 0; i < rowNum; i++) { - fillRowBlockWithNull(i, rowBlock.getWriter()); - } - long writeEnd = System.currentTimeMillis(); - LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec"); - - return rowBlock; - } - - public static void fillRow(int i, RowWriter builder) { - builder.startRow(); - builder.putBool(i % 1 == 0 ? 
true : false); // 0 - builder.putInt2((short) 1); // 1 - builder.putInt4(i); // 2 - builder.putInt8(i); // 3 - builder.putFloat4(i); // 4 - builder.putFloat8(i); // 5 - builder.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - builder.endRow(); - } - - public static void fillRowBlockWithNull(int i, RowWriter writer) { - writer.startRow(); - - if (i == 0) { - writer.skipField(); - } else { - writer.putBool(i % 1 == 0 ? true : false); // 0 - } - if (i % 1 == 0) { - writer.skipField(); - } else { - writer.putInt2((short) 1); // 1 - } - - if (i % 2 == 0) { - writer.skipField(); - } else { - writer.putInt4(i); // 2 - } - - if (i % 3 == 0) { - writer.skipField(); - } else { - writer.putInt8(i); // 3 - } - - if (i % 4 == 0) { - writer.skipField(); - } else { - writer.putFloat4(i); // 4 - } - - if (i % 5 == 0) { - writer.skipField(); - } else { - writer.putFloat8(i); // 5 - } - - if (i % 6 == 0) { - writer.skipField(); - } else { - writer.putText((UNICODE_FIELD_PREFIX + i).getBytes()); // 6 - } - - if (i % 7 == 0) { - writer.skipField(); - } else { - writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7 - } - - if (i % 8 == 0) { - writer.skipField(); - } else { - writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8 - } - - if (i % 9 == 0) { - writer.skipField(); - } else { - writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9 - } - - if (i % 10 == 0) { - writer.skipField(); - } else { - 
writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10 - } - - if (i % 11 == 0) { - writer.skipField(); - } else { - writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11 - } - - if (i % 12 == 0) { - writer.skipField(); - } else { - writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12 - } - - writer.endRow(); - } - - public static void fillVTuple(int i, VTuple tuple) { - tuple.put(0, DatumFactory.createBool(i % 1 == 0)); - tuple.put(1, DatumFactory.createInt2((short) 1)); - tuple.put(2, DatumFactory.createInt4(i)); - tuple.put(3, DatumFactory.createInt8(i)); - tuple.put(4, DatumFactory.createFloat4(i)); - tuple.put(5, DatumFactory.createFloat8(i)); - tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes())); - tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7 - tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8 - tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9 - tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10 - tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11 - tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12; - } - - public static void validateResults(OffHeapRowBlock rowBlock) { - long readStart = System.currentTimeMillis(); - ZeroCopyTuple tuple = new ZeroCopyTuple(); - int j = 0; - OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock); - reader.reset(); - while(reader.next(tuple)) { - validateTupleResult(j, tuple); - j++; - } - long readEnd = System.currentTimeMillis(); - LOG.info("Reading takes " + (readEnd - readStart) + " msec"); - } - - public static void validateTupleResult(int j, Tuple t) { - assertTrue((j % 1 == 0) == t.getBool(0)); - assertTrue(1 == t.getInt2(1)); - assertEquals(j, t.getInt4(2)); - 
assertEquals(j, t.getInt8(3)); - assertTrue(j == t.getFloat4(4)); - assertTrue(j == t.getFloat8(5)); - assertEquals(new String(UNICODE_FIELD_PREFIX + j), t.getText(6)); - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7)); - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8)); - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9)); - assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10)); - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11)); - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12)); - } - - public static void validateNullity(int j, Tuple tuple) { - if (j == 0) { - tuple.isNull(0); - } else { - assertTrue((j % 1 == 0) == tuple.getBool(0)); - } - - if (j % 1 == 0) { - tuple.isNull(1); - } else { - assertTrue(1 == tuple.getInt2(1)); - } - - if (j % 2 == 0) { - tuple.isNull(2); - } else { - assertEquals(j, tuple.getInt4(2)); - } - - if (j % 3 == 0) { - tuple.isNull(3); - } else { - assertEquals(j, tuple.getInt8(3)); - } - - if (j % 4 == 0) { - tuple.isNull(4); - } else { - assertTrue(j == tuple.getFloat4(4)); - } - - if (j % 5 == 0) { - tuple.isNull(5); - } else { - assertTrue(j == tuple.getFloat8(5)); - } - - if (j % 6 == 0) { - tuple.isNull(6); - } else { - assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6)); - } - - if (j % 7 == 0) { - tuple.isNull(7); - } else { - assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7)); - } - - if (j % 8 == 0) { - tuple.isNull(8); - } else { - assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8)); - } - - if (j % 9 == 0) { - tuple.isNull(9); - } else { - assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9)); - } - - if (j % 10 == 0) { - tuple.isNull(10); - } else { - assertEquals(DatumFactory.createInterval((j 
+ 1) + " hours"), tuple.getInterval(10)); - } - - if (j % 11 == 0) { - tuple.isNull(11); - } else { - assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11)); - } - - if (j % 12 == 0) { - tuple.isNull(12); - } else { - assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12)); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java deleted file mode 100644 index 1eb9c17..0000000 --- a/tajo-storage/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.tuple.offheap; - -import org.apache.tajo.unit.StorageUnit; -import org.junit.Test; - -import static org.junit.Assert.*; - -public class TestResizableSpec { - - @Test - public void testResizableLimit() { - ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f); - - long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f)); - - assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB)); - - assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB)); - - assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1); - - assertFalse(limit.canIncrease(limit.limit())); - } - - @Test - public void testFixedLimit() { - FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f); - - assertEquals(limit.limit(), 100 * StorageUnit.MB); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000)); - - assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB)); - - assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB)); - - assertFalse(limit.canIncrease(limit.limit())); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml deleted file mode 100644 index 790d5a8..0000000 --- a/tajo-storage/src/test/resources/storage-default.xml +++ /dev/null @@ -1,154 +0,0 @@ -<?xml version="1.0"?> -<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> - -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. 
See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<configuration> - <property> - <name>fs.s3.impl</name> - <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value> - </property> - - <!--- Registered Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> - </property> - - <!--- Fragment Class Configurations --> - <property> - <name>tajo.storage.fragment.textfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.csv.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.raw.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.rcfile.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.row.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.parquet.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.sequencefile.class</name> - 
<value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - <property> - <name>tajo.storage.fragment.avro.class</name> - <value>org.apache.tajo.storage.fragment.FileFragment</value> - </property> - - <!--- Scanner Handler --> - <property> - <name>tajo.storage.scanner-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value> - </property> - - <property> - <name>tajo.storage.scanner-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroScanner</value> - </property> - - <!--- Appender Handler --> - <property> - <name>tajo.storage.appender-handler</name> - <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro</value> - </property> - - <property> - <name>tajo.storage.appender-handler.textfile.class</name> - <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.csv.class</name> - <value>org.apache.tajo.storage.CSVFile$CSVAppender</value> - </property> - - <property> - 
<name>tajo.storage.appender-handler.raw.class</name> - <value>org.apache.tajo.storage.RawFile$RawFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.rcfile.class</name> - <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.rowfile.class</name> - <value>org.apache.tajo.storage.RowFile$RowFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.parquet.class</name> - <value>org.apache.tajo.storage.parquet.ParquetAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.sequencefile.class</name> - <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value> - </property> - - <property> - <name>tajo.storage.appender-handler.avro.class</name> - <value>org.apache.tajo.storage.avro.AvroAppender</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/test/resources/testVariousTypes.avsc ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc deleted file mode 100644 index 611b97f..0000000 --- a/tajo-storage/src/test/resources/testVariousTypes.avsc +++ /dev/null @@ -1,21 +0,0 @@ -{ - "type": "record", - "namespace": "org.apache.tajo", - "name": "testVariousTypes", - "fields": [ - { "name": "col1", "type": "boolean" }, - { "name": "col2", "type": "int" }, - { "name": "col3", "type": "string" }, - { "name": "col4", "type": "int" }, - { "name": "col5", "type": "int" }, - { "name": "col6", "type": "long" }, - { "name": "col7", "type": "float" }, - { "name": "col8", "type": "double" }, - { "name": "col9", "type": "string" }, - { "name": "col10", "type": "bytes" }, - { "name": "col11", "type": "bytes" }, - { "name": "col12", "type": "null" }, - { "name": "col13", "type": "bytes" } - ] -} - 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml new file mode 100644 index 0000000..c600b4b --- /dev/null +++ b/tajo-storage/tajo-storage-common/pom.xml @@ -0,0 +1,337 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.9.1-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + <artifactId>tajo-storage-common</artifactId> + <packaging>jar</packaging> + <name>Tajo Storage Common</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + 
</executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--proto_path=../../tajo-common/src/main/proto</argument> + <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/IndexProtos.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-pmd-plugin</artifactId> + <version>2.7.1</version> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + </dependency> + <dependency> + 
<groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-json</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + 
<groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-server-tests</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-app</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-api</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-hs</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + 
</build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java new file mode 100644 index 0000000..c5e96ac --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.tajo.catalog.statistics.TableStats; + +import java.io.Closeable; +import java.io.IOException; + +public interface Appender extends Closeable { + + void init() throws IOException; + + void addTuple(Tuple t) throws IOException; + + void flush() throws IOException; + + long getEstimatedOutputSize() throws IOException; + + void close() throws IOException; + + void enableStats(); + + TableStats getStats(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java new file mode 100644 index 0000000..b829f60 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java @@ -0,0 +1,206 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.datum.Datum; + +import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto; +import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; + +/** + * The Comparator class for Tuples + * + * @see Tuple + */ +public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> { + private final Schema schema; + private final SortSpec [] sortSpecs; + private final int[] sortKeyIds; + private final boolean[] asc; + @SuppressWarnings("unused") + private final boolean[] nullFirsts; + + private Datum left; + private Datum right; + private int compVal; + + /** + * @param schema The schema of input tuples + * @param sortKeys The description of sort keys + */ + public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) { + Preconditions.checkArgument(sortKeys.length > 0, + "At least one sort key must be specified."); + + this.schema = schema; + this.sortSpecs = sortKeys; + this.sortKeyIds = new int[sortKeys.length]; + this.asc = new boolean[sortKeys.length]; + this.nullFirsts = new boolean[sortKeys.length]; + for (int i = 0; i < sortKeys.length; i++) { + if (sortKeys[i].getSortKey().hasQualifier()) { + this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); + } else { + this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); + } + + this.asc[i] = sortKeys[i].isAscending(); + this.nullFirsts[i]= sortKeys[i].isNullFirst(); + } + } + + public BaseTupleComparator(TupleComparatorProto proto) { + this.schema = new Schema(proto.getSchema()); + + this.sortSpecs = new SortSpec[proto.getSortSpecsCount()]; + for (int i = 0; i < proto.getSortSpecsCount(); i++) { + sortSpecs[i] = new 
SortSpec(proto.getSortSpecs(i)); + } + + this.sortKeyIds = new int[proto.getCompSpecsCount()]; + this.asc = new boolean[proto.getCompSpecsCount()]; + this.nullFirsts = new boolean[proto.getCompSpecsCount()]; + + for (int i = 0; i < proto.getCompSpecsCount(); i++) { + TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i); + sortKeyIds[i] = sortSepcProto.getColumnId(); + asc[i] = sortSepcProto.getAscending(); + nullFirsts[i] = sortSepcProto.getNullFirst(); + } + } + + public Schema getSchema() { + return schema; + } + + public SortSpec [] getSortSpecs() { + return sortSpecs; + } + + public int [] getSortKeyIds() { + return sortKeyIds; + } + + @Override + public boolean isAscendingFirstKey() { + return this.asc[0]; + } + + @Override + public int compare(Tuple tuple1, Tuple tuple2) { + for (int i = 0; i < sortKeyIds.length; i++) { + left = tuple1.get(sortKeyIds[i]); + right = tuple2.get(sortKeyIds[i]); + + if (left.isNull() || right.isNull()) { + if (!left.equals(right)) { + if (left.isNull()) { + compVal = 1; + } else if (right.isNull()) { + compVal = -1; + } + if (nullFirsts[i]) { + if (compVal != 0) { + compVal *= -1; + } + } + } else { + compVal = 0; + } + } else { + if (asc[i]) { + compVal = left.compareTo(right); + } else { + compVal = right.compareTo(left); + } + } + + if (compVal < 0 || compVal > 0) { + return compVal; + } + } + return 0; + } + + @Override + public int hashCode() { + return Objects.hashCode(sortKeyIds); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof BaseTupleComparator) { + BaseTupleComparator other = (BaseTupleComparator) obj; + if (sortKeyIds.length != other.sortKeyIds.length) { + return false; + } + + for (int i = 0; i < sortKeyIds.length; i++) { + if (sortKeyIds[i] != other.sortKeyIds[i] || + asc[i] != other.asc[i] || + nullFirsts[i] != other.nullFirsts[i]) { + return false; + } + } + + return true; + } else { + return false; + } + } + + @Override + public TupleComparatorProto getProto() { + 
TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder(); + builder.setSchema(schema.getProto()); + for (int i = 0; i < sortSpecs.length; i++) { + builder.addSortSpecs(sortSpecs[i].getProto()); + } + + TupleComparatorSpecProto.Builder sortSpecBuilder; + for (int i = 0; i < sortKeyIds.length; i++) { + sortSpecBuilder = TupleComparatorSpecProto.newBuilder(); + sortSpecBuilder.setColumnId(sortKeyIds[i]); + sortSpecBuilder.setAscending(asc[i]); + sortSpecBuilder.setNullFirst(nullFirsts[i]); + builder.addCompSpecs(sortSpecBuilder); + } + + return builder.build(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + String prefix = ""; + for (int i = 0; i < sortKeyIds.length; i++) { + sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) + .append(",Asc=").append(asc[i]) + .append(",NullFirst=").append(nullFirsts[i]); + prefix = " ,"; + } + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java new file mode 100644 index 0000000..00112e7 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Binary encoder/decoder for single column values.
 *
 * <p>Fixed-width numeric types (INT2, FLOAT4, FLOAT8) are written most-significant
 * byte first; INT4 and INT8 use the variable-length form produced by
 * {@link #writeVLongToByteArray(byte[], int, long)}. A null datum is written as zero
 * bytes, and a zero-length input is read back as a null datum.
 *
 * <p>The write helpers reuse per-instance scratch arrays, so concurrent writes through
 * one instance would share those arrays.
 */
@Deprecated
public class BinarySerializerDeserializer implements SerializerDeserializer {

  // Single byte 0xBF (binary 10111111). It is written in place of an empty TEXT value
  // so that empty text does not collapse into the zero-length "null" encoding;
  // deserialize() maps it back to empty text.
  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};

  /**
   * Writes {@code datum} to {@code out} using the encoding for the column's type.
   *
   * @param col column whose data type selects the encoding
   * @param datum value to write; {@code null} or a {@link NullDatum} writes nothing
   * @param out destination stream
   * @param nullCharacters not used by this implementation
   * @return the length recorded for the value (0 for null and NULL_TYPE)
   * @throws IOException if the column type has no encoding here, or the write fails
   */
  @Override
  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
      throws IOException {
    byte[] bytes;
    int length = 0;
    if (datum == null || datum instanceof NullDatum) {
      return 0;
    }

    switch (col.getDataType().getType()) {
      case BOOLEAN:
      case BIT:
      case CHAR:
        bytes = datum.asByteArray();
        length = bytes.length;
        out.write(bytes, 0, length);
        break;
      case INT2:
        length = writeShort(out, datum.asInt2());
        break;
      case INT4:
        length = writeVLong(out, datum.asInt4());
        break;
      case INT8:
        length = writeVLong(out, datum.asInt8());
        break;
      case FLOAT4:
        length = writeFloat(out, datum.asFloat4());
        break;
      case FLOAT8:
        length = writeDouble(out, datum.asFloat8());
        break;
      case TEXT: {
        bytes = datum.asTextBytes();
        // NOTE(review): the returned length comes from datum.size() while bytes.length
        // bytes are written; these presumably always agree for text — confirm.
        length = datum.size();
        if (length == 0) {
          // Empty text is replaced by the one-byte marker (see INVALID_UTF__SINGLE_BYTE).
          bytes = INVALID_UTF__SINGLE_BYTE;
          length = INVALID_UTF__SINGLE_BYTE.length;
        }
        out.write(bytes, 0, bytes.length);
        break;
      }
      case BLOB:
      case INET4:
      case INET6:
        // NOTE(review): INET6 is written here but deserialize() has no INET6 case,
        // so it would be read back as a null datum.
        bytes = datum.asByteArray();
        length = bytes.length;
        out.write(bytes, 0, length);
        break;
      case PROTOBUF:
        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
        bytes = protobufDatum.asByteArray();
        length = bytes.length;
        out.write(bytes, 0, length);
        break;
      case NULL_TYPE:
        break;
      default:
        throw new IOException("Does not support type");
    }
    return length;
  }

  /**
   * Decodes one value of the column's type from {@code bytes[offset .. offset+length)}.
   *
   * @param col column whose data type selects the decoding
   * @param bytes buffer holding the encoded value
   * @param offset index of the first encoded byte
   * @param length number of encoded bytes; 0 yields a null datum
   * @param nullCharacters not used by this implementation
   * @return the decoded datum, or a null datum for zero length or a type with no case below
   * @throws IOException declared by the interface signature
   */
  @Override
  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
    if (length == 0) return NullDatum.get();

    Datum datum;
    switch (col.getDataType().getType()) {
      case BOOLEAN:
        datum = DatumFactory.createBool(bytes[offset]);
        break;
      case BIT:
        datum = DatumFactory.createBit(bytes[offset]);
        break;
      case CHAR: {
        byte[] chars = new byte[length];
        System.arraycopy(bytes, offset, chars, 0, length);
        datum = DatumFactory.createChar(chars);
        break;
      }
      case INT2:
        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
        break;
      case INT4:
        // The vlong carries its own length in the first byte, so 'length' is not needed.
        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
        break;
      case INT8:
        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
        break;
      case FLOAT4:
        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
        break;
      case FLOAT8:
        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
        break;
      case TEXT: {
        byte[] chars = new byte[length];
        System.arraycopy(bytes, offset, chars, 0, length);

        // The one-byte marker stands for empty text (see serialize()).
        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
          datum = DatumFactory.createText(new byte[0]);
        } else {
          datum = DatumFactory.createText(chars);
        }
        break;
      }
      case PROTOBUF: {
        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
        Message.Builder builder = factory.newBuilder();
        builder.mergeFrom(bytes, offset, length);
        datum = factory.createDatum(builder);
        break;
      }
      case INET4:
        datum = DatumFactory.createInet4(bytes, offset, length);
        break;
      case BLOB:
        datum = DatumFactory.createBlob(bytes, offset, length);
        break;
      default:
        // Unlike serialize(), an unhandled type is not an error here: it reads as null.
        datum = NullDatum.get();
    }
    return datum;
  }

  // Scratch buffer reused by writeShort().
  private byte[] shortBytes = new byte[2];

  /**
   * Writes {@code val} as two bytes, high byte first.
   *
   * @return the number of bytes written (always 2)
   */
  public int writeShort(OutputStream out, short val) throws IOException {
    shortBytes[0] = (byte) (val >> 8);
    shortBytes[1] = (byte) val;
    out.write(shortBytes, 0, 2);
    return 2;
  }

  /**
   * Reads a float from four bytes, high byte first.
   *
   * @throws IllegalArgumentException if {@code length} is not 4
   */
  public float toFloat(byte[] bytes, int offset, int length) {
    Preconditions.checkArgument(length == 4);

    // Mask each byte to undo sign extension before shifting it into place.
    int val = ((bytes[offset] & 0x000000FF) << 24) +
        ((bytes[offset + 1] & 0x000000FF) << 16) +
        ((bytes[offset + 2] & 0x000000FF) << 8) +
        (bytes[offset + 3] & 0x000000FF);
    return Float.intBitsToFloat(val);
  }

  // Scratch buffer reused by writeFloat().
  private byte[] floatBytes = new byte[4];

  /**
   * Writes the bits of {@code f} (per {@link Float#floatToIntBits(float)}) as four
   * bytes, high byte first.
   *
   * @return the number of bytes written (always 4)
   */
  public int writeFloat(OutputStream out, float f) throws IOException {
    int val = Float.floatToIntBits(f);

    floatBytes[0] = (byte) (val >> 24);
    floatBytes[1] = (byte) (val >> 16);
    floatBytes[2] = (byte) (val >> 8);
    floatBytes[3] = (byte) val;
    out.write(floatBytes, 0, 4);
    return floatBytes.length;
  }

  /**
   * Reads a double from eight bytes, high byte first.
   *
   * @throws IllegalArgumentException if {@code length} is not 8
   */
  public double toDouble(byte[] bytes, int offset, int length) {
    Preconditions.checkArgument(length == 8);
    // Each byte is masked, widened to long, then shifted into place.
    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
        (long) (bytes[offset + 7] & 0x00000000000000FF);
    return Double.longBitsToDouble(val);
  }

  // Scratch buffer reused by writeDouble().
  private byte[] doubleBytes = new byte[8];

  /**
   * Writes the bits of {@code d} (per {@link Double#doubleToLongBits(double)}) as
   * eight bytes, high byte first.
   *
   * @return the number of bytes written (always 8)
   */
  public int writeDouble(OutputStream out, double d) throws IOException {
    long val = Double.doubleToLongBits(d);

    doubleBytes[0] = (byte) (val >> 56);
    doubleBytes[1] = (byte) (val >> 48);
    doubleBytes[2] = (byte) (val >> 40);
    doubleBytes[3] = (byte) (val >> 32);
    doubleBytes[4] = (byte) (val >> 24);
    doubleBytes[5] = (byte) (val >> 16);
    doubleBytes[6] = (byte) (val >> 8);
    doubleBytes[7] = (byte) val;
    out.write(doubleBytes, 0, 8);
    return doubleBytes.length;
  }

  // Scratch buffer reused by writeVLong(): one prefix byte plus up to eight value bytes.
  private byte[] vLongBytes = new byte[9];

  /**
   * Encodes {@code l} into {@code bytes} starting at {@code offset} in a
   * variable-length form.
   *
   * <p>Values in [-112, 127] take a single byte holding the value itself. Any other
   * value takes a prefix byte followed by 1 to 8 magnitude bytes, high byte first:
   * the prefix is -113..-120 for non-negative values and -121..-128 for negative ones
   * (whose one's complement is stored), and its distance from -112 / -120 is the
   * number of magnitude bytes.
   * NOTE(review): this looks like the same layout as Hadoop's
   * {@code WritableUtils.writeVLong} — confirm before relying on interoperability.
   *
   * @return the total number of bytes written (1 to 9)
   */
  public static int writeVLongToByteArray(byte[] bytes, int offset, long l) {
    if (l >= -112 && l <= 127) {
      bytes[offset] = (byte) l;
      return 1;
    }

    int len = -112;
    if (l < 0) {
      l ^= -1L; // take one's complement so the magnitude loop below sees a non-negative value
      len = -120;
    }

    // Count the bytes needed for the magnitude by decrementing the prefix once per byte.
    long tmp = l;
    while (tmp != 0) {
      tmp = tmp >> 8;
      len--;
    }

    bytes[offset++] = (byte) len;
    // Recover the magnitude byte count from the prefix.
    len = (len < -120) ? -(len + 120) : -(len + 112);

    for (int idx = len; idx != 0; idx--) {
      int shiftbits = (idx - 1) * 8;
      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
    }
    return 1 + len;
  }

  /**
   * Writes {@code l} to {@code out} in the form produced by
   * {@link #writeVLongToByteArray(byte[], int, long)}.
   *
   * @return the number of bytes written
   */
  public int writeVLong(OutputStream out, long l) throws IOException {
    int len = writeVLongToByteArray(vLongBytes, 0, l);
    out.write(vLongBytes, 0, len);
    return len;
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import org.apache.hadoop.classification.InterfaceStability; + +/* this class is PooledBuffer holder */ +public class BufferPool { + + private static final PooledByteBufAllocator allocator; + + private BufferPool() { + } + + static { + //TODO we need determine the default params + allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); + + /* if you are finding memory leak, please enable this line */ + //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); + } + + public static long maxDirectMemory() { + return PlatformDependent.maxDirectMemory(); + } + + + public synchronized static ByteBuf directBuffer(int size) { + return allocator.directBuffer(size); + } + + /** + * + * @param size the initial capacity + * @param max the max capacity + * @return allocated ByteBuf from pool + */ + public static ByteBuf directBuffer(int size, int max) { + return allocator.directBuffer(size, max); + } + + @InterfaceStability.Unstable + public static void forceRelease(ByteBuf buf) { + buf.release(buf.refCnt()); + } + + /** + * the ByteBuf will increase to writable size + * @param buf + * @param minWritableBytes required minimum writable size + */ + public static void ensureWritable(ByteBuf buf, 
int minWritableBytes) { + buf.ensureWritable(minWritableBytes); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java new file mode 100644 index 0000000..b1b6d65 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.DFSInputStream; +import org.apache.hadoop.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.channels.spi.AbstractInterruptibleChannel; + +public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { + + ByteBufferReadable byteBufferReadable; + ReadableByteChannel channel; + InputStream inputStream; + + public ByteBufInputChannel(InputStream inputStream) { + if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) { + this.byteBufferReadable = (ByteBufferReadable) inputStream; + } else { + this.channel = Channels.newChannel(inputStream); + } + + this.inputStream = inputStream; + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public long read(ByteBuffer[] dsts) { + return read(dsts, 0, dsts.length); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + if (byteBufferReadable != null) { + return byteBufferReadable.read(dst); + } else { + return channel.read(dst); + } + } + + @Override + protected void implCloseChannel() throws IOException { + IOUtils.cleanup(null, channel, inputStream); + } + + public int available() throws IOException { + return inputStream.available(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java 
b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java new file mode 100644 index 0000000..8841a31 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DataLocation.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Pairs a host name with a volume id.
 */
public class DataLocation {
  private String host;
  private int volumeId;

  /**
   * @param host the host name
   * @param volumeId the volume id
   */
  public DataLocation(String host, int volumeId) {
    this.volumeId = volumeId;
    this.host = host;
  }

  /** @return the host given at construction */
  public String getHost() {
    return this.host;
  }

  /** @return the volume id given at construction */
  public int getVolumeId() {
    return this.volumeId;
  }

  /** @return a string of the form {@code DataLocation{host=..., volumeId=...}} */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("DataLocation{");
    text.append("host=").append(host);
    text.append(", volumeId=").append(volumeId);
    text.append('}');
    return text.toString();
  }
}
+ */ + +package org.apache.tajo.storage; + +import java.util.ArrayList; +import java.util.List; + +public class DiskDeviceInfo { + private int id; + private String name; + + private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>(); + + public DiskDeviceInfo(int id) { + this.id = id; + } + + public int getId() { + return id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return id + "," + name; + } + + public void addMountPath(DiskMountInfo diskMountInfo) { + mountInfos.add(diskMountInfo); + } + + public List<DiskMountInfo> getMountInfos() { + return mountInfos; + } + + public void setMountInfos(List<DiskMountInfo> mountInfos) { + this.mountInfos = mountInfos; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java new file mode 100644 index 0000000..22f18ba --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskInfo.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Mutable holder for one disk partition: id, partition name, mount path, and the
 * capacity/used figures.
 */
public class DiskInfo {
  private int id;
  private String partitionName;
  private String mountPath;

  private long capacity;
  private long used;

  /**
   * @param id the disk id
   * @param partitionName the partition name; the remaining fields keep their defaults
   */
  public DiskInfo(int id, String partitionName) {
    this.partitionName = partitionName;
    this.id = id;
  }

  // ---- accessors -----------------------------------------------------------

  public int getId() {
    return this.id;
  }

  public String getPartitionName() {
    return this.partitionName;
  }

  public String getMountPath() {
    return this.mountPath;
  }

  public long getCapacity() {
    return this.capacity;
  }

  public long getUsed() {
    return this.used;
  }

  // ---- mutators ------------------------------------------------------------

  public void setId(int id) {
    this.id = id;
  }

  public void setPartitionName(String partitionName) {
    this.partitionName = partitionName;
  }

  public void setMountPath(String mountPath) {
    this.mountPath = mountPath;
  }

  public void setCapacity(long capacity) {
    this.capacity = capacity;
  }

  public void setUsed(long used) {
    this.used = used;
  }
}
/**
 * Describes one mount point of a disk device: the mount path, the id of the
 * device it belongs to, and capacity/used figures.
 *
 * <p>Ordering, equality and hashing are all based on the mount path only.
 * The natural order places deeper paths first, then (at equal depth) longer
 * paths first, and finally falls back to lexicographic order.
 */
public class DiskMountInfo implements Comparable<DiskMountInfo> {
  private String mountPath;

  private long capacity;
  private long used;

  private int deviceId;

  /**
   * @param deviceId  id of the device this mount point belongs to
   * @param mountPath path at which the device is mounted
   */
  public DiskMountInfo(int deviceId, String mountPath) {
    // The device id was previously dropped here, so getDeviceId() always returned 0.
    this.deviceId = deviceId;
    this.mountPath = mountPath;
  }

  public String getMountPath() {
    return mountPath;
  }

  public void setMountPath(String mountPath) {
    this.mountPath = mountPath;
  }

  public long getCapacity() {
    return capacity;
  }

  public void setCapacity(long capacity) {
    this.capacity = capacity;
  }

  public long getUsed() {
    return used;
  }

  public void setUsed(long used) {
    this.used = used;
  }

  /** @return the id of the device given at construction time */
  public int getDeviceId() {
    return deviceId;
  }

  /**
   * Two mount infos are equal when {@link #compareTo(DiskMountInfo)} yields 0,
   * i.e. when their mount paths are equal. The device id is not considered.
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (!(obj instanceof DiskMountInfo)) {
      return false;
    }
    return compareTo((DiskMountInfo) obj) == 0;
  }

  /**
   * Hash of the mount path, consistent with {@link #equals(Object)}.
   */
  @Override
  public int hashCode() {
    // Same value as Guava's varargs Objects.hashCode(mountPath), without the dependency.
    return java.util.Arrays.hashCode(new Object[] {mountPath});
  }

  /**
   * Orders by path depth (deeper first), then by path length (longer first),
   * then lexicographically.
   */
  @Override
  public int compareTo(DiskMountInfo other) {
    String path1 = mountPath;
    String path2 = other.mountPath;

    int path1Depth = depthOf(path1);
    int path2Depth = depthOf(path2);
    if (path1Depth != path2Depth) {
      return path1Depth > path2Depth ? -1 : 1;
    }

    int path1Length = path1.length();
    int path2Length = path2.length();
    if (path1Length != path2Length) {
      return path1Length > path2Length ? -1 : 1;
    }

    return path1.compareTo(path2);
  }

  /** Number of '/' separators in the path; the root path "/" counts as depth 0. */
  private static int depthOf(String path) {
    return "/".equals(path) ? 0 : path.split("/", -1).length - 1;
  }
}
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.common.Util; + +import java.io.*; +import java.net.URI; +import java.util.*; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; + +public class DiskUtil { + + static String UNIX_DISK_DEVICE_PATH = "/proc/partitions"; + + public enum OSType { + OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC + } + + static private OSType getOSType() { + String osName = System.getProperty("os.name"); + if (osName.contains("Windows") + && (osName.contains("XP") || osName.contains("2003") + || osName.contains("Vista") + || osName.contains("Windows_7") + || osName.contains("Windows 7") || osName + .contains("Windows7"))) { + return OSType.OS_TYPE_WINXP; + } else if (osName.contains("SunOS") || osName.contains("Solaris")) { + return OSType.OS_TYPE_SOLARIS; + } else if (osName.contains("Mac")) { + return OSType.OS_TYPE_MAC; + } else { + return OSType.OS_TYPE_UNIX; + } + } + + public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException { + List<DiskDeviceInfo> deviceInfos; + + if(getOSType() == OSType.OS_TYPE_UNIX) { + deviceInfos = getUnixDiskDeviceInfos(); + setDeviceMountInfo(deviceInfos); + } else { + deviceInfos = getDefaultDiskDeviceInfos(); + } + + return deviceInfos; + } + + private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() { + List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>(); + + File file = new File(UNIX_DISK_DEVICE_PATH); + if(!file.exists()) { + System.out.println("No partition file:" + file.getAbsolutePath()); + return getDefaultDiskDeviceInfos(); + } + + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH))); + String line = null; + + int count = 0; + Set<String> deviceNames = new TreeSet<String>(); + while((line = reader.readLine()) != null) { + 
if(count > 0 && !line.trim().isEmpty()) { + String[] tokens = line.trim().split(" +"); + if(tokens.length == 4) { + String deviceName = getDiskDeviceName(tokens[3]); + deviceNames.add(deviceName); + } + } + count++; + } + + int id = 0; + for(String eachDeviceName: deviceNames) { + DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++); + diskDeviceInfo.setName(eachDeviceName); + + //TODO set addtional info + // /sys/block/sda/queue + infos.add(diskDeviceInfo); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + if(reader != null) { + try { + reader.close(); + } catch (IOException e) { + } + } + } + + return infos; + } + + private static String getDiskDeviceName(String partitionName) { + byte[] bytes = partitionName.getBytes(); + + byte[] result = new byte[bytes.length]; + int length = 0; + for(int i = 0; i < bytes.length; i++, length++) { + if(bytes[i] >= '0' && bytes[i] <= '9') { + break; + } else { + result[i] = bytes[i]; + } + } + + return new String(result, 0, length); + } + + public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() { + DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0); + diskDeviceInfo.setName("default"); + + List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>(); + + infos.add(diskDeviceInfo); + + return infos; + } + + + private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException { + Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>(); + for(DiskDeviceInfo eachDevice: deviceInfos) { + deviceMap.put(eachDevice.getName(), eachDevice); + } + + BufferedReader mountOutput = null; + try { + Process mountProcess = Runtime.getRuntime().exec("mount"); + mountOutput = new BufferedReader(new InputStreamReader( + mountProcess.getInputStream())); + while (true) { + String line = mountOutput.readLine(); + if (line == null) { + break; + } + + int indexStart = line.indexOf(" on /"); + int indexEnd = line.indexOf(" ", indexStart + 4); + + String deviceName = 
line.substring(0, indexStart).trim(); + String[] deviceNameTokens = deviceName.split("/"); + if(deviceNameTokens.length == 3) { + if("dev".equals(deviceNameTokens[1])) { + String realDeviceName = getDiskDeviceName(deviceNameTokens[2]); + String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath(); + + DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName); + if(diskDeviceInfo != null) { + diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath)); + } + } + } + } + } catch (IOException e) { + throw e; + } finally { + if (mountOutput != null) { + mountOutput.close(); + } + } + } + + public static int getDataNodeStorageSize(){ + return getStorageDirs().size(); + } + + public static List<URI> getStorageDirs(){ + Configuration conf = new HdfsConfiguration(); + Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); + return Util.stringCollectionAsURIs(dirNames); + } + + public static void main(String[] args) throws Exception { + System.out.println("/dev/sde1".split("/").length); + for(String eachToken: "/dev/sde1".split("/")) { + System.out.println(eachToken); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java new file mode 100644 index 0000000..7df4584 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import io.netty.buffer.ByteBuf; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; + +import java.io.IOException; +import java.io.OutputStream; + + +public interface FieldSerializerDeserializer { + + public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException; + + public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java new file mode 100644 index 0000000..8b7e2e0 --- /dev/null +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * + */ +package org.apache.tajo.storage; + +import com.google.common.base.Preconditions; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.IntervalDatum; +import org.apache.tajo.datum.ProtobufDatum; +import org.apache.tajo.exception.UnsupportedException; + +/** + * An instance of FrameTuple is an immutable tuple. + * It contains two tuples and pretends to be one instance of Tuple for + * join qual evaluatations. 
+ */ +public class FrameTuple implements Tuple, Cloneable { + private int size; + private int leftSize; + + private Tuple left; + private Tuple right; + + public FrameTuple() {} + + public FrameTuple(Tuple left, Tuple right) { + set(left, right); + } + + public void set(Tuple left, Tuple right) { + this.size = left.size() + right.size(); + this.left = left; + this.leftSize = left.size(); + this.right = right; + } + + @Override + public int size() { + return size; + } + + @Override + public boolean contains(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.contains(fieldId); + } else { + return right.contains(fieldId - leftSize); + } + } + + @Override + public boolean isNull(int fieldid) { + return get(fieldid).isNull(); + } + + @Override + public boolean isNotNull(int fieldid) { + return !isNull(fieldid); + } + + @Override + public void clear() { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum value) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Datum[] values) { + throw new UnsupportedException(); + } + + @Override + public void put(int fieldId, Tuple tuple) { + throw new UnsupportedException(); + } + + @Override + public void setOffset(long offset) { + throw new UnsupportedException(); + } + + @Override + public long getOffset() { + throw new UnsupportedException(); + } + + @Override + public void put(Datum [] values) { + throw new UnsupportedException(); + } + + @Override + public Datum get(int fieldId) { + Preconditions.checkArgument(fieldId < size, + "Out of field access: " + fieldId); + + if (fieldId < leftSize) { + return left.get(fieldId); + } else { + return right.get(fieldId - leftSize); + } + } + + @Override + public boolean getBool(int fieldId) { + return get(fieldId).asBool(); + } + + @Override + public byte getByte(int fieldId) { + return get(fieldId).asByte(); + } + + 
@Override + public char getChar(int fieldId) { + return get(fieldId).asChar(); + } + + @Override + public byte [] getBytes(int fieldId) { + return get(fieldId).asByteArray(); + } + + @Override + public short getInt2(int fieldId) { + return get(fieldId).asInt2(); + } + + @Override + public int getInt4(int fieldId) { + return get(fieldId).asInt4(); + } + + @Override + public long getInt8(int fieldId) { + return get(fieldId).asInt8(); + } + + @Override + public float getFloat4(int fieldId) { + return get(fieldId).asFloat4(); + } + + @Override + public double getFloat8(int fieldId) { + return get(fieldId).asFloat8(); + } + + @Override + public String getText(int fieldId) { + return get(fieldId).asChars(); + } + + @Override + public ProtobufDatum getProtobufDatum(int fieldId) { + return (ProtobufDatum) get(fieldId); + } + + @Override + public IntervalDatum getInterval(int fieldId) { + return (IntervalDatum) get(fieldId); + } + + @Override + public char [] getUnicodeChars(int fieldId) { + return get(fieldId).asUnicodeChars(); + } + + @Override + public Tuple clone() throws CloneNotSupportedException { + FrameTuple frameTuple = (FrameTuple) super.clone(); + frameTuple.set(this.left.clone(), this.right.clone()); + return frameTuple; + } + + @Override + public Datum[] getValues(){ + throw new UnsupportedException(); + } + + public String toString() { + boolean first = true; + StringBuilder str = new StringBuilder(); + str.append("("); + for(int i=0; i < size(); i++) { + if(contains(i)) { + if(first) { + first = false; + } else { + str.append(", "); + } + str.append(i) + .append("=>") + .append(get(i)); + } + } + str.append(")"); + return str.toString(); + } +}
