http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index f807987..8a61cab 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -70,7 +70,7 @@ public class TestSortExec { util = TpchTestBase.getInstance().getTestingCluster(); catalog = util.getMaster().getCatalog(); workDir = CommonTestingUtil.getTestDir(TEST_PATH); - sm = StorageManager.getFileStorageManager(conf, workDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -82,7 +82,8 @@ public class TestSortExec { tablePath = StorageUtil.concatPath(workDir, "employee", "table1"); sm.getFileSystem().mkdirs(tablePath.getParent()); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(employeeMeta, schema, tablePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, tablePath); appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = 0; i < 100; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index b74f634..68b3fb3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -565,8 +565,8 @@ public class TestJoinBroadcast extends QueryTestCaseBase { } Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv"); fileIndex++; - appender = StorageManager.getFileStorageManager(conf).getAppender(tableMeta, schema, - dataPath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(tableMeta, schema, dataPath); appender.init(); } String[] columnDatas = rows[i].split("\\|"); http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java index ac5ff13..b8f3ef7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java @@ -63,7 +63,7 @@ public class TestResultSet { public static void setup() throws Exception { util = TpchTestBase.getInstance().getTestingCluster(); conf = util.getConfiguration(); - sm = StorageManager.getFileStorageManager(conf); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf); scoreSchema = new Schema(); scoreSchema.addColumn("deptname", Type.TEXT); @@ -73,8 +73,7 @@ public class TestResultSet { Path p = sm.getTablePath("score"); 
sm.getFileSystem().mkdirs(p); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(scoreMeta, scoreSchema, - new Path(p, "score")); + Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score")); appender.init(); int deptSize = 100; int tupleNum = 10000; http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java index 2aa56db..f36ff24 100644 --- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java +++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java @@ -36,7 +36,6 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.FileUtil; import org.junit.After; import org.junit.Before; @@ -70,7 +69,8 @@ public class TestRowFile { TableMeta meta = CatalogUtil.newTableMeta(StoreType.ROWFILE); - FileStorageManager sm = StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR))); + FileStorageManager sm = + (FileStorageManager)StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR))); Path tablePath = new Path("/test"); Path metaPath = new Path(tablePath, ".meta"); @@ -80,7 +80,7 @@ public class TestRowFile { FileUtil.writeProto(fs, metaPath, meta.getProto()); - Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, dataPath); + Appender appender = sm.getAppender(meta, schema, dataPath); appender.enableStats(); appender.init(); 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index 517f425..5a93538 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -185,7 +185,7 @@ public class TestRangeRetrieverHandler { reader.open(); TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema, StorageUtil.concatPath(testDir, "output", "output")); scanner.init(); @@ -308,7 +308,7 @@ public class TestRangeRetrieverHandler { new Path(testDir, "output/index"), keySchema, comp); reader.open(); TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet()); - SeekableScanner scanner = StorageManager.getSeekableScanner(conf, outputMeta, schema, + SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema, StorageUtil.concatPath(testDir, "output", "output")); scanner.init(); int cnt = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index d350889..eb8ada9 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -75,7 +75,12 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + 
<artifactId>tajo-storage-hdfs</artifactId> <scope>provided</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml index 20cdf16..1c3c410 100644 --- a/tajo-jdbc/pom.xml +++ b/tajo-jdbc/pom.xml @@ -102,7 +102,11 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index a82aa46..82ccbdc 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -744,6 +744,22 @@ <groupId>org.apache.tajo</groupId> <artifactId>tajo-storage</artifactId> <version>${tajo.version}</version> + <type>pom</type> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> + <version>${tajo.version}</version> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hbase</artifactId> + <version>${tajo.version}</version> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index dee429f..8acb1a9 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -1,5 +1,5 @@ -<!-- <?xml 
version="1.0" encoding="UTF-8"?> +<!-- Copyright 2012 Database Lab., Korea Univ. Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,328 +16,47 @@ --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> <parent> <artifactId>tajo-project</artifactId> <groupId>org.apache.tajo</groupId> <version>0.9.1-SNAPSHOT</version> <relativePath>../tajo-project</relativePath> </parent> - + <modelVersion>4.0.0</modelVersion> <artifactId>tajo-storage</artifactId> - <packaging>jar</packaging> + <packaging>pom</packaging> <name>Tajo Storage</name> - <description>Tajo Storage Package</description> - <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <parquet.version>1.5.0</parquet.version> - <parquet.format.version>2.1.0</parquet.format.version> </properties> - <repositories> - <repository> - <id>repository.jboss.org</id> - <url>https://repository.jboss.org/nexus/content/repositories/releases/ - </url> - <snapshots> - <enabled>false</enabled> - </snapshots> - </repository> - </repositories> + <modules> + <module>tajo-storage-common</module> + <module>tajo-storage-hdfs</module> + <module>tajo-storage-hbase</module> + </modules> <build> <plugins> <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.6</source> - <target>1.6</target> - <encoding>${project.build.sourceEncoding}</encoding> - </configuration> - </plugin> - <plugin> <groupId>org.apache.rat</groupId> <artifactId>apache-rat-plugin</artifactId> - <executions> - <execution> - <phase>verify</phase> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - <configuration> - <excludes> - 
<exclude>src/test/resources/testVariousTypes.avsc</exclude> - </excludes> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <systemProperties> - <tajo.test>TRUE</tajo.test> - </systemProperties> - <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> - </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <version>2.4</version> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> + <artifactId>maven-surefire-report-plugin</artifactId> </plugin> <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <executions> - <execution> - <id>create-protobuf-generated-sources-directory</id> - <phase>initialize</phase> - <configuration> - <target> - <mkdir dir="target/generated-sources/proto" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>exec-maven-plugin</artifactId> - <version>1.2</version> - <executions> - <execution> - <id>generate-sources</id> - <phase>generate-sources</phase> - <configuration> - <executable>protoc</executable> - <arguments> - <argument>-Isrc/main/proto/</argument> - <argument>--proto_path=../tajo-common/src/main/proto</argument> - <argument>--proto_path=../tajo-catalog/tajo-catalog-common/src/main/proto</argument> - <argument>--java_out=target/generated-sources/proto</argument> - <argument>src/main/proto/IndexProtos.proto</argument> - <argument>src/main/proto/StorageFragmentProtos.proto</argument> - </arguments> - </configuration> - <goals> - <goal>exec</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - 
<version>1.5</version> - <executions> - <execution> - <id>add-source</id> - <phase>generate-sources</phase> - <goals> - <goal>add-source</goal> - </goals> - <configuration> - <sources> - <source>target/generated-sources/proto</source> - </sources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-pmd-plugin</artifactId> - <version>2.7.1</version> + <artifactId>maven-deploy-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> </plugin> </plugins> </build> - <dependencies> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-catalog-common</artifactId> - </dependency> - <dependency> - <groupId>org.apache.tajo</groupId> - <artifactId>tajo-plan</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>1.7.7</version> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <artifactId>zookeeper</artifactId> - <groupId>org.apache.zookeeper</groupId> - </exclusion> - <exclusion> - <artifactId>slf4j-api</artifactId> - <groupId>org.slf4j</groupId> - </exclusion> - <exclusion> - <artifactId>jersey-json</artifactId> - <groupId>com.sun.jersey</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> 
- <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey.jersey-test-framework</groupId> - <artifactId>jersey-test-framework-grizzly2</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>commons-el</groupId> - <artifactId>commons-el</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-runtime</artifactId> - </exclusion> - <exclusion> - <groupId>tomcat</groupId> - <artifactId>jasper-compiler</artifactId> - </exclusion> - <exclusion> - <groupId>org.mortbay.jetty</groupId> - <artifactId>jsp-2.1-jetty</artifactId> - </exclusion> - <exclusion> - <groupId>com.sun.jersey.jersey-test-framework</groupId> - <artifactId>jersey-test-framework-grizzly2</artifactId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-server-tests</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-app</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-yarn-api</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-hs</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - <exclusion> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <groupId>org.apache.hadoop</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - 
<scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>parquet-column</artifactId> - <version>${parquet.version}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>parquet-hadoop</artifactId> - <version>${parquet.version}</version> - </dependency> - <dependency> - <groupId>com.twitter</groupId> - <artifactId>parquet-format</artifactId> - <version>${parquet.format.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-buffer</artifactId> - </dependency> - </dependencies> - <profiles> <profile> <id>docs</id> @@ -382,7 +101,7 @@ <executions> <execution> <id>dist</id> - <phase>package</phase> + <phase>prepare-package</phase> <goals> <goal>run</goal> </goals> @@ -405,12 +124,15 @@ echo echo "Current directory `pwd`" echo - run rm -rf ${project.artifactId}-${project.version} - run mkdir ${project.artifactId}-${project.version} - run cd ${project.artifactId}-${project.version} - run cp -r ${basedir}/target/${project.artifactId}-${project.version}*.jar . + run rm -rf tajo-storage-${project.version} + run mkdir tajo-storage-${project.version} + run cd tajo-storage-${project.version} + run cp -r ${basedir}/tajo-storage-common/target/tajo-storage-common-${project.version}*.jar . + run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar . 
+ run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar . + echo - echo "Tajo Storage dist layout available at: ${project.build.directory}/${project.artifactId}-${project.version}" + echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}" echo </echo> <exec executable="sh" dir="${project.build.directory}" failonerror="true"> @@ -430,11 +152,7 @@ <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-project-info-reports-plugin</artifactId> - <version>2.4</version> - <configuration> - <dependencyLocationsEnabled>false</dependencyLocationsEnabled> - </configuration> + <artifactId>maven-surefire-report-plugin</artifactId> </plugin> </plugins> </reporting> http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java deleted file mode 100644 index c5e96ac..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.statistics.TableStats; - -import java.io.Closeable; -import java.io.IOException; - -public interface Appender extends Closeable { - - void init() throws IOException; - - void addTuple(Tuple t) throws IOException; - - void flush() throws IOException; - - long getEstimatedOutputSize() throws IOException; - - void close() throws IOException; - - void enableStats(); - - TableStats getStats(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java deleted file mode 100644 index b829f60..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.datum.Datum; - -import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto; -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -/** - * The Comparator class for Tuples - * - * @see Tuple - */ -public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> { - private final Schema schema; - private final SortSpec [] sortSpecs; - private final int[] sortKeyIds; - private final boolean[] asc; - @SuppressWarnings("unused") - private final boolean[] nullFirsts; - - private Datum left; - private Datum right; - private int compVal; - - /** - * @param schema The schema of input tuples - * @param sortKeys The description of sort keys - */ - public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) { - Preconditions.checkArgument(sortKeys.length > 0, - "At least one sort key must be specified."); - - this.schema = schema; - this.sortSpecs = sortKeys; - this.sortKeyIds = new int[sortKeys.length]; - this.asc = new boolean[sortKeys.length]; - this.nullFirsts = new boolean[sortKeys.length]; - for (int i = 0; i < sortKeys.length; i++) { - if (sortKeys[i].getSortKey().hasQualifier()) { - this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName()); - } else { - this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName()); - } - - this.asc[i] = sortKeys[i].isAscending(); - this.nullFirsts[i]= sortKeys[i].isNullFirst(); - } - } - - public BaseTupleComparator(TupleComparatorProto proto) { - this.schema = new Schema(proto.getSchema()); - - this.sortSpecs = new SortSpec[proto.getSortSpecsCount()]; - for (int i = 0; i < proto.getSortSpecsCount(); i++) { - sortSpecs[i] = new 
SortSpec(proto.getSortSpecs(i)); - } - - this.sortKeyIds = new int[proto.getCompSpecsCount()]; - this.asc = new boolean[proto.getCompSpecsCount()]; - this.nullFirsts = new boolean[proto.getCompSpecsCount()]; - - for (int i = 0; i < proto.getCompSpecsCount(); i++) { - TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i); - sortKeyIds[i] = sortSepcProto.getColumnId(); - asc[i] = sortSepcProto.getAscending(); - nullFirsts[i] = sortSepcProto.getNullFirst(); - } - } - - public Schema getSchema() { - return schema; - } - - public SortSpec [] getSortSpecs() { - return sortSpecs; - } - - public int [] getSortKeyIds() { - return sortKeyIds; - } - - @Override - public boolean isAscendingFirstKey() { - return this.asc[0]; - } - - @Override - public int compare(Tuple tuple1, Tuple tuple2) { - for (int i = 0; i < sortKeyIds.length; i++) { - left = tuple1.get(sortKeyIds[i]); - right = tuple2.get(sortKeyIds[i]); - - if (left.isNull() || right.isNull()) { - if (!left.equals(right)) { - if (left.isNull()) { - compVal = 1; - } else if (right.isNull()) { - compVal = -1; - } - if (nullFirsts[i]) { - if (compVal != 0) { - compVal *= -1; - } - } - } else { - compVal = 0; - } - } else { - if (asc[i]) { - compVal = left.compareTo(right); - } else { - compVal = right.compareTo(left); - } - } - - if (compVal < 0 || compVal > 0) { - return compVal; - } - } - return 0; - } - - @Override - public int hashCode() { - return Objects.hashCode(sortKeyIds); - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof BaseTupleComparator) { - BaseTupleComparator other = (BaseTupleComparator) obj; - if (sortKeyIds.length != other.sortKeyIds.length) { - return false; - } - - for (int i = 0; i < sortKeyIds.length; i++) { - if (sortKeyIds[i] != other.sortKeyIds[i] || - asc[i] != other.asc[i] || - nullFirsts[i] != other.nullFirsts[i]) { - return false; - } - } - - return true; - } else { - return false; - } - } - - @Override - public TupleComparatorProto getProto() { - 
TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder(); - builder.setSchema(schema.getProto()); - for (int i = 0; i < sortSpecs.length; i++) { - builder.addSortSpecs(sortSpecs[i].getProto()); - } - - TupleComparatorSpecProto.Builder sortSpecBuilder; - for (int i = 0; i < sortKeyIds.length; i++) { - sortSpecBuilder = TupleComparatorSpecProto.newBuilder(); - sortSpecBuilder.setColumnId(sortKeyIds[i]); - sortSpecBuilder.setAscending(asc[i]); - sortSpecBuilder.setNullFirst(nullFirsts[i]); - builder.addCompSpecs(sortSpecBuilder); - } - - return builder.build(); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - - String prefix = ""; - for (int i = 0; i < sortKeyIds.length; i++) { - sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i]) - .append(",Asc=").append(asc[i]) - .append(",NullFirst=").append(nullFirsts[i]); - prefix = " ,"; - } - return sb.toString(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java deleted file mode 100644 index 00112e7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java +++ /dev/null @@ -1,258 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Preconditions; -import com.google.protobuf.Message; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.*; -import org.apache.tajo.util.Bytes; - -import java.io.IOException; -import java.io.OutputStream; - -@Deprecated -public class BinarySerializerDeserializer implements SerializerDeserializer { - - static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)}; - - @Override - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) - throws IOException { - byte[] bytes; - int length = 0; - if (datum == null || datum instanceof NullDatum) { - return 0; - } - - switch (col.getDataType().getType()) { - case BOOLEAN: - case BIT: - case CHAR: - bytes = datum.asByteArray(); - length = bytes.length; - out.write(bytes, 0, length); - break; - case INT2: - length = writeShort(out, datum.asInt2()); - break; - case INT4: - length = writeVLong(out, datum.asInt4()); - break; - case INT8: - length = writeVLong(out, datum.asInt8()); - break; - case FLOAT4: - length = writeFloat(out, datum.asFloat4()); - break; - case FLOAT8: - length = writeDouble(out, datum.asFloat8()); - break; - case TEXT: { - bytes = datum.asTextBytes(); - length = datum.size(); - if (length == 0) { - bytes = INVALID_UTF__SINGLE_BYTE; - length = INVALID_UTF__SINGLE_BYTE.length; - } - out.write(bytes, 0, bytes.length); - break; - } - case BLOB: - case INET4: - case INET6: - bytes = datum.asByteArray(); - length = bytes.length; - out.write(bytes, 0, 
length); - break; - case PROTOBUF: - ProtobufDatum protobufDatum = (ProtobufDatum) datum; - bytes = protobufDatum.asByteArray(); - length = bytes.length; - out.write(bytes, 0, length); - break; - case NULL_TYPE: - break; - default: - throw new IOException("Does not support type"); - } - return length; - } - - @Override - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { - if (length == 0) return NullDatum.get(); - - Datum datum; - switch (col.getDataType().getType()) { - case BOOLEAN: - datum = DatumFactory.createBool(bytes[offset]); - break; - case BIT: - datum = DatumFactory.createBit(bytes[offset]); - break; - case CHAR: { - byte[] chars = new byte[length]; - System.arraycopy(bytes, offset, chars, 0, length); - datum = DatumFactory.createChar(chars); - break; - } - case INT2: - datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length)); - break; - case INT4: - datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset)); - break; - case INT8: - datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset)); - break; - case FLOAT4: - datum = DatumFactory.createFloat4(toFloat(bytes, offset, length)); - break; - case FLOAT8: - datum = DatumFactory.createFloat8(toDouble(bytes, offset, length)); - break; - case TEXT: { - byte[] chars = new byte[length]; - System.arraycopy(bytes, offset, chars, 0, length); - - if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) { - datum = DatumFactory.createText(new byte[0]); - } else { - datum = DatumFactory.createText(chars); - } - break; - } - case PROTOBUF: { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode()); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(bytes, offset, length); - datum = factory.createDatum(builder); - break; - } - case INET4: - datum = DatumFactory.createInet4(bytes, offset, length); - break; - case BLOB: - datum = DatumFactory.createBlob(bytes, offset, length); - 
break; - default: - datum = NullDatum.get(); - } - return datum; - } - - private byte[] shortBytes = new byte[2]; - - public int writeShort(OutputStream out, short val) throws IOException { - shortBytes[0] = (byte) (val >> 8); - shortBytes[1] = (byte) val; - out.write(shortBytes, 0, 2); - return 2; - } - - public float toFloat(byte[] bytes, int offset, int length) { - Preconditions.checkArgument(length == 4); - - int val = ((bytes[offset] & 0x000000FF) << 24) + - ((bytes[offset + 1] & 0x000000FF) << 16) + - ((bytes[offset + 2] & 0x000000FF) << 8) + - (bytes[offset + 3] & 0x000000FF); - return Float.intBitsToFloat(val); - } - - private byte[] floatBytes = new byte[4]; - - public int writeFloat(OutputStream out, float f) throws IOException { - int val = Float.floatToIntBits(f); - - floatBytes[0] = (byte) (val >> 24); - floatBytes[1] = (byte) (val >> 16); - floatBytes[2] = (byte) (val >> 8); - floatBytes[3] = (byte) val; - out.write(floatBytes, 0, 4); - return floatBytes.length; - } - - public double toDouble(byte[] bytes, int offset, int length) { - Preconditions.checkArgument(length == 8); - long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) + - ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) + - ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) + - ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) + - ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) + - ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) + - ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) + - (long) (bytes[offset + 7] & 0x00000000000000FF); - return Double.longBitsToDouble(val); - } - - private byte[] doubleBytes = new byte[8]; - - public int writeDouble(OutputStream out, double d) throws IOException { - long val = Double.doubleToLongBits(d); - - doubleBytes[0] = (byte) (val >> 56); - doubleBytes[1] = (byte) (val >> 48); - doubleBytes[2] = (byte) (val >> 40); - doubleBytes[3] = (byte) (val >> 32); - doubleBytes[4] = (byte) (val >> 24); - 
doubleBytes[5] = (byte) (val >> 16); - doubleBytes[6] = (byte) (val >> 8); - doubleBytes[7] = (byte) val; - out.write(doubleBytes, 0, 8); - return doubleBytes.length; - } - - private byte[] vLongBytes = new byte[9]; - - public static int writeVLongToByteArray(byte[] bytes, int offset, long l) { - if (l >= -112 && l <= 127) { - bytes[offset] = (byte) l; - return 1; - } - - int len = -112; - if (l < 0) { - l ^= -1L; // take one's complement' - len = -120; - } - - long tmp = l; - while (tmp != 0) { - tmp = tmp >> 8; - len--; - } - - bytes[offset++] = (byte) len; - len = (len < -120) ? -(len + 120) : -(len + 112); - - for (int idx = len; idx != 0; idx--) { - int shiftbits = (idx - 1) * 8; - bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); - } - return 1 + len; - } - - public int writeVLong(OutputStream out, long l) throws IOException { - int len = writeVLongToByteArray(vLongBytes, 0, l); - out.write(vLongBytes, 0, len); - return len; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java deleted file mode 100644 index 85c79fa..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/BufferPool.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.util.internal.PlatformDependent; -import org.apache.hadoop.classification.InterfaceStability; - -/* this class is PooledBuffer holder */ -public class BufferPool { - - private static final PooledByteBufAllocator allocator; - - private BufferPool() { - } - - static { - //TODO we need determine the default params - allocator = new PooledByteBufAllocator(PlatformDependent.directBufferPreferred()); - - /* if you are finding memory leak, please enable this line */ - //ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED); - } - - public static long maxDirectMemory() { - return PlatformDependent.maxDirectMemory(); - } - - - public synchronized static ByteBuf directBuffer(int size) { - return allocator.directBuffer(size); - } - - /** - * - * @param size the initial capacity - * @param max the max capacity - * @return allocated ByteBuf from pool - */ - public static ByteBuf directBuffer(int size, int max) { - return allocator.directBuffer(size, max); - } - - @InterfaceStability.Unstable - public static void forceRelease(ByteBuf buf) { - buf.release(buf.refCnt()); - } - - /** - * the ByteBuf will increase to writable size - * @param buf - * @param minWritableBytes required minimum writable size - */ - public static void ensureWritable(ByteBuf buf, int minWritableBytes) { - buf.ensureWritable(minWritableBytes); - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java deleted file mode 100644 index b1b6d65..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.fs.ByteBufferReadable; -import org.apache.hadoop.hdfs.DFSInputStream; -import org.apache.hadoop.io.IOUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.ScatteringByteChannel; -import java.nio.channels.spi.AbstractInterruptibleChannel; - -public class ByteBufInputChannel extends AbstractInterruptibleChannel implements ScatteringByteChannel { - - ByteBufferReadable byteBufferReadable; - ReadableByteChannel channel; - InputStream inputStream; - - public ByteBufInputChannel(InputStream inputStream) { - if (inputStream instanceof DFSInputStream && inputStream instanceof ByteBufferReadable) { - this.byteBufferReadable = (ByteBufferReadable) inputStream; - } else { - this.channel = Channels.newChannel(inputStream); - } - - this.inputStream = inputStream; - } - - @Override - public long read(ByteBuffer[] dsts, int offset, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public long read(ByteBuffer[] dsts) { - return read(dsts, 0, dsts.length); - } - - @Override - public int read(ByteBuffer dst) throws IOException { - if (byteBufferReadable != null) { - return byteBufferReadable.read(dst); - } else { - return channel.read(dst); - } - } - - @Override - protected void implCloseChannel() throws IOException { - IOUtils.cleanup(null, channel, inputStream); - } - - public int available() throws IOException { - return inputStream.available(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java deleted file mode 100644 index 1e2b0f3..0000000 --- 
a/tajo-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ /dev/null @@ -1,588 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.*; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.compress.CodecPool; -import org.apache.tajo.storage.exception.AlreadyExistsStorageException; -import org.apache.tajo.storage.fragment.Fragment; -import 
org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; -import org.apache.tajo.util.BytesUtils; - -import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; - -public class CSVFile { - - public static final byte LF = '\n'; - public static int EOF = -1; - - private static final Log LOG = LogFactory.getLog(CSVFile.class); - - public static class CSVAppender extends FileAppender { - private final TableMeta meta; - private final Schema schema; - private final int columnNum; - private final FileSystem fs; - private FSDataOutputStream fos; - private DataOutputStream outputStream; - private CompressionOutputStream deflateFilter; - private char delimiter; - private TableStatistics stats = null; - private Compressor compressor; - private CompressionCodecFactory codecFactory; - private CompressionCodec codec; - private Path compressedPath; - private byte[] nullChars; - private int BUFFER_SIZE = 128 * 1024; - private int bufferedBytes = 0; - private long pos = 0; - private boolean isShuffle; - - private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); - private SerializerDeserializer serde; - - public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId, - final Schema schema, final TableMeta meta, final Path workDir) throws IOException { - super(conf, taskAttemptId, schema, meta, workDir); - this.fs = workDir.getFileSystem(conf); - this.meta = meta; - this.schema = schema; - this.delimiter = StringEscapeUtils.unescapeJava( - this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); - - this.columnNum = schema.size(); - - String nullCharacters = StringEscapeUtils.unescapeJava( - this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT)); - - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - @Override - public void init() throws IOException { - 
if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.getParent().toString()); - } - - //determine the intermediate file type - String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname, - TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal); - if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) { - isShuffle = true; - } else { - isShuffle = false; - } - - if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { - String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); - codecFactory = new CompressionCodecFactory(conf); - codec = codecFactory.getCodecByClassName(codecName); - compressor = CodecPool.getCompressor(codec); - if(compressor != null) compressor.reset(); //builtin gzip is null - - String extension = codec.getDefaultExtension(); - compressedPath = path.suffix(extension); - - if (fs.exists(compressedPath)) { - throw new AlreadyExistsStorageException(compressedPath); - } - - fos = fs.create(compressedPath); - deflateFilter = codec.createOutputStream(fos, compressor); - outputStream = new DataOutputStream(deflateFilter); - - } else { - if (fs.exists(path)) { - throw new AlreadyExistsStorageException(path); - } - fos = fs.create(path); - outputStream = new DataOutputStream(new BufferedOutputStream(fos)); - } - - if (enabledStats) { - this.stats = new TableStatistics(this.schema); - } - - try { - //It will be remove, because we will add custom serde in textfile - String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, - TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - - os.reset(); - pos = fos.getPos(); - bufferedBytes = 0; - super.init(); - } - - - @Override - public void addTuple(Tuple tuple) throws IOException { - Datum datum; - int rowBytes = 0; - - for (int i = 0; i < 
columnNum; i++) { - datum = tuple.get(i); - rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars); - - if(columnNum - 1 > i){ - os.write((byte) delimiter); - rowBytes += 1; - } - if (isShuffle) { - // it is to calculate min/max values, and it is only used for the intermediate file. - stats.analyzeField(i, datum); - } - } - os.write(LF); - rowBytes += 1; - - pos += rowBytes; - bufferedBytes += rowBytes; - if(bufferedBytes > BUFFER_SIZE){ - flushBuffer(); - } - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } - - private void flushBuffer() throws IOException { - if(os.getLength() > 0) { - os.writeTo(outputStream); - os.reset(); - bufferedBytes = 0; - } - } - @Override - public long getOffset() throws IOException { - return pos; - } - - @Override - public void flush() throws IOException { - flushBuffer(); - outputStream.flush(); - } - - @Override - public void close() throws IOException { - - try { - flush(); - - // Statistical section - if (enabledStats) { - stats.setNumBytes(getOffset()); - } - - if(deflateFilter != null) { - deflateFilter.finish(); - deflateFilter.resetState(); - deflateFilter = null; - } - - os.close(); - } finally { - IOUtils.cleanup(LOG, fos); - if (compressor != null) { - CodecPool.returnCompressor(compressor); - compressor = null; - } - } - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } - - public boolean isCompress() { - return compressor != null; - } - - public String getExtension() { - return codec != null ? 
codec.getDefaultExtension() : ""; - } - } - - public static class CSVScanner extends FileScanner implements SeekableScanner { - public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) - throws IOException { - super(conf, schema, meta, fragment); - factory = new CompressionCodecFactory(conf); - codec = factory.getCodec(this.fragment.getPath()); - if (codec == null || codec instanceof SplittableCompressionCodec) { - splittable = true; - } - - //Delimiter - this.delimiter = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_DELIMITER, - meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0); - - String nullCharacters = StringEscapeUtils.unescapeJava( - meta.getOption(StorageConstants.TEXT_NULL, - meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT))); - - if (StringUtils.isEmpty(nullCharacters)) { - nullChars = NullDatum.get().asTextBytes(); - } else { - nullChars = nullCharacters.getBytes(); - } - } - - private final static int DEFAULT_PAGE_SIZE = 256 * 1024; - private char delimiter; - private FileSystem fs; - private FSDataInputStream fis; - private InputStream is; //decompressd stream - private CompressionCodecFactory factory; - private CompressionCodec codec; - private Decompressor decompressor; - private Seekable filePosition; - private boolean splittable = false; - private long startOffset, end, pos; - private int currentIdx = 0, validIdx = 0, recordCount = 0; - private int[] targetColumnIndexes; - private boolean eof = false; - private final byte[] nullChars; - private SplitLineReader reader; - private ArrayList<Long> fileOffsets; - private ArrayList<Integer> rowLengthList; - private ArrayList<Integer> startOffsets; - private NonSyncByteArrayOutputStream buffer; - private SerializerDeserializer serde; - - @Override - public void init() throws IOException { - fileOffsets = new ArrayList<Long>(); - rowLengthList = new 
ArrayList<Integer>(); - startOffsets = new ArrayList<Integer>(); - buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); - - // FileFragment information - if(fs == null) { - fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath()); - } - if(fis == null) fis = fs.open(fragment.getPath()); - - recordCount = 0; - pos = startOffset = fragment.getStartKey(); - end = startOffset + fragment.getLength(); - - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - if (codec instanceof SplittableCompressionCodec) { - SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream( - fis, decompressor, startOffset, end, - SplittableCompressionCodec.READ_MODE.BYBLOCK); - - reader = new CompressedSplitLineReader(cIn, conf, null); - startOffset = cIn.getAdjustedStart(); - end = cIn.getAdjustedEnd(); - filePosition = cIn; - is = cIn; - } else { - is = new DataInputStream(codec.createInputStream(fis, decompressor)); - reader = new SplitLineReader(is, null); - filePosition = fis; - } - } else { - fis.seek(startOffset); - filePosition = fis; - is = fis; - reader = new SplitLineReader(is, null); - } - - if (targets == null) { - targets = schema.toArray(); - } - - targetColumnIndexes = new int[targets.length]; - for (int i = 0; i < targets.length; i++) { - targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - - try { - //FIXME - String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, - TextSerializerDeserializer.class.getName()); - serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e); - } - - super.init(); - Arrays.sort(targetColumnIndexes); - if (LOG.isDebugEnabled()) { - LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end + - "," + fs.getFileStatus(fragment.getPath()).getLen()); - } - - if (startOffset != 0) { - pos += reader.readLine(new Text(), 
0, maxBytesToConsume(pos)); - } - eof = false; - page(); - } - - private int maxBytesToConsume(long pos) { - return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos); - } - - private long fragmentable() throws IOException { - return end - getFilePosition(); - } - - private long getFilePosition() throws IOException { - long retVal; - if (isCompress()) { - retVal = filePosition.getPos(); - } else { - retVal = pos; - } - return retVal; - } - - private void page() throws IOException { -// // Index initialization - currentIdx = 0; - validIdx = 0; - int currentBufferPos = 0; - int bufferedSize = 0; - - buffer.reset(); - startOffsets.clear(); - rowLengthList.clear(); - fileOffsets.clear(); - - if(eof) { - return; - } - - while (DEFAULT_PAGE_SIZE >= bufferedSize){ - - int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE); - - if(ret == 0){ - break; - } else { - fileOffsets.add(pos); - pos += ret; - startOffsets.add(currentBufferPos); - currentBufferPos += rowLengthList.get(rowLengthList.size() - 1); - bufferedSize += ret; - validIdx++; - recordCount++; - } - - if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){ - eof = true; - break; - } - } - if (tableStats != null) { - tableStats.setReadBytes(pos - startOffset); - tableStats.setNumRows(recordCount); - } - } - - @Override - public float getProgress() { - try { - if(eof) { - return 1.0f; - } - long filePos = getFilePosition(); - if (startOffset == filePos) { - return 0.0f; - } else { - long readBytes = filePos - startOffset; - long remainingBytes = Math.max(end - filePos, 0); - return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes)); - } - } catch (IOException e) { - LOG.error(e.getMessage(), e); - return 0.0f; - } - } - - @Override - public Tuple next() throws IOException { - try { - if (currentIdx == validIdx) { - if (eof) { - return null; - } else { - page(); - - if(currentIdx == validIdx){ - return null; - 
} - } - } - - long offset = -1; - if(!isCompress()){ - offset = fileOffsets.get(currentIdx); - } - - byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), - rowLengthList.get(currentIdx), delimiter, targetColumnIndexes); - currentIdx++; - return new LazyTuple(schema, cells, offset, nullChars, serde); - } catch (Throwable t) { - LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t); - LOG.error("Tuple list current index: " + currentIdx, t); - throw new IOException(t); - } - } - - private boolean isCompress() { - return codec != null; - } - - @Override - public void reset() throws IOException { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - - init(); - } - - @Override - public void close() throws IOException { - try { - if (tableStats != null) { - tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. (decompressed bytes + overhead) - tableStats.setNumRows(recordCount); - } - - IOUtils.cleanup(LOG, reader, is, fis); - fs = null; - is = null; - fis = null; - if (LOG.isDebugEnabled()) { - LOG.debug("CSVScanner processed record:" + recordCount); - } - } finally { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - decompressor = null; - } - } - } - - @Override - public boolean isProjectable() { - return true; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setSearchCondition(Object expr) { - } - - @Override - public void seek(long offset) throws IOException { - if(isCompress()) throw new UnsupportedException(); - - int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset); - - if (tupleIndex > -1) { - this.currentIdx = tupleIndex; - } else if (isSplittable() && end >= offset || startOffset <= offset) { - eof = false; - fis.seek(offset); - pos = offset; - reader.reset(); - this.currentIdx = 0; - this.validIdx = 0; - // pageBuffer(); - } 
else { - throw new IOException("invalid offset " + - " < start : " + startOffset + " , " + - " end : " + end + " , " + - " filePos : " + filePosition.getPos() + " , " + - " input offset : " + offset + " >"); - } - } - - @Override - public long getNextOffset() throws IOException { - if(isCompress()) throw new UnsupportedException(); - - if (this.currentIdx == this.validIdx) { - if (fragmentable() <= 0) { - return -1; - } else { - page(); - if(currentIdx == validIdx) return -1; - } - } - return fileOffsets.get(currentIdx); - } - - @Override - public boolean isSplittable(){ - return splittable; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java deleted file mode 100644 index 4f58e68..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.SplitCompressionInputStream; -import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; - -/** - * Line reader for compressed splits - * - * Reading records from a compressed split is tricky, as the - * LineRecordReader is using the reported compressed input stream - * position directly to determine when a split has ended. In addition the - * compressed input stream is usually faking the actual byte position, often - * updating it only after the first compressed block after the split is - * accessed. - * - * Depending upon where the last compressed block of the split ends relative - * to the record delimiters it can be easy to accidentally drop the last - * record or duplicate the last record between this split and the next. - * - * Split end scenarios: - * - * 1) Last block of split ends in the middle of a record - * Nothing special that needs to be done here, since the compressed input - * stream will report a position after the split end once the record - * is fully read. The consumer of the next split will discard the - * partial record at the start of the split normally, and no data is lost - * or duplicated between the splits. - * - * 2) Last block of split ends in the middle of a delimiter - * The line reader will continue to consume bytes into the next block to - * locate the end of the delimiter. If a custom delimiter is being used - * then the next record must be read by this split or it will be dropped. - * The consumer of the next split will not recognize the partial - * delimiter at the beginning of its split and will discard it along with - * the next record. - * - * However for the default delimiter processing there is a special case - * because CR, LF, and CRLF are all valid record delimiters. 
If the - * block ends with a CR then the reader must peek at the next byte to see - * if it is an LF and therefore part of the same record delimiter. - * Peeking at the next byte is an access to the next block and triggers - * the stream to report the end of the split. There are two cases based - * on the next byte: - * - * A) The next byte is LF - * The split needs to end after the current record is returned. The - * consumer of the next split will discard the first record, which - * is degenerate since LF is itself a delimiter, and start consuming - * records after that byte. If the current split tries to read - * another record then the record will be duplicated between splits. - * - * B) The next byte is not LF - * The current record will be returned but the stream will report - * the split has ended due to the peek into the next block. If the - * next record is not read then it will be lost, as the consumer of - * the next split will discard it before processing subsequent - * records. Therefore the next record beyond the reported split end - * must be consumed by this split to avoid data loss. - * - * 3) Last block of split ends at the beginning of a delimiter - * This is equivalent to case 1, as the reader will consume bytes into - * the next block and trigger the end of the split. No further records - * should be read as the consumer of the next split will discard the - * (degenerate) record at the beginning of its split. - * - * 4) Last block of split ends at the end of a delimiter - * Nothing special needs to be done here. The reader will not start - * examining the bytes into the next block until the next record is read, - * so the stream will not report the end of the split just yet. Once the - * next record is read then the next block will be accessed and the - * stream will indicate the end of the split. The consumer of the next - * split will correctly discard the first record of its split, and no - * data is lost or duplicated. 
- * - * If the default delimiter is used and the block ends at a CR then this - * is treated as case 2 since the reader does not yet know without - * looking at subsequent bytes whether the delimiter has ended. - * - * NOTE: It is assumed that compressed input streams *never* return bytes from - * multiple compressed blocks from a single read. Failure to do so will - * violate the buffering performed by this class, as it will access - * bytes into the next block after the split before returning all of the - * records from the previous block. - */ - -public class CompressedSplitLineReader extends SplitLineReader { - SplitCompressionInputStream scin; - private boolean usingCRLF; - private boolean needAdditionalRecord = false; - private boolean finished = false; - - public CompressedSplitLineReader(SplitCompressionInputStream in, - Configuration conf, - byte[] recordDelimiterBytes) - throws IOException { - super(in, conf, recordDelimiterBytes); - scin = in; - usingCRLF = (recordDelimiterBytes == null); - } - - @Override - protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter) - throws IOException { - int bytesRead = in.read(buffer); - - // If the split ended in the middle of a record delimiter then we need - // to read one additional record, as the consumer of the next split will - // not recognize the partial delimiter as a record. - // However if using the default delimiter and the next character is a - // linefeed then next split will treat it as a delimiter all by itself - // and the additional record read should not be performed. 
- if (inDelimiter && bytesRead > 0) { - if (usingCRLF) { - needAdditionalRecord = (buffer[0] != '\n'); - } else { - needAdditionalRecord = true; - } - } - return bytesRead; - } - - @Override - public int readLine(Text str, int maxLineLength, int maxBytesToConsume) - throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength - , int maxBytesToConsume) throws IOException { - int bytesRead = 0; - if (!finished) { - // only allow at most one more record to be read after the stream - // reports the split ended - if (scin.getPos() > scin.getAdjustedEnd()) { - finished = true; - } - - bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume); - } - return bytesRead; - } - - @Override - public boolean needAdditionalRecordAfterSplit() { - return !finished && needAdditionalRecord; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java deleted file mode 100644 index 8841a31..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DataLocation.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Immutable holder for a host string and a volume id.
 */
public class DataLocation {
  private final String host;
  private final int volumeId;

  /**
   * @param host     value returned by {@link #getHost()}
   * @param volumeId value returned by {@link #getVolumeId()}
   */
  public DataLocation(String host, int volumeId) {
    this.host = host;
    this.volumeId = volumeId;
  }

  public String getHost() {
    return host;
  }

  public int getVolumeId() {
    return volumeId;
  }

  /** Renders as {@code DataLocation{host=<host>, volumeId=<volumeId>}}. */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("DataLocation{");
    text.append("host=").append(host);
    text.append(", volumeId=").append(volumeId);
    return text.append('}').toString();
  }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import java.util.ArrayList; -import java.util.List; - -public class DiskDeviceInfo { - private int id; - private String name; - - private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>(); - - public DiskDeviceInfo(int id) { - this.id = id; - } - - public int getId() { - return id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - @Override - public String toString() { - return id + "," + name; - } - - public void addMountPath(DiskMountInfo diskMountInfo) { - mountInfos.add(diskMountInfo); - } - - public List<DiskMountInfo> getMountInfos() { - return mountInfos; - } - - public void setMountInfos(List<DiskMountInfo> mountInfos) { - this.mountInfos = mountInfos; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java deleted file mode 100644 index 22f18ba..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskInfo.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/**
 * Mutable bean carrying an id, a partition name, a mount path, and the
 * capacity / used counters of a disk.
 */
public class DiskInfo {
  private int id;
  private String partitionName;
  private String mountPath;

  private long capacity;
  private long used;

  /**
   * Sets the id and the partition name; the mount path and both counters keep
   * their default values until set explicitly.
   */
  public DiskInfo(int id, String partitionName) {
    this.id = id;
    this.partitionName = partitionName;
  }

  // ---- id ----

  public int getId() {
    return this.id;
  }

  public void setId(int id) {
    this.id = id;
  }

  // ---- partition name ----

  public String getPartitionName() {
    return this.partitionName;
  }

  public void setPartitionName(String partitionName) {
    this.partitionName = partitionName;
  }

  // ---- mount path ----

  public String getMountPath() {
    return this.mountPath;
  }

  public void setMountPath(String mountPath) {
    this.mountPath = mountPath;
  }

  // ---- capacity ----

  public long getCapacity() {
    return this.capacity;
  }

  public void setCapacity(long capacity) {
    this.capacity = capacity;
  }

  // ---- used ----

  public long getUsed() {
    return this.used;
  }

  public void setUsed(long used) {
    this.used = used;
  }
}
a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskMountInfo.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; - -public class DiskMountInfo implements Comparable<DiskMountInfo> { - private String mountPath; - - private long capacity; - private long used; - - private int deviceId; - - public DiskMountInfo(int deviceId, String mountPath) { - this.mountPath = mountPath; - } - - public String getMountPath() { - return mountPath; - } - - public void setMountPath(String mountPath) { - this.mountPath = mountPath; - } - - public long getCapacity() { - return capacity; - } - - public void setCapacity(long capacity) { - this.capacity = capacity; - } - - public long getUsed() { - return used; - } - - public void setUsed(long used) { - this.used = used; - } - - public int getDeviceId() { - return deviceId; - } - - @Override - public boolean equals(Object obj){ - if (!(obj instanceof DiskMountInfo)) return false; - - if (compareTo((DiskMountInfo) obj) == 0) return true; - else return false; - } - - @Override - public int hashCode(){ - return Objects.hashCode(mountPath); - } - - @Override - public int 
compareTo(DiskMountInfo other) { - String path1 = mountPath; - String path2 = other.mountPath; - - int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ; - int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ; - - if(path1Depth > path2Depth) { - return -1; - } else if(path1Depth < path2Depth) { - return 1; - } else { - int path1Length = path1.length(); - int path2Length = path2.length(); - - if(path1Length < path2Length) { - return 1; - } else if(path1Length > path2Length) { - return -1; - } else { - return path1.compareTo(path2); - } - } - } -}
