http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 845c2d7..5fac0cf 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -18,7 +18,6 @@ package org.apache.tajo.storage.hbase;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,12 +43,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
@@ -60,6 +59,7 @@ import org.apache.tajo.util.TUtil;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URI;
 import java.util.*;
 
 /**
@@ -68,14 +68,44 @@ import java.util.*;
 public class HBaseTablespace extends Tablespace {
   private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
 
+  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
+
+  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+
+  private Configuration hbaseConf;
+
+  private final static SortedInsertRewriter REWRITE_RULE = new SortedInsertRewriter();
+
   private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>();
 
-  public HBaseTablespace(String storeType) {
-    super(storeType);
+  public HBaseTablespace(String spaceName, URI uri) {
+    super(spaceName, uri);
   }
 
   @Override
   public void storageInit() throws IOException {
+    this.hbaseConf = HBaseConfiguration.create(conf);
+    String zkQuorum = extractQuorum(uri);
+    String [] splits = zkQuorum.split(":");
+    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, splits[0]);
+    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, splits[1]);
+  }
+
+  @Override
+  public void setConfig(String name, String value) {
+  }
+
+  @Override
+  public void setConfigs(Map<String, String> configs) {
+  }
+
+  public Configuration getHbaseConf() {
+    return hbaseConf;
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws IOException {
+    return 0;
   }
 
   @Override
@@ -93,13 +123,13 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
-    createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
+    createTable(tableDesc.getUri(), tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
     TableStats stats = new TableStats();
     stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
     tableDesc.setStats(stats);
   }
 
-  private void createTable(TableMeta tableMeta, Schema schema,
+  private void createTable(URI uri, TableMeta tableMeta, Schema schema,
                            boolean isExternal, boolean ifNotExists) throws IOException {
     String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
     if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
@@ -113,7 +143,7 @@ public class HBaseTablespace extends Tablespace {
       throw new IOException("Columns property has more entry than Tajo table columns");
     }
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
     int numRowKeys = 0;
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
     for (int i = 0; i < isRowKeyMappings.length; i++) {
@@ -138,8 +168,7 @@ public class HBaseTablespace extends Tablespace {
       }
     }
 
-    Configuration hConf = getHBaseConfiguration(conf, tableMeta);
-    HBaseAdmin hAdmin = new HBaseAdmin(hConf);
+    HBaseAdmin hAdmin = new HBaseAdmin(getHbaseConf());
 
     try {
       if (isExternal) {
@@ -210,7 +239,7 @@ public class HBaseTablespace extends Tablespace {
       return null;
     }
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, meta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, meta.getOptions());
     boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns();
     boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
 
@@ -291,45 +320,23 @@ public class HBaseTablespace extends Tablespace {
   }
 
   /**
-   * Creates Configuration instance and sets with hbase connection options.
+   * It extracts quorum addresses from a Hbase Tablespace URI.
+   * For example, consider an example URI 'hbase:zk://host1:2171,host2:2172,host3:2173/table1'.
+   * <code>extractQuorum</code> will extract only 'host1:2171,host2:2172,host3:2173'.
    *
-   * @param conf
-   * @param tableMeta
-   * @return
-   * @throws java.io.IOException
+   * @param uri Hbase Tablespace URI
+   * @return Quorum addresses
    */
-  public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException {
-    Configuration hbaseConf = (conf == null) ?
-        HBaseConfiguration.create() : HBaseConfiguration.create(conf);
-
-    String zkQuorum = hbaseConf.get(HConstants.ZOOKEEPER_QUORUM);
-    if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_QUORUM_KEY)) {
-      zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, "");
-      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-    }
-
-    if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
-    }
+  static String extractQuorum(URI uri) {
+    String uriStr = uri.toString();
+    int start = uriStr.indexOf("/") + 2;
+    int pathIndex = uriStr.lastIndexOf("/");
 
-    String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) {
-      zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, "");
-      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort);
-    }
-
-    if (zkPort == null || zkPort.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute.");
-    }
-
-    for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
-      String key = eachOption.getKey();
-      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-        hbaseConf.set(key, eachOption.getValue());
-      }
+    if (pathIndex < start) {
+      return uriStr.substring(start);
+    } else {
+      return uriStr.substring(start, pathIndex);
     }
-    return hbaseConf;
   }
 
   /**
@@ -348,7 +355,7 @@ public class HBaseTablespace extends Tablespace {
     }
 
     TableName hTableName = TableName.valueOf(hbaseTableName);
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
 
     HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName);
 
@@ -369,7 +376,7 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public void purgeTable(TableDesc tableDesc) throws IOException {
-    HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta()));
+    HBaseAdmin hAdmin = new HBaseAdmin(hbaseConf);
 
     try {
       HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema());
@@ -381,6 +388,11 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return URI.create(uri.toString() + "/" + tableName);
+  }
+
   /**
    * Returns columns which are mapped to the rowkey of the hbase table.
    *
@@ -389,7 +401,7 @@ public class HBaseTablespace extends Tablespace {
    * @throws java.io.IOException
    */
   private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
     int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes();
 
@@ -407,15 +419,14 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
     List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode);
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
     HTable htable = null;
     HBaseAdmin hAdmin = null;
 
     try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+      htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
 
       org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
       if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -424,8 +435,12 @@ public class HBaseTablespace extends Tablespace {
           throw new IOException("Expecting at least one region.");
         }
         List<Fragment> fragments = new ArrayList<Fragment>(1);
-        Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname());
+        Fragment fragment = new HBaseFragment(
+            tableDesc.getUri(),
+            fragmentId, htable.getName().getNameAsString(),
+            HConstants.EMPTY_BYTE_ARRAY,
+            HConstants.EMPTY_BYTE_ARRAY,
+            regLoc.getHostname());
         fragments.add(fragment);
         return fragments;
       }
@@ -458,7 +473,7 @@ public class HBaseTablespace extends Tablespace {
         stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
       }
 
-      hAdmin = new HBaseAdmin(hconf);
+      hAdmin = new HBaseAdmin(hbaseConf);
       Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
 
       // region startkey -> HBaseFragment
@@ -499,8 +514,12 @@ public class HBaseTablespace extends Tablespace {
             prevFragment.setStopRow(fragmentStop);
           }
         } else {
-          HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-              fragmentStart, fragmentStop, location.getHostname());
+          HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
+              fragmentId,
+              htable.getName().getNameAsString(),
+              fragmentStart,
+              fragmentStop,
+              location.getHostname());
 
           // get region size
           boolean foundLength = false;
@@ -557,7 +576,7 @@ public class HBaseTablespace extends Tablespace {
                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) throws IOException {
     if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
+      return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
     } else {
       return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir);
     }
@@ -566,17 +585,16 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException {
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
     HTable htable = null;
     HBaseAdmin hAdmin = null;
     try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+      htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
 
       org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
       if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
         return new ArrayList<Fragment>(1);
       }
-      hAdmin = new HBaseAdmin(hconf);
+      hAdmin = new HBaseAdmin(hbaseConf);
       Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
 
       List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length);
@@ -599,8 +617,13 @@ public class HBaseTablespace extends Tablespace {
           serverLoadMap.put(location.getServerName(), serverLoad);
         }
 
-        HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(),
-            location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname());
+        HBaseFragment fragment = new HBaseFragment(
+            tableDesc.getUri(),
+            tableDesc.getName(),
+            htable.getName().getNameAsString(),
+            location.getRegionInfo().getStartKey(),
+            location.getRegionInfo().getEndKey(),
+            location.getHostname());
 
         // get region size
         boolean foundLength = false;
@@ -642,7 +665,7 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
-  public HConnection getConnection(Configuration hbaseConf) throws IOException {
+  public HConnection getConnection() throws IOException {
     synchronized(connMap) {
       HConnectionKey key = new HConnectionKey(hbaseConf);
       HConnection conn = connMap.get(key);
@@ -937,18 +960,17 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException {
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                          LogicalPlan plan, Schema schema,
+                          TableDesc tableDesc) throws IOException {
     if (tableDesc == null) {
       throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
    }
-    Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null);
 
     Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
 
-    Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+    Configuration hbaseConf = HBaseConfiguration.create(this.hbaseConf);
     hbaseConf.set("hbase.loadincremental.threads.max", "2");
 
     JobContextImpl jobContext = new JobContextImpl(hbaseConf,
@@ -993,8 +1015,7 @@ public class HBaseTablespace extends Tablespace {
       sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
     }
 
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-    Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
 
     HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
     try {
@@ -1062,59 +1083,45 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
-      throws IOException {
-    if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>();
-      rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
-      return rules;
-    } else {
-      return null;
+  @Override
+  public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
+    if (REWRITE_RULE.isEligible(context, plan)) {
+      REWRITE_RULE.rewrite(context, plan);
     }
   }
 
-  private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
-    List<Column> indexColumns = new ArrayList<Column>();
-
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-
-    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
-    for (int i = 0; i < isRowKeys.length; i++) {
-      if (isRowKeys[i]) {
-        indexColumns.add(tableDesc.getSchema().getColumn(i));
-      }
-    }
-
-    return indexColumns.toArray(new Column[]{});
+  @Override
+  public StorageProperty getProperty() {
+    return HBASE_STORAGE_PROPERTIES;
   }
 
   @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(true);
-    storageProperty.setSupportsInsertInto(true);
-    return storageProperty;
+  public FormatProperty getFormatProperty(String format) {
+    return HFILE_FORMAT_PROPERTIES;
   }
 
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  public void prepareTable(LogicalNode node) throws IOException {
     if (node.getType() == NodeType.CREATE_TABLE) {
       CreateTableNode cNode = (CreateTableNode)node;
       if (!cNode.isExternal()) {
         TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-        createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
+        createTable(
+            ((CreateTableNode) node).getUri(), tableMeta, cNode.getTableSchema(),
+            cNode.isExternal(), cNode.isIfNotExists());
       }
     }
   }
 
   @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  public void rollbackTable(LogicalNode node) throws IOException {
     if (node.getType() == NodeType.CREATE_TABLE) {
       CreateTableNode cNode = (CreateTableNode)node;
       if (cNode.isExternal()) {
         return;
       }
 
-      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-      HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));
+      HBaseAdmin hAdmin = new HBaseAdmin(this.hbaseConf);
+      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
 
       try {
         HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
         LOG.info("Delete table cause query failed:" + new String(hTableDesc.getName()));
@@ -1127,7 +1134,7 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
     if (tableDesc != null) {
       Schema tableSchema = tableDesc.getSchema();
       if (tableSchema.size() != outSchema.size()) {
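The URI convention introduced above is easiest to verify with concrete values. The following standalone sketch is an illustrative copy of the parsing rule (not the class itself) showing how an 'hbase:zk://host:port[,host:port...][/table]' tablespace URI maps to a ZooKeeper quorum string; the expected outputs match the assertions in TestHBaseTableSpace further below. Note that storageInit() as written splits the extracted quorum on ':' and takes exactly two tokens, so it assumes a single host:port pair even though extractQuorum() itself can return a multi-host list.

import java.net.URI;

// Illustration only: a standalone copy of the quorum-parsing rule used by
// HBaseTablespace.extractQuorum(), so the URI convention can be tried in isolation.
public class QuorumParsingDemo {

  // Everything between "//" and the last path segment is the ZooKeeper quorum.
  static String extractQuorum(URI uri) {
    String uriStr = uri.toString();
    int start = uriStr.indexOf("/") + 2;      // skip the "//" after the scheme
    int pathIndex = uriStr.lastIndexOf("/");  // start of the optional table path

    if (pathIndex < start) {
      return uriStr.substring(start);         // no table path: take the rest
    } else {
      return uriStr.substring(start, pathIndex);
    }
  }

  public static void main(String[] args) {
    // Prints "host1:2171" — a bare tablespace root.
    System.out.println(extractQuorum(URI.create("hbase:zk://host1:2171")));
    // Prints "host1:2171,host2:2172,host3:2173" — the table path is stripped.
    System.out.println(extractQuorum(
        URI.create("hbase:zk://host1:2171,host2:2172,host3:2173/table1")));
  }
}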
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
new file mode 100644
index 0000000..ebf557e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.util.KeyValueSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This rewrite rule injects a sort operation to preserve the writing rows in
+ * an ascending order of HBase row keys, required by HFile.
+ */
+public class SortedInsertRewriter implements LogicalPlanRewriteRule {
+
+  @Override
+  public String getName() {
+    return "SortedInsertRewriter";
+  }
+
+  @Override
+  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+    boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    LogicalNode node = rootNode.getChild();
+    return hbaseMode && node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT;
+  }
+
+  public static Column[] getIndexColumns(Schema tableSchema, KeyValueSet tableProperty) throws IOException {
+    List<Column> indexColumns = new ArrayList<Column>();
+
+    ColumnMapping columnMapping = new ColumnMapping(tableSchema, tableProperty);
+
+    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
+    for (int i = 0; i < isRowKeys.length; i++) {
+      if (isRowKeys[i]) {
+        indexColumns.add(tableSchema.getColumn(i));
+      }
+    }
+
+    return indexColumns.toArray(new Column[]{});
+  }
+
+  @Override
+  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+    StoreTableNode storeTable = rootNode.getChild();
+    Schema tableSchema = storeTable.getTableSchema();
+
+    Column[] sortColumns;
+    try {
+      sortColumns = getIndexColumns(tableSchema, storeTable.getOptions());
+    } catch (IOException e) {
+      throw new PlanningException(e);
+    }
+
+    int[] sortColumnIndexes = new int[sortColumns.length];
+    for (int i = 0; i < sortColumns.length; i++) {
+      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+    }
+
+    UnaryNode insertNode = rootNode.getChild();
+    LogicalNode childNode = insertNode.getChild();
+
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+    int index = 0;
+
+    for (int i = 0; i < sortColumnIndexes.length; i++) {
+      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
+      }
+      sortSpecs[index++] = new SortSpec(sortColumn, true, true);
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(insertNode.getChild());
+    insertNode.setChild(sortNode);
+    plan.getRootBlock().registerNode(sortNode);
+
+    return plan;
+  }
+}
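Two details of this rule are worth calling out. First, since && binds tighter than || in Java, the condition in isEligible() parses as (hbaseMode && CREATE_TABLE) || INSERT. Second, the rule is no longer registered in a global rewrite-rule list; the tablespace drives it directly. The following is a minimal sketch of that driving code, mirroring HBaseTablespace.rewritePlan() above ('conf' and 'plan' are supplied by the caller):

package org.apache.tajo.storage.hbase;

import org.apache.tajo.OverridableConf;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.PlanningException;

// Sketch: how a storage-specific rewrite rule is applied to a plan.
public final class SortedInsertDemo {
  private static final SortedInsertRewriter RULE = new SortedInsertRewriter();

  public static LogicalPlan injectSortIfNeeded(OverridableConf conf, LogicalPlan plan)
      throws PlanningException {
    // Only INSERT/CTAS into HBase without 'hbase.insert.put.mode' needs the extra sort:
    //   before: STORE(hbase) <- child
    //   after:  STORE(hbase) <- SORT(row-key columns ASC) <- child
    if (RULE.isEligible(conf, plan)) {
      return RULE.rewrite(conf, plan);
    }
    return plan;
  }
}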
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
index 668b116..33d45b3 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
+++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
@@ -25,11 +25,12 @@ option java_generate_equals_and_hash = true;
 import "CatalogProtos.proto";
 
 message HBaseFragmentProto {
-  required string tableName = 1;
-  required string hbaseTableName = 2;
-  required bytes startRow = 3;
-  required bytes stopRow = 4;
-  required bool last = 5;
-  required int64 length = 6;
-  optional string regionLocation = 7;
+  required string uri = 1;
+  required string tableName = 2;
+  required string hbaseTableName = 3;
+  required bytes startRow = 4;
+  required bytes stopRow = 5;
+  required bool last = 6;
+  required int64 length = 7;
+  optional string regionLocation = 8;
 }
\ No newline at end of file
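Because protobuf identifies fields by tag number, prepending 'uri' as tag 1 renumbers every existing field, which breaks wire compatibility with older serialized fragments; that is tolerable here only if fragment protos are transient RPC payloads rather than persisted data. A hypothetical builder call against the regenerated message is sketched below — the outer class name follows protoc's default Java mapping for this .proto file, so treat the exact names as assumptions:

import com.google.protobuf.ByteString;

// Hypothetical usage of the regenerated message (names assumed from protoc's
// default Java mapping of StorageFragmentProtos.proto).
public class FragmentProtoDemo {
  static StorageFragmentProtos.HBaseFragmentProto sample() {
    return StorageFragmentProtos.HBaseFragmentProto.newBuilder()
        .setUri("hbase:zk://host1:2171/table1")  // new field, tag 1
        .setTableName("default.table1")
        .setHbaseTableName("table1")
        .setStartRow(ByteString.EMPTY)           // empty start/stop row = whole table
        .setStopRow(ByteString.EMPTY)
        .setLast(true)
        .setLength(0L)
        .build();
  }
}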
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
index dd52324..5b1e2bd 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
@@ -43,7 +43,7 @@ public class TestColumnMapping {
 
     TableMeta tableMeta = new TableMeta("HBASE", keyValueSet);
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
 
     List<String> cfNames = columnMapping.getColumnFamilyNames();
     assertEquals(2, cfNames.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
deleted file mode 100644
index b59fe7b..0000000
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
-import org.apache.tajo.util.Pair;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class TestHBaseStorageManager {
-  @Test
-  public void testGetIndexPredications() throws Exception {
-    Column rowkeyColumn = new Column("rk", Type.TEXT);
-    // where rk >= '020' and rk <= '055'
-    ScanNode scanNode = new ScanNode(1);
-    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
-    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
-    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
-    scanNode.setQual(evalNodeA);
-
-    HBaseTablespace storageManager =
-        (HBaseTablespace) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE");
-    List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertNotNull(indexEvals);
-    assertEquals(1, indexEvals.size());
-    Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or rk = '075'
-    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
-    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
-    scanNode.setQual(evalNodeB);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("075", indexPredicateValue.getFirst().asChars());
-    assertEquals("075", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
-    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
-    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
-    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
-    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
-    scanNode.setQual(evalNodeD);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("072", indexPredicateValue.getFirst().asChars());
-    assertEquals("078", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
-    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
-    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
-    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
-    EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
-    evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
-    EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
-    scanNode.setQual(evalNodeE);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("073", indexPredicateValue.getFirst().asChars());
-    assertEquals("078", indexPredicateValue.getSecond().asChars());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
new file mode 100644
index 0000000..f7cbb5a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHBaseTableSpace {
+  @BeforeClass
+  public static void setUp() throws IOException {
+    String tableSpaceUri = "hbase:zk://host1:2171";
+    HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
+    hBaseTablespace.init(new TajoConf());
+    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+  }
+
+  @Test
+  public void testExtractQuorum() {
+    assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171")));
+    assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171/table1")));
+    assertEquals("host1:2171,host2:2172",
+        HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171,host2:2172/table1")));
+  }
+
+  @Test
+  public void testTablespaceHandler() throws Exception {
+    assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+    assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+        instanceof HBaseTablespace);
+  }
+
+  @Test
+  public void testGetIndexPredications() throws Exception {
+    Column rowkeyColumn = new Column("rk", Type.TEXT);
+    // where rk >= '020' and rk <= '055'
+    ScanNode scanNode = new ScanNode(1);
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertNotNull(indexEvals);
+    assertEquals(1, indexEvals.size());
+    Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or rk = '075'
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("075", indexPredicateValue.getFirst().asChars());
+    assertEquals("075", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("072", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
+    evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
+    EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
+    scanNode.setQual(evalNodeE);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("073", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+  }
+}
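The new test file exercises the registry pattern that replaces the old conf-keyed getStorageManager() lookups: a tablespace is registered once and then resolved either by its space name or by a URI it owns. A condensed sketch of that usage, built from the same calls as setUp() and testTablespaceHandler() above:

package org.apache.tajo.storage.hbase;

import java.net.URI;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.TableSpaceManager;

// Sketch: register a tablespace, then resolve it by name or by URI.
public class TablespaceLookupDemo {
  public static void main(String[] args) throws Exception {
    HBaseTablespace space =
        new HBaseTablespace("cluster1", URI.create("hbase:zk://host1:2171"));
    space.init(new TajoConf());
    TableSpaceManager.addTableSpaceForTest(space);  // test-only registration hook

    // Lookups return Optional-style handles (Guava Optional in this codebase).
    HBaseTablespace byName =
        (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
    HBaseTablespace byUri =
        (HBaseTablespace) TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get();
    System.out.println(byName == byUri);  // both resolve to the registered instance
  }
}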
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 3a59ec9..9b98b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -74,6 +74,8 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/dataset/**</exclude>
+            <exclude>src/main/resources/*.json</exclude>
+            <exclude>src/test/resources/*.json</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -350,10 +352,6 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
-    <dependency>
-      <groupId>net.minidev</groupId>
-      <artifactId>json-smart</artifactId>
-    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 5c8242f..081fa3f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -18,6 +18,7 @@ package org.apache.tajo.storage;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,20 +52,23 @@ public abstract class FileAppender implements Appender {
     this.workDir = workDir;
     this.taskAttemptId = taskAttemptId;
 
-    try {
-      if (taskAttemptId != null) {
-        if (!(conf instanceof TajoConf)) {
-          throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
-        }
-        this.path = ((FileTablespace) TableSpaceManager.getFileStorageManager((TajoConf) conf))
-            .getAppenderFilePath(taskAttemptId, workDir);
-      } else {
-        this.path = workDir;
+    if (taskAttemptId != null) {
+      if (!(conf instanceof TajoConf)) {
+        throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
       }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-      throw new IllegalStateException("Error while opeining FileAppender: " + e.getMessage(), e);
+
+      Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+
+      if (!spaceResult.isPresent()) {
+        throw new IllegalStateException("No TableSpace for " + workDir.toUri());
+      }
+
+      FileTablespace space = spaceResult.get();
+      this.path = space.getAppenderFilePath(taskAttemptId, workDir);
+
+    } else {
+      this.path = workDir;
     }
   }
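FileAppender's constructor now resolves its tablespace from the work directory's URI instead of a global file storage manager, and the lookup returns a Guava Optional rather than throwing IOException. The same resolution idiom reappears in HashShuffleAppenderManager further below; a minimal sketch of it in isolation:

import com.google.common.base.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.TableSpaceManager;

// Sketch: the URI-driven tablespace lookup now used when opening appenders.
public class AppenderPathDemo {
  static FileTablespace resolve(Path workDir) {
    Optional<FileTablespace> space = TableSpaceManager.get(workDir.toUri());
    if (!space.isPresent()) {
      // Mirrors the failure mode in the constructor above.
      throw new IllegalStateException("No TableSpace for " + workDir.toUri());
    }
    return space.get();
  }
}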
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 6ab8574..2ce1f09 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -35,18 +35,28 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.net.URI;
 import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
+
 public class FileTablespace extends Tablespace {
+
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
   private final Log LOG = LogFactory.getLog(FileTablespace.class);
 
   static final String OUTPUT_FILE_PREFIX="part-";
@@ -83,27 +93,54 @@ public class FileTablespace extends Tablespace {
   };
 
   protected FileSystem fs;
-  protected Path tableBaseDir;
+  protected Path basePath;
   protected boolean blocksMetadataEnabled;
   private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
 
-  public FileTablespace(String storeType) {
-    super(storeType);
+  public FileTablespace(String spaceName, URI uri) {
+    super(spaceName, uri);
   }
 
   @Override
   protected void storageInit() throws IOException {
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
+    this.basePath = new Path(uri);
+    this.fs = basePath.getFileSystem(conf);
+    this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
+
+    this.blocksMetadataEnabled =
+        conf.getBoolean(DFS_HDFS_BLOCKS_METADATA_ENABLED, DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    if (!this.blocksMetadataEnabled) {
       LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+    }
+  }
+
+  @Override
+  public void setConfig(String name, String value) {
+    conf.set(name, value);
+  }
+
+  @Override
+  public void setConfigs(Map<String, String> configs) {
+    for (Map.Entry<String, String> c : configs.entrySet()) {
+      conf.set(c.getKey(), c.getValue());
+    }
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws IOException {
+    Path path = new Path(uri);
+    ContentSummary summary = fs.getContentSummary(path);
+    return summary.getLength();
+  }
+
+  @Override
+  public URI getRootUri() {
+    return fs.getUri();
   }
 
   public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
     return getFileScanner(meta, schema, path, status);
   }
@@ -128,8 +165,9 @@ public class FileTablespace extends Tablespace {
     return fileSystem.exists(path);
   }
 
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
   }
 
   private String partitionPath = "";
@@ -154,12 +192,12 @@ public class FileTablespace extends Tablespace {
   }
 
   public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
+    Path tablePath = new Path(basePath, tableName);
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
   public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
+    Path tablePath = new Path(basePath, tableName);
     return split(tableName, tablePath, fragmentSize);
   }
 
@@ -314,7 +352,6 @@ public class FileTablespace extends Tablespace {
 
     for (int i = 0; i < dirs.length; ++i) {
       Path p = dirs[i];
-      FileSystem fs = p.getFileSystem(conf);
       FileStatus[] matches = fs.globStatus(p, inputFilter);
       if (matches == null) {
         errors.add(new IOException("Input path does not exist: " + p));
@@ -323,8 +360,7 @@ public class FileTablespace extends Tablespace {
       } else {
         for (FileStatus globStat : matches) {
           if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
+            for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
               result.add(stat);
             }
           } else {
@@ -492,8 +528,6 @@ public class FileTablespace extends Tablespace {
     List<BlockLocation> blockLocations = Lists.newArrayList();
 
     for (Path p : inputs) {
-      FileSystem fs = p.getFileSystem(conf);
-
       ArrayList<FileStatus> files = Lists.newArrayList();
       if (fs.isFile(p)) {
         files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
@@ -586,7 +620,7 @@ public class FileTablespace extends Tablespace {
       return;
     }
 
-    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    DistributedFileSystem fs = (DistributedFileSystem) this.fs;
     int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     int blockLocationIdx = 0;
 
@@ -629,7 +663,7 @@ public class FileTablespace extends Tablespace {
 
   @Override
   public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
-    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
+    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getUri()));
  }
 
   @Override
@@ -640,13 +674,13 @@ public class FileTablespace extends Tablespace {
       String simpleTableName = splitted[1];
 
       // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
-      tableDesc.setPath(tablePath.toUri());
+      Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+      tableDesc.setUri(tablePath.toUri());
     } else {
-      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+      Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
     }
 
-    Path path = new Path(tableDesc.getPath());
+    Path path = new Path(tableDesc.getUri());
     FileSystem fs = path.getFileSystem(conf);
     TableStats stats = new TableStats();
@@ -679,7 +713,7 @@ public class FileTablespace extends Tablespace {
   @Override
   public void purgeTable(TableDesc tableDesc) throws IOException {
     try {
-      Path path = new Path(tableDesc.getPath());
+      Path path = new Path(tableDesc.getUri());
       FileSystem fs = path.getFileSystem(conf);
       LOG.info("Delete table data dir: " + path);
       fs.delete(path, true);
@@ -692,7 +726,7 @@ public class FileTablespace extends Tablespace {
   public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException {
     // Listing table data file which is not empty.
     // If the table is a partitioned table, return file list which has same partition key.
-    Path tablePath = new Path(tableDesc.getPath());
+    Path tablePath = new Path(tableDesc.getUri());
     FileSystem fs = tablePath.getFileSystem(conf);
     //In the case of partitioned table, we should return same partition key data files.
@@ -704,7 +738,7 @@ public class FileTablespace extends Tablespace {
     List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
     if (fs.exists(tablePath)) {
       if (!partitionPath.isEmpty()) {
-        Path partPath = new Path(tableDesc.getPath() + partitionPath);
+        Path partPath = new Path(tableDesc.getUri() + partitionPath);
         if (fs.exists(partPath)) {
           getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments,
               new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth);
@@ -768,7 +802,7 @@ public class FileTablespace extends Tablespace {
 
     // Intermediate directory
     if (fs.isDirectory(path)) {
-      FileStatus[] files = fs.listStatus(path, Tablespace.hiddenFileFilter);
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
 
       if (files != null && files.length > 0) {
@@ -817,44 +851,43 @@ public class FileTablespace extends Tablespace {
     }
   }
 
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(false);
-    if (storeType.equalsIgnoreCase("RAW")) {
-      storageProperty.setSupportsInsertInto(false);
-    } else {
-      storageProperty.setSupportsInsertInto(true);
-    }
+  private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
+  private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
+  private static final FormatProperty HFileProperties = new FormatProperty(true);
 
-    return storageProperty;
+  @Override
+  public StorageProperty getProperty() {
+    return FileStorageProperties;
   }
 
   @Override
-  public void close() {
+  public FormatProperty getFormatProperty(String format) {
+    if (format.equalsIgnoreCase("hbase")) {
+      return HFileProperties;
+    } else {
+      return GeneralFileProperties;
+    }
   }
 
   @Override
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  public void close() {
   }
 
   @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  public void prepareTable(LogicalNode node) throws IOException {
   }
 
   @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+  public void rollbackTable(LogicalNode node) throws IOException {
   }
 
   @Override
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
-      throws IOException {
-    return null;
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
   }
 
   @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
-                               Schema schema, TableDesc tableDesc) throws IOException {
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+                          Schema schema, TableDesc tableDesc) throws IOException {
     return commitOutputData(queryContext, true);
   }
 
@@ -879,8 +912,8 @@ public class FileTablespace extends Tablespace {
     Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
     Path finalOutputDir;
-    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
-      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI));
       try {
         FileSystem fs = stagingResultDir.getFileSystem(conf);
@@ -949,7 +982,7 @@ public class FileTablespace extends Tablespace {
           if (fs.exists(finalOutputDir)) {
             fs.mkdirs(oldTableDir);
 
-            for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+            for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
              fs.rename(status.getPath(), oldTableDir);
            }
 
@@ -971,7 +1004,7 @@ public class FileTablespace extends Tablespace {
           if (movedToOldTable && !committed) {
             // if commit is failed, recover the old data
-            for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+            for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
               fs.delete(status.getPath(), true);
             }
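Among the new Tablespace overrides above, getTableVolume() is the one with real behavior here: a table's volume is simply the total byte length under its URI, taken from the filesystem's ContentSummary. A standalone sketch of that contract:

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the getTableVolume() contract implemented by FileTablespace above.
public class TableVolumeDemo {
  static long tableVolume(FileSystem fs, URI tableUri) throws IOException {
    ContentSummary summary = fs.getContentSummary(new Path(tableUri));
    return summary.getLength();  // total file bytes under the path, ignoring replication
  }
}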
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 66c7f13..49485f5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -82,8 +82,9 @@ public class HashShuffleAppenderManager {
         if (!fs.exists(dataFile.getParent())) {
           fs.mkdirs(dataFile.getParent());
         }
-        FileAppender appender = (FileAppender)((FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf))
-            .getAppender(meta, outSchema, dataFile);
+
+        FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+        FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();
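The test diffs that follow all make the same mechanical substitution: the conf-keyed TableSpaceManager.getFileStorageManager(conf) becomes TableSpaceManager.getLocalFs(), which returns the tablespace registered for the local file system. A sketch of the resulting appender setup (the helper name is illustrative, not part of the codebase):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.Appender;
import org.apache.tajo.storage.FileTablespace;
import org.apache.tajo.storage.TableSpaceManager;

// Illustrative helper mirroring the updated test code below.
public class LocalFsAppenderDemo {
  static Appender localAppender(TableMeta meta, Schema schema, Path file) throws IOException {
    FileTablespace localFs = (FileTablespace) TableSpaceManager.getLocalFs();
    Appender appender = localFs.getAppender(meta, schema, file);
    appender.enableStats();  // collect TableStats while writing, as the tests do
    appender.init();
    return appender;
  }
}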
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 9726ecc..2d919cd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment = getFileFragment("testErrorTolerance1.json");
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance1.json");
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment = getFileFragment("testErrorTolerance2.json");
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
deleted file mode 100644
index a6d6077..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestFileStorageManager {
-  private TajoConf conf;
-  private static String TEST_PATH = "target/test-data/TestFileStorageManager";
-  private Path testDir;
-  private FileSystem fs;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new TajoConf();
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testGetScannerAndAppender() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age",Type.INT4);
-    schema.addColumn("name",Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
-    VTuple[] tuples = new VTuple[4];
-    for(int i=0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(new Datum[] {
-          DatumFactory.createInt4(i),
-          DatumFactory.createInt4(i + 32),
-          DatumFactory.createText("name" + i)});
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
-    fs.mkdirs(path.getParent());
-    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
-    assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
-
-    Appender appender = fileStorageManager.getAppender(meta, schema, path);
-    appender.init();
-    for(Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-
-    Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
-    scanner.init();
-    int i=0;
-    while(scanner.next() != null) {
-      i++;
-    }
-    assertEquals(4,i);
-  }
-
-  @Test
-  public void testGetSplit() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1).build();
-    cluster.waitClusterUp();
-    TajoConf tajoConf = new TajoConf(conf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplit");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test partitions
-      List<Path> partitions = Lists.newArrayList();
-      for (int i =0; i < testCount; i++){
-        Path tmpFile = new Path(tablePath, String.valueOf(i));
-        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
-        partitions.add(tmpFile);
-      }
-
-      assertTrue(fs.exists(tablePath));
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age",Type.INT4);
-      schema.addColumn("name",Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
-      List<Fragment> splits = Lists.newArrayList();
-      // Get FileFragments in partition batch
-      splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
-      assertEquals(testCount, splits.size());
-      // -1 is unknown volumeId
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-
-      splits.clear();
-      splits.addAll(sm.getSplits("data", meta, schema,
-          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
-      assertEquals(testCount / 2, splits.size());
-      assertEquals(1, splits.get(0).getHosts().length);
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-
-  @Test
-  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-    cluster.waitClusterUp();
-
-    TajoConf tajoConf = new TajoConf(conf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test files
-      for (int i = 0; i < testCount; i++) {
-        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
-        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
-      }
-      assertTrue(fs.exists(tablePath));
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age", Type.INT4);
-      schema.addColumn("name", Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
-      List<Fragment> splits = Lists.newArrayList();
-      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
-      assertEquals(testCount, splits.size());
-      assertEquals(2, splits.get(0).getHosts().length);
-      assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
-      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-
-  @Test
-  public void testStoreType() throws Exception {
-    final Configuration hdfsConf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
-        .numDataNodes(2).build();
-    cluster.waitClusterUp();
-
-    TajoConf tajoConf = new TajoConf(hdfsConf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    try {
-      /* Local FileSystem */
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getStorageManager(conf, "CSV");
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      /* Distributed FileSystem */
-      sm = (FileTablespace) TableSpaceManager.getStorageManager(tajoConf, "CSV");
-      assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
-      assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a6c238b..9237e07 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
 
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getLocalFs();
     testDir = getTestDir(this.fs, TEST_PATH);
   }
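The replacement test that follows exercises the constructor this commit introduces, where a tablespace is bound to an explicit name and filesystem URI rather than derived from a TajoConf. A sketch of that construction pattern, with a placeholder URI; TestFileTablespace below points at the URI of a freshly started MiniDFSCluster instead:

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.storage.FileTablespace;

    import java.net.URI;

    public class TablespaceConstructionSketch {
      public static void main(String[] args) throws Exception {
        // "hdfs://localhost:8020/tajo" is a placeholder URI, not from the diff.
        FileTablespace space =
            new FileTablespace("example_space", URI.create("hdfs://localhost:8020/tajo"));
        space.init(new TajoConf());  // binds the tablespace to its FileSystem
        System.out.println(space.getUri());
      }
    }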
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
new file mode 100644
index 0000000..ec3e143
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestFileTablespace {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/TestFileTablespace";
+  private Path testDir;
+  private FileSystem localFs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new TajoConf();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    localFs = testDir.getFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testGetScannerAndAppender() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age",Type.INT4);
+    schema.addColumn("name",Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+    VTuple[] tuples = new VTuple[4];
+    for(int i=0; i < tuples.length; i++) {
+      tuples[i] = new VTuple(new Datum[] {
+          DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i + 32),
+          DatumFactory.createText("name" + i)});
+    }
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+    localFs.mkdirs(path.getParent());
+    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+    assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+    Appender appender = fileStorageManager.getAppender(meta, schema, path);
+    appender.init();
+    for(Tuple t : tuples) {
+      appender.addTuple(t);
+    }
+    appender.close();
+
+    Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
+    scanner.init();
+    int i=0;
+    while(scanner.next() != null) {
+      i++;
+    }
+    assertEquals(4,i);
+  }
+
+  @Test
+  public void testGetSplit() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitClusterUp();
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    int testCount = 10;
+    Path tablePath = new Path("/testGetSplit");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test partitions
+      List<Path> partitions = Lists.newArrayList();
+      for (int i =0; i < testCount; i++){
+        Path tmpFile = new Path(tablePath, String.valueOf(i));
+        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
+        partitions.add(tmpFile);
+      }
+
+      assertTrue(fs.exists(tablePath));
+      FileTablespace space = new FileTablespace("testGetSplit", fs.getUri());
+      space.init(new TajoConf(conf));
+      assertEquals(fs.getUri(), space.getUri());
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age",Type.INT4);
+      schema.addColumn("name",Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+      List<Fragment> splits = Lists.newArrayList();
+      // Get FileFragments in partition batch
+      splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      assertEquals(testCount, splits.size());
+      // -1 is unknown volumeId
+      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+
+      splits.clear();
+      splits.addAll(space.getSplits("data", meta, schema,
+          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+      assertEquals(testCount / 2, splits.size());
+      assertEquals(1, splits.get(0).getHosts().length);
+      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
+  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    int testCount = 10;
+    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test files
+      for (int i = 0; i < testCount; i++) {
+        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
+      }
+      assertTrue(fs.exists(tablePath));
+
+      FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri());
+      sm.init(new TajoConf(conf));
+
+      assertEquals(fs.getUri(), sm.getUri());
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT4);
+      schema.addColumn("name", Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+      List<Fragment> splits = Lists.newArrayList();
+      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+      assertEquals(testCount, splits.size());
+      assertEquals(2, splits.get(0).getHosts().length);
+      assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
+      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
+  public void testGetFileTablespace() throws Exception {
+    final Configuration hdfsConf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
+
+    Optional<Tablespace> existingTs = Optional.absent();
+    try {
+      /* Local FileSystem */
+      FileTablespace space = TableSpaceManager.getLocalFs();
+      assertEquals(localFs.getUri(), space.getFileSystem().getUri());
+
+      FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
+      distTablespace.init(conf);
+      existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+
+      /* Distributed FileSystem */
+      space = (FileTablespace) TableSpaceManager.get(uri).get();
+      assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+      space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+      assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+    } finally {
+
+      if (existingTs.isPresent()) {
+        TableSpaceManager.addTableSpaceForTest(existingTs.get());
+      }
+
+      cluster.shutdown(true);
+    }
+  }
+}
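The remaining test diffs below make one mechanical substitution: the local-filesystem tablespace now comes from TableSpaceManager.getLocalFs() instead of the removed conf-driven factory. A minimal write-then-scan round trip against that accessor, as a sketch only; it reuses the three-argument getAppender and getFileScanner signatures visible in the tests above, assumes the parent directory of the target path already exists, and keeps the cast on getLocalFs() that some of the test code uses:

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.CatalogUtil;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.storage.Appender;
    import org.apache.tajo.storage.FileTablespace;
    import org.apache.tajo.storage.Scanner;
    import org.apache.tajo.storage.TableSpaceManager;

    public class LocalFsRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT4);
        TableMeta meta = CatalogUtil.newTableMeta("CSV");
        Path path = new Path("target/test-data/roundtrip/table.csv");  // placeholder path

        // The cast mirrors TestCompressionStorages above; it is a no-op if
        // getLocalFs() already returns a FileTablespace.
        FileTablespace localFs = (FileTablespace) TableSpaceManager.getLocalFs();

        // Write an (empty) table, then read it back.
        Appender appender = localFs.getAppender(meta, schema, path);
        appender.init();
        appender.close();

        Scanner scanner = localFs.getFileScanner(meta, schema, path);
        scanner.init();
        int rows = 0;
        while (scanner.next() != null) {
          rows++;
        }
        System.out.println(rows + " rows");
      }
    }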
http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 266f906..c13ce16 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression."
         + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testLineDelimitedReader");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 03a601d..79928ff 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getLocalFs();
   }
 
   @Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+    Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
 
     int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+    Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();
