http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java index c11db6f..925c047 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java @@ -30,12 +30,10 @@ import java.io.IOException; public class TajoQueryEngine { - private final StorageManager storageManager; private final PhysicalPlanner phyPlanner; public TajoQueryEngine(TajoConf conf) throws IOException { - this.storageManager = StorageManager.getStorageManager(conf); - this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager); + this.phyPlanner = new PhysicalPlannerImpl(conf); } public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index cb038df..00eabcc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.catalog.Schema; @@ -53,10 +52,7 @@ import org.apache.tajo.plan.logical.*; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; @@ -108,39 +104,6 @@ public class Task { private Schema finalSchema = null; private TupleComparator sortComp = null; - static final String OUTPUT_FILE_PREFIX="part-"; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(2); - return fmt; - } - }; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(6); - return fmt; - } - }; - - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - return fmt; - } - }; - public Task(String taskRunnerId, Path baseDir, QueryUnitAttemptId taskId, @@ -190,13 +153,8 @@ public class Task { this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys()); } } else { - // The final result of a task will be written in a file named part-ss-nnnnnnn, - // where ss is the subquery id associated with this task, and nnnnnn is the task id. - Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME, - OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" + - OUTPUT_FILE_FORMAT_SEQ.get().format(0)); + Path outFilePath = ((FileStorageManager)StorageManager.getFileStorageManager(systemConf)) + .getAppenderFilePath(taskId, queryContext.getStagingDir()); LOG.info("Output File Path: " + outFilePath); context.setOutputPath(outFilePath); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/resources/webapps/worker/queryunit.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp index 18a67d8..49635d1 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp @@ -41,6 +41,7 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.Map" %> <%@ page import="java.util.Set" %> +<%@ page import="org.apache.tajo.storage.fragment.Fragment" %> <% String paramQueryId = request.getParameter("queryId"); @@ -102,8 +103,8 @@ String fragmentInfo = ""; String delim = ""; for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) { - FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); - fragmentInfo += delim + fileFragment.toString(); + Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment); + fragmentInfo += delim + fragment.toString(); delim = "<br/>"; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java index 45d3c51..fb98be2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java @@ -47,7 +47,7 @@ public class BackendTestingUtil { public static void writeTmpTable(TajoConf conf, Path tablePath) throws IOException { - StorageManager sm = StorageManager.getStorageManager(conf, tablePath); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, tablePath); FileSystem fs = sm.getFileSystem(); Appender appender; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java new file mode 100644 index 0000000..a8e4a5c --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.master.ServerManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; +import org.apache.tajo.util.Bytes; + +import java.io.File; +import java.io.IOException; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY; + +public class HBaseTestClusterUtil { + private static final Log LOG = LogFactory.getLog(HBaseTestClusterUtil.class); + private Configuration conf; + private MiniHBaseCluster hbaseCluster; + private MiniZooKeeperCluster zkCluster; + private File testBaseDir; + public HBaseTestClusterUtil(Configuration conf, File testBaseDir) { + this.conf = conf; + this.testBaseDir = testBaseDir; + } + /** + * Returns the path to the default root dir the minicluster uses. + * Note: this does not cause the root dir to be created. + * @return Fully qualified path for the default hbase root dir + * @throws java.io.IOException + */ + public Path getDefaultRootDirPath() throws IOException { + FileSystem fs = FileSystem.get(this.conf); + return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase"); + } + + /** + * Creates an hbase rootdir in user home directory. Also creates hbase + * version file. Normally you won't make use of this method. Root hbasedir + * is created for you as part of mini cluster startup. You'd only use this + * method if you were doing manual operation. + * @return Fully qualified path to hbase root dir + * @throws java.io.IOException + */ + public Path createRootDir() throws IOException { + FileSystem fs = FileSystem.get(this.conf); + Path hbaseRootdir = getDefaultRootDirPath(); + FSUtils.setRootDir(this.conf, hbaseRootdir); + fs.mkdirs(hbaseRootdir); + FSUtils.setVersion(fs, hbaseRootdir); + return hbaseRootdir; + } + + public void stopHBaseCluster() throws IOException { + if (hbaseCluster != null) { + LOG.info("MiniHBaseCluster stopped"); + hbaseCluster.shutdown(); + hbaseCluster.waitUntilShutDown(); + hbaseCluster = null; + } + } + + public void startHBaseCluster() throws Exception { + if (zkCluster == null) { + startMiniZKCluster(); + } + if (hbaseCluster != null) { + return; + } + + System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file"); + if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1); + } + if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) { + conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); + } + conf.setBoolean(REPLICATION_ENABLE_KEY, false); + createRootDir(); + + Configuration c = HBaseConfiguration.create(this.conf); + + hbaseCluster = new MiniHBaseCluster(c, 1); + + // Don't leave here till we've done a successful scan of the hbase:meta + HTable t = new HTable(c, TableName.META_TABLE_NAME); + ResultScanner s = t.getScanner(new Scan()); + while (s.next() != null) { + continue; + } + s.close(); + t.close(); + LOG.info("MiniHBaseCluster started"); + + } + + /** + * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set + * the port mentionned is used as the default port for ZooKeeper. + */ + public MiniZooKeeperCluster startMiniZKCluster() + throws Exception { + File zkDataPath = new File(testBaseDir, "zk"); + if (this.zkCluster != null) { + throw new IOException("Cluster already running at " + zkDataPath); + } + this.zkCluster = new MiniZooKeeperCluster(conf); + final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0); + if (defPort > 0){ + // If there is a port in the config file, we use it. + this.zkCluster.setDefaultClientPort(defPort); + } + int clientPort = this.zkCluster.startup(zkDataPath, 1); + this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort)); + LOG.info("MiniZooKeeperCluster started"); + + return this.zkCluster; + } + + public void stopZooKeeperCluster() throws IOException { + if (zkCluster != null) { + LOG.info("MiniZooKeeperCluster stopped"); + zkCluster.shutdown(); + zkCluster = null; + } + } + + public Configuration getConf() { + return conf; + } + + public MiniZooKeeperCluster getMiniZooKeeperCluster() { + return zkCluster; + } + + public MiniHBaseCluster getMiniHBaseCluster() { + return hbaseCluster; + } + + public HTableDescriptor getTableDescriptor(String tableName) throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + try { + return admin.getTableDescriptor(Bytes.toBytes(tableName)); + } finally { + admin.close(); + } + } + + public void createTable(HTableDescriptor hTableDesc) throws IOException { + HBaseAdmin admin = new HBaseAdmin(conf); + try { + admin.createTable(hTableDesc); + } finally { + admin.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index ecfb9f5..875e450 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -169,9 +169,9 @@ public class QueryTestCaseBase { private static String currentDatabase; private static Set<String> createdTableGlobalSet = new HashSet<String>(); // queries and results directory corresponding to subclass class. - private Path currentQueryPath; - private Path currentResultPath; - private Path currentDatasetPath; + protected Path currentQueryPath; + protected Path currentResultPath; + protected Path currentDatasetPath; // for getting a method name @Rule public TestName name = new TestName(); @@ -303,7 +303,7 @@ public class QueryTestCaseBase { return executeFile(getMethodName() + ".sql"); } - private String getMethodName() { + protected String getMethodName() { String methodName = name.getMethodName(); // In the case of parameter execution name's pattern is methodName[0] if (methodName.endsWith("]")) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 757ba0f..64c27e0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -70,6 +70,7 @@ public class TajoTestingCluster { private FileSystem defaultFS; private MiniDFSCluster dfsCluster; private MiniCatalogServer catalogServer; + private HBaseTestClusterUtil hbaseUtil; private TajoMaster tajoMaster; private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>(); @@ -284,6 +285,10 @@ public class TajoTestingCluster { return this.defaultFS; } + public HBaseTestClusterUtil getHBaseUtil() { + return hbaseUtil; + } + //////////////////////////////////////////////////////// // Catalog Section //////////////////////////////////////////////////////// @@ -507,6 +512,8 @@ public class TajoTestingCluster { startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts); this.dfsCluster.waitClusterUp(); + hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); + if(!standbyWorkerMode) { startMiniYarnCluster(); } @@ -594,7 +601,6 @@ public class TajoTestingCluster { } if(this.dfsCluster != null) { - try { FileSystem fs = this.dfsCluster.getFileSystem(); if (fs != null) fs.close(); @@ -613,6 +619,10 @@ public class TajoTestingCluster { } this.clusterTestBuildDir = null; } + + hbaseUtil.stopZooKeeperCluster(); + hbaseUtil.stopHBaseCluster(); + LOG.info("Minicluster is down"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java index 3437e3a..9ce7b5b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java @@ -40,10 +40,10 @@ import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -380,7 +380,7 @@ public class TestPlannerUtil { int index = 0; for (int i = startIndex; i < startIndex + expectedSize; i++, index++) { - FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]); + FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), fragments[index]); assertEquals(expectedFiles.get(i), fragment.getPath()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java index 0e7f3e6..3803c7a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java @@ -42,10 +42,7 @@ import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.logical.*; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.util.CommonTestingUtil; import org.junit.After; import org.junit.Before; @@ -140,8 +137,8 @@ public class TestBroadcastJoinPlan { contentsData += j; } } - Appender appender = StorageManager.getStorageManager(conf).getAppender(tableMeta, schema, - dataPath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(tableMeta, schema, dataPath); appender.init(); Tuple tuple = new VTuple(schema.size()); int writtenSize = 0; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java index aef8064..6a6aafb 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java @@ -79,7 +79,7 @@ public class TestBNLJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -89,7 +89,8 @@ public class TestBNLJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, employeePath); appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = 0; i < OUTER_TUPLE_NUM; i++) { @@ -110,7 +111,8 @@ public class TestBNLJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < INNER_TUPLE_NUM; i += 2) { @@ -150,10 +152,10 @@ public class TestBNLJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN); - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -162,7 +164,7 @@ public class TestBNLJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -183,9 +185,9 @@ public class TestBNLJoinExec { LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context).getRootBlock().getRoot(); - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -201,7 +203,7 @@ public class TestBNLJoinExec { ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java index 60ae849..dc3c28d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java @@ -66,7 +66,7 @@ public class TestBSTIndexExec { private SQLAnalyzer analyzer; private LogicalPlanner planner; private LogicalOptimizer optimizer; - private StorageManager sm; + private FileStorageManager sm; private Schema idxSchema; private BaseTupleComparator comp; private BSTIndex.BSTIndexWriter writer; @@ -91,7 +91,7 @@ public class TestBSTIndexExec { Path workDir = CommonTestingUtil.getTestDir(); catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString()); catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); - sm = StorageManager.getStorageManager(conf, workDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir); idxPath = new Path(workDir, "test.idx"); @@ -117,8 +117,7 @@ public class TestBSTIndexExec { fs = tablePath.getFileSystem(conf); fs.mkdirs(tablePath.getParent()); - FileAppender appender = (FileAppender)StorageManager.getStorageManager(conf).getAppender(meta, schema, - tablePath); + FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath); appender.init(); Tuple tuple = new VTuple(schema.size()); for (int i = 0; i < 10000; i++) { @@ -164,7 +163,7 @@ public class TestBSTIndexExec { this.rndKey = rnd.nextInt(250); final String QUERY = "select * from employee where managerId = " + rndKey; - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE); + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual"); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir); @@ -172,7 +171,7 @@ public class TestBSTIndexExec { LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); LogicalNode rootNode = optimizer.optimize(plan); - TmpPlanner phyPlanner = new TmpPlanner(conf, sm); + TmpPlanner phyPlanner = new TmpPlanner(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); int tupleCount = this.randomValues.get(rndKey); @@ -186,8 +185,8 @@ public class TestBSTIndexExec { } private class TmpPlanner extends PhysicalPlannerImpl { - public TmpPlanner(TajoConf conf, StorageManager sm) { - super(conf, sm); + public TmpPlanner(TajoConf conf) { + super(conf); } @Override @@ -196,12 +195,11 @@ public class TestBSTIndexExec { Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()), "Error: There is no table matched to %s", scanNode.getTableName()); - List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(), - ctx.getTables(scanNode.getTableName())); + List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName())); Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)}; - return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum); + return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java index bb40b28..c0bf6ce 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java @@ -59,7 +59,7 @@ public class TestExternalSortExec { private CatalogService catalog; private SQLAnalyzer analyzer; private LogicalPlanner planner; - private StorageManager sm; + private FileStorageManager sm; private Path testDir; private final int numTuple = 100000; @@ -76,7 +76,7 @@ public class TestExternalSortExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString()); - sm = StorageManager.getStorageManager(conf, testDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir); Schema schema = new Schema(); schema.addColumn("managerid", Type.INT4); @@ -85,7 +85,8 @@ public class TestExternalSortExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, schema, employeePath); appender.enableStats(); appender.init(); Tuple tuple = new VTuple(schema.size()); @@ -121,7 +122,7 @@ public class TestExternalSortExec { @Test public final void testNext() throws IOException, PlanningException { - FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), + FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); Path workDir = new Path(testDir, TestExternalSortExec.class.getName()); TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), @@ -131,7 +132,7 @@ public class TestExternalSortExec { LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); LogicalNode rootNode = plan.getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); ProjectionExec proj = (ProjectionExec) exec; @@ -141,8 +142,7 @@ public class TestExternalSortExec { UnaryPhysicalExec sortExec = proj.getChild(); SeqScanExec scan = sortExec.getChild(); - ExternalSortExec extSort = new ExternalSortExec(ctx, sm, - ((MemSortExec)sortExec).getPlan(), scan); + ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan); proj.setChild(extSort); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java index c4ce43b..ecd1c23 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java @@ -61,7 +61,7 @@ public class TestFullOuterHashJoinExec { private CatalogService catalog; private SQLAnalyzer analyzer; private LogicalPlanner planner; - private StorageManager sm; + private FileStorageManager sm; private Path testDir; private QueryContext defaultContext; @@ -84,7 +84,7 @@ public class TestFullOuterHashJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -107,7 +107,8 @@ public class TestFullOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -136,7 +137,8 @@ public class TestFullOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -175,7 +177,8 @@ public class TestFullOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -227,8 +230,8 @@ public class TestFullOuterHashJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, - phone3Path); + Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); @@ -266,9 +269,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), + FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); @@ -277,7 +280,7 @@ public class TestFullOuterHashJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -305,9 +308,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); @@ -316,7 +319,7 @@ public class TestFullOuterHashJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -343,9 +346,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); @@ -354,7 +357,7 @@ public class TestFullOuterHashJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -382,9 +385,9 @@ public class TestFullOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); @@ -394,7 +397,7 @@ public class TestFullOuterHashJoinExec { workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java index a82de92..a81979f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java @@ -40,10 +40,7 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; @@ -92,7 +89,7 @@ public class TestFullOuterMergeJoinExec { catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -115,7 +112,8 @@ public class TestFullOuterMergeJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -153,7 +151,8 @@ public class TestFullOuterMergeJoinExec { TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep4Path = new Path(testDir, "dep4.csv"); - Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path); + Appender appender4 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep4Meta, dep4Schema, dep4Path); appender4.init(); Tuple tuple4 = new VTuple(dep4Schema.size()); for (int i = 0; i < 11; i++) { @@ -184,7 +183,8 @@ public class TestFullOuterMergeJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -223,7 +223,8 @@ public class TestFullOuterMergeJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -275,8 +276,8 @@ public class TestFullOuterMergeJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, - phone3Path); + Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); appender5.close(); @@ -318,9 +319,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] dep3Frags = - StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0"); @@ -328,7 +329,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -355,9 +356,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] job3Frags = - StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1"); @@ -365,7 +366,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -392,9 +393,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] job3Frags = - StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2"); @@ -402,7 +403,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -430,9 +431,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] dep4Frags = - StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3"); @@ -440,7 +441,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -470,9 +471,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] phone3Frags = - StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); @@ -481,7 +482,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -509,9 +510,9 @@ public class TestFullOuterMergeJoinExec { enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); FileFragment[] emp3Frags = - StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); + FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] phone3Frags = - StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags); @@ -520,7 +521,7 @@ public class TestFullOuterMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -537,9 +538,4 @@ public class TestFullOuterMergeJoinExec { exec.close(); assertEquals(7, count); } - - - - - } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java index 20d4651..4fe6ff2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java @@ -76,7 +76,7 @@ public class TestHashAntiJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema employeeSchema = new Schema(); employeeSchema.addColumn("managerid", Type.INT4); @@ -86,8 +86,8 @@ public class TestHashAntiJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, - employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); Tuple tuple = new VTuple(employeeSchema.size()); @@ -112,7 +112,8 @@ public class TestHashAntiJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < 10; i += 2) { @@ -150,9 +151,9 @@ public class TestHashAntiJoinExec { @Test public final void testHashAntiJoin() throws IOException, PlanningException { - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -166,7 +167,7 @@ public class TestHashAntiJoinExec { optimizer.optimize(plan); LogicalNode rootNode = plan.getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); // replace an equal join with an hash anti join. http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java index d1fa28a..55e87d4 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java @@ -78,7 +78,7 @@ public class TestHashJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema employeeSchema = new Schema(); employeeSchema.addColumn("managerid", Type.INT4); @@ -88,8 +88,8 @@ public class TestHashJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, - employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); Tuple tuple = new VTuple(employeeSchema.size()); for (int i = 0; i < 10; i++) { @@ -111,7 +111,8 @@ public class TestHashJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < 10; i += 2) { @@ -152,9 +153,9 @@ public class TestHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -163,7 +164,7 @@ public class TestHashJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -195,9 +196,9 @@ public class TestHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -207,7 +208,7 @@ public class TestHashJoinExec { ctx.setEnforcer(enforcer); ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l); - PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java index 7a43a55..a2f1155 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java @@ -38,10 +38,7 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; @@ -80,7 +77,7 @@ public class TestHashSemiJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema employeeSchema = new Schema(); employeeSchema.addColumn("managerid", Type.INT4); @@ -90,8 +87,8 @@ public class TestHashSemiJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, - employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); Tuple tuple = new VTuple(employeeSchema.size()); @@ -116,7 +113,8 @@ public class TestHashSemiJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); // make 27 tuples @@ -158,9 +156,9 @@ public class TestHashSemiJoinExec { @Test public final void testHashSemiJoin() throws IOException, PlanningException { - FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -174,7 +172,7 @@ public class TestHashSemiJoinExec { optimizer.optimize(plan); LogicalNode rootNode = plan.getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); // replace an equal join with an hash anti join. http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java index ec9daa7..0477771 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java @@ -85,7 +85,7 @@ public class TestLeftOuterHashJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -108,7 +108,8 @@ public class TestLeftOuterHashJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -137,7 +138,8 @@ public class TestLeftOuterHashJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -176,7 +178,8 @@ public class TestLeftOuterHashJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -228,8 +231,8 @@ public class TestLeftOuterHashJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, - phone3Path); + Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); @@ -270,9 +273,9 @@ public class TestLeftOuterHashJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN); - FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), + FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); @@ -281,7 +284,7 @@ public class TestLeftOuterHashJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -300,9 +303,9 @@ public class TestLeftOuterHashJoinExec { @Test public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException { - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); @@ -314,7 +317,7 @@ public class TestLeftOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[1]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -341,9 +344,9 @@ public class TestLeftOuterHashJoinExec { @Test public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); @@ -355,7 +358,7 @@ public class TestLeftOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[2]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -383,9 +386,9 @@ public class TestLeftOuterHashJoinExec { @Test public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); @@ -397,7 +400,7 @@ public class TestLeftOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[3]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; @@ -425,9 +428,9 @@ public class TestLeftOuterHashJoinExec { @Test public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, "default.emp3", emp3.getMeta(), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, "default.emp3", emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, "default.phone3", phone3.getMeta(), + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, "default.phone3", phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags); @@ -439,7 +442,7 @@ public class TestLeftOuterHashJoinExec { Expr expr = analyzer.parse(QUERIES[4]); LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); ProjectionExec proj = (ProjectionExec) exec; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java index b3d1f33..36dd77e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java @@ -81,7 +81,7 @@ public class TestLeftOuterNLJoinExec { catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); //----------------- dep3 ------------------------------ // dep_id | dep_name | loc_id @@ -104,7 +104,8 @@ public class TestLeftOuterNLJoinExec { TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); + Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); Tuple tuple = new VTuple(dep3Schema.size()); for (int i = 0; i < 10; i++) { @@ -133,7 +134,8 @@ public class TestLeftOuterNLJoinExec { TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); + Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(job3Meta, job3Schema, job3Path); appender2.init(); Tuple tuple2 = new VTuple(job3Schema.size()); for (int i = 1; i < 4; i++) { @@ -172,7 +174,8 @@ public class TestLeftOuterNLJoinExec { TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); + Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); Tuple tuple3 = new VTuple(emp3Schema.size()); @@ -224,8 +227,8 @@ public class TestLeftOuterNLJoinExec { TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, - phone3Path); + Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(phone3Meta, phone3Schema, phone3Path); appender5.init(); appender5.flush(); @@ -254,9 +257,9 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException { - FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), + FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); @@ -269,7 +272,7 @@ public class TestLeftOuterNLJoinExec { LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec @@ -295,9 +298,9 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException { - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); @@ -311,7 +314,7 @@ public class TestLeftOuterNLJoinExec { LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec @@ -340,9 +343,9 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), + FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); @@ -355,7 +358,7 @@ public class TestLeftOuterNLJoinExec { LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec @@ -385,9 +388,9 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); @@ -400,7 +403,7 @@ public class TestLeftOuterNLJoinExec { LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec @@ -429,9 +432,9 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException { - FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), + FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE); - FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), + FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags); @@ -444,7 +447,7 @@ public class TestLeftOuterNLJoinExec { LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot(); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, plan); //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec @@ -470,7 +473,4 @@ public class TestLeftOuterNLJoinExec { exec.close(); assertEquals(0, count); } - - - } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java index cae5de5..10d4d33 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -79,7 +79,7 @@ public class TestMergeJoinExec { catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); conf = util.getConfiguration(); FileSystem fs = testDir.getFileSystem(conf); - sm = StorageManager.getStorageManager(conf, testDir); + sm = StorageManager.getFileStorageManager(conf, testDir); Schema employeeSchema = new Schema(); employeeSchema.addColumn("managerid", Type.INT4); @@ -89,8 +89,8 @@ public class TestMergeJoinExec { TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, - employeePath); + Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta, employeeSchema, employeePath); appender.init(); Tuple tuple = new VTuple(employeeSchema.size()); for (int i = 0; i < 10; i++) { @@ -118,7 +118,8 @@ public class TestMergeJoinExec { peopleSchema.addColumn("age", Type.INT4); TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path peoplePath = new Path(testDir, "people.csv"); - appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)) + .getAppender(peopleMeta, peopleSchema, peoplePath); appender.init(); tuple = new VTuple(peopleSchema.size()); for (int i = 1; i < 10; i += 2) { @@ -165,9 +166,9 @@ public class TestMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - FileFragment[] empFrags = sm.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), + FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()), Integer.MAX_VALUE); - FileFragment[] peopleFrags = sm.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), + FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()), Integer.MAX_VALUE); FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); @@ -176,7 +177,7 @@ public class TestMergeJoinExec { LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir); ctx.setEnforcer(enforcer); - PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); PhysicalExec exec = phyPlanner.createPlan(ctx, root); ProjectionExec proj = (ProjectionExec) exec; assertTrue(proj.getChild() instanceof MergeJoinExec);
