http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java new file mode 100644 index 0000000..670f87e --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java @@ -0,0 +1,1147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.CreateTableNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.*; + +/** + * Tablespace for HBase table. + */ +public class HBaseTablespace extends Tablespace { + private static final Log LOG = LogFactory.getLog(HBaseTablespace.class); + + private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>(); + + public HBaseTablespace(String storeType) { + super(storeType); + } + + @Override + public void storageInit() throws IOException { + } + + @Override + public void close() { + synchronized (connMap) { + for (HConnection eachConn: connMap.values()) { + try { + eachConn.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists); + TableStats stats = new TableStats(); + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + tableDesc.setStats(stats); + } + + private void createTable(TableMeta tableMeta, Schema schema, + boolean isExternal, boolean ifNotExists) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); + if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) { + throw new IOException("Columns property has more entry than Tajo table columns"); + } + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + int numRowKeys = 0; + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + numRowKeys++; + } + } + if (numRowKeys > 1) { + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Key field type should be TEXT type."); + } + } + } + + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column key field('<cfname>:key:') type should be TEXT type."); + } + if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column value field(('<cfname>:value:') type should be TEXT type."); + } + } + + Configuration hConf = getHBaseConfiguration(conf, tableMeta); + HBaseAdmin hAdmin = new HBaseAdmin(hConf); + + try { + if (isExternal) { + // If tajo table is external table, only check validation. + if (mappedColumns == null || mappedColumns.isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + if (!hAdmin.tableExists(hTableName)) { + throw new IOException("HBase table [" + hbaseTableName + "] not exists. " + + "External table should be a existed table."); + } + HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName); + Set<String> tableColumnFamilies = new HashSet<String>(); + for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) { + tableColumnFamilies.add(eachColumn.getNameAsString()); + } + + Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames(); + if (mappingColumnFamilies.isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + + for (String eachMappingColumnFamily : mappingColumnFamilies) { + if (!tableColumnFamilies.contains(eachMappingColumnFamily)) { + throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName); + } + } + } else { + if (hAdmin.tableExists(hbaseTableName)) { + if (ifNotExists) { + return; + } else { + throw new IOException("HBase table [" + hbaseTableName + "] already exists."); + } + } + // Creating hbase table + HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema); + + byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta); + if (splitKeys == null) { + hAdmin.createTable(hTableDescriptor); + } else { + hAdmin.createTable(hTableDescriptor, splitKeys); + } + } + } finally { + hAdmin.close(); + } + } + + /** + * Returns initial region split keys. + * + * @param conf + * @param schema + * @param meta + * @return + * @throws java.io.IOException + */ + private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException { + String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, ""); + String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, ""); + + if ((splitRowKeys == null || splitRowKeys.isEmpty()) && + (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) { + return null; + } + + ColumnMapping columnMapping = new ColumnMapping(schema, meta); + boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns(); + boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); + + boolean rowkeyBinary = false; + int numRowKeys = 0; + Column rowKeyColumn = null; + for (int i = 0; i < isBinaryColumns.length; i++) { + if (isBinaryColumns[i] && isRowKeys[i]) { + rowkeyBinary = true; + } + if (isRowKeys[i]) { + numRowKeys++; + rowKeyColumn = schema.getColumn(i); + } + } + + if (rowkeyBinary && numRowKeys > 1) { + throw new IOException("If rowkey is mapped to multi column and a rowkey is binary, " + + "Multiple region for creation is not support."); + } + + if (splitRowKeys != null && !splitRowKeys.isEmpty()) { + String[] splitKeyTokens = splitRowKeys.split(","); + byte[][] splitKeys = new byte[splitKeyTokens.length][]; + for (int i = 0; i < splitKeyTokens.length; i++) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } else { + splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } + } + return splitKeys; + } + + if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) { + // If there is many split keys, Tajo allows to define in the file. + Path path = new Path(splitRowKeysFile); + FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " not exists."); + } + + SortedSet<String> splitKeySet = new TreeSet<String>(); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(fs.open(path))); + String line = null; + while ( (line = reader.readLine()) != null ) { + if (line.isEmpty()) { + continue; + } + splitKeySet.add(line); + } + } finally { + if (reader != null) { + reader.close(); + } + } + + if (splitKeySet.isEmpty()) { + return null; + } + + byte[][] splitKeys = new byte[splitKeySet.size()][]; + int index = 0; + for (String eachKey: splitKeySet) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } else { + splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } + } + + return splitKeys; + } + + return null; + } + + /** + * Creates Configuration instance and sets with hbase connection options. + * + * @param conf + * @param tableMeta + * @return + * @throws java.io.IOException + */ + public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException { + Configuration hbaseConf = (conf == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(conf); + + String zkQuorum = hbaseConf.get(HConstants.ZOOKEEPER_QUORUM); + if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_QUORUM_KEY)) { + zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, ""); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); + } + + if (zkQuorum == null || zkQuorum.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute."); + } + + String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT); + if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) { + zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, ""); + hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort); + } + + if (zkPort == null || zkPort.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute."); + } + + for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) { + String key = eachOption.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + hbaseConf.set(key, eachOption.getValue()); + } + } + return hbaseConf; + } + + /** + * Creates HTableDescription using table meta data. + * + * @param tableMeta + * @param schema + * @return + * @throws java.io.IOException + */ + public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + + HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName); + + Collection<String> columnFamilies = columnMapping.getColumnFamilyNames(); + //If 'columns' attribute is empty, Tajo table columns are mapped to all HBase table column. + if (columnFamilies.isEmpty()) { + for (Column eachColumn: schema.getRootColumns()) { + columnFamilies.add(eachColumn.getSimpleName()); + } + } + + for (String eachColumnFamily: columnFamilies) { + hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily)); + } + + return hTableDescriptor; + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta())); + + try { + HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema()); + LOG.info("Deleting hbase table: " + new String(hTableDesc.getName())); + hAdmin.disableTable(hTableDesc.getName()); + hAdmin.deleteTable(hTableDesc.getName()); + } finally { + hAdmin.close(); + } + } + + /** + * Returns columns which are mapped to the rowkey of the hbase table. + * + * @param tableDesc + * @return + * @throws java.io.IOException + */ + private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes(); + + Column indexColumn = null; + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + if (columnMapping.getNumRowKeys() == 1 || + rowKeyIndexes[i] == 0) { + indexColumn = tableDesc.getSchema().getColumn(i); + } + } + } + return new Column[]{indexColumn}; + } + + @Override + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + + List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode); + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + + try { + htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List<Fragment> fragments = new ArrayList<Fragment>(1); + Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); + fragments.add(fragment); + return fragments; + } + + List<byte[]> startRows; + List<byte[]> stopRows; + + if (indexPredications != null && !indexPredications.isEmpty()) { + // indexPredications is Disjunctive set + startRows = new ArrayList<byte[]>(); + stopRows = new ArrayList<byte[]>(); + for (IndexPredication indexPredication: indexPredications) { + byte[] startRow; + byte[] stopRow; + if (indexPredication.getStartValue() != null) { + startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); + } else { + startRow = HConstants.EMPTY_START_ROW; + } + if (indexPredication.getStopValue() != null) { + stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); + } else { + stopRow = HConstants.EMPTY_END_ROW; + } + startRows.add(startRow); + stopRows.add(stopRow); + } + } else { + startRows = TUtil.newList(HConstants.EMPTY_START_ROW); + stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); + } + + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + // region startkey -> HBaseFragment + Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>(); + for (int i = 0; i < keys.getFirst().length; i++) { + HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + + byte[] regionStartKey = keys.getFirst()[i]; + byte[] regionStopKey = keys.getSecond()[i]; + + int startRowsSize = startRows.size(); + for (int j = 0; j < startRowsSize; j++) { + byte[] startRow = startRows.get(j); + byte[] stopRow = stopRows.get(j); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) + && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { + byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? + regionStartKey : startRow; + + byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && + regionStopKey.length > 0 ? regionStopKey : stopRow; + + String regionName = location.getRegionInfo().getRegionNameAsString(); + + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + if (fragmentMap.containsKey(regionStartKey)) { + HBaseFragment prevFragment = fragmentMap.get(regionStartKey); + if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + prevFragment.setStartRow(fragmentStart); + } + if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + prevFragment.setStopRow(fragmentStop); + } + } else { + HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + fragmentStart, fragmentStop, location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; + fragment.setLength(storeFileSize); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragmentMap.put(regionStartKey, fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); + } + } + } + } + } + + List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values()); + Collections.sort(fragments); + if (!fragments.isEmpty()) { + fragments.get(fragments.size() - 1).setLast(true); + } + return (ArrayList<Fragment>) (ArrayList) fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + private byte[] serialize(ColumnMapping columnMapping, + IndexPredication indexPredication, Datum datum) throws IOException { + if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { + return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } else { + return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } + } + + @Override + public Appender getAppender(OverridableConf queryContext, + TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { + return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir); + } else { + return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir); + } + } + + @Override + public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException { + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + try { + htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + return new ArrayList<Fragment>(1); + } + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length); + + int start = currentPage * numFragments; + if (start >= keys.getFirst().length) { + return new ArrayList<Fragment>(1); + } + int end = (currentPage + 1) * numFragments; + if (end > keys.getFirst().length) { + end = keys.getFirst().length; + } + for (int i = start; i < end; i++) { + HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + + String regionName = location.getRegionInfo().getRegionNameAsString(); + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(), + location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; + if (storeLength == 0) { + // If store size is smaller than 1 MB, storeLength is zero + storeLength = 1 * 1024 * 1024; //default 1MB + } + fragment.setLength(storeLength); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragments.add(fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); + } + } + + if (!fragments.isEmpty()) { + ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); + } + return fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + public HConnection getConnection(Configuration hbaseConf) throws IOException { + synchronized(connMap) { + HConnectionKey key = new HConnectionKey(hbaseConf); + HConnection conn = connMap.get(key); + if (conn == null) { + conn = HConnectionManager.createConnection(hbaseConf); + connMap.put(key, conn); + } + + return conn; + } + } + + static class HConnectionKey { + final static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID, + HConstants.RPC_CODEC_CONF_KEY }; + + private Map<String, String> properties; + private String username; + + HConnectionKey(Configuration conf) { + Map<String, String> m = new HashMap<String, String>(); + if (conf != null) { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + m.put(property, value); + } + } + } + this.properties = Collections.unmodifiableMap(m); + + try { + UserProvider provider = UserProvider.instantiate(conf); + User currentUser = provider.getCurrent(); + if (currentUser != null) { + username = currentUser.getName(); + } + } catch (IOException ioe) { + LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + if (username != null) { + result = username.hashCode(); + } + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.username != null && !this.username.equals(that.username)) { + return false; + } else if (this.username == null && that.username != null) { + return false; + } + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + // noinspection StringEquality + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } + + @Override + public String toString() { + return "HConnectionKey{" + + "properties=" + properties + + ", username='" + username + '\'' + + '}'; + } + } + + public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping, + TableDesc tableDesc, ScanNode scanNode) throws IOException { + List<IndexPredication> indexPredications = new ArrayList<IndexPredication>(); + Column[] indexableColumns = getIndexableColumns(tableDesc); + if (indexableColumns != null && indexableColumns.length == 1) { + // Currently supports only single index column. + List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns); + for (Set<EvalNode> eachEvalSet: indexablePredicateList) { + Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet); + if (indexPredicationValues != null) { + IndexPredication indexPredication = new IndexPredication(); + indexPredication.setColumn(indexableColumns[0]); + indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName())); + indexPredication.setStartValue(indexPredicationValues.getFirst()); + indexPredication.setStopValue(indexPredicationValues.getSecond()); + + indexPredications.add(indexPredication); + } + } + } + return indexPredications; + } + + public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException { + List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); + + // if a query statement has a search condition, try to find indexable predicates + if (indexableColumns != null && scanNode.getQual() != null) { + EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual()); + + // add qualifier to schema for qual + for (Column column : indexableColumns) { + for (EvalNode disjunctiveExpr : disjunctiveForms) { + EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr); + Set<EvalNode> indexablePredicateSet = Sets.newHashSet(); + for (EvalNode conjunctiveExpr : conjunctiveForms) { + if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) { + indexablePredicateSet.add(conjunctiveExpr); + } + } + if (!indexablePredicateSet.isEmpty()) { + indexablePredicateList.add(indexablePredicateSet); + } + } + } + } + + return indexablePredicateList; + } + + private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { + if (checkIfIndexablePredicate(evalNode) || checkIfConjunctiveButOneVariable(evalNode)) { + Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode); + // if it contains only single variable matched to a target column + return variables.size() == 1 && variables.contains(targetColumn); + } else { + return false; + } + } + + /** + * + * @param evalNode The expression to be checked + * @return true if an conjunctive expression, consisting of indexable expressions + */ + private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) { + if (evalNode.getType() == EvalType.AND) { + BinaryEval orEval = (BinaryEval) evalNode; + boolean indexable = + checkIfIndexablePredicate(orEval.getLeftExpr()) && + checkIfIndexablePredicate(orEval.getRightExpr()); + + boolean sameVariable = + EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr()) + .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr())); + + return indexable && sameVariable; + } else { + return false; + } + } + + /** + * Check if an expression consists of one variable and one constant and + * the expression is a comparison operator. + * + * @param evalNode The expression to be checked + * @return true if an expression consists of one variable and one constant + * and the expression is a comparison operator. Other, false. + */ + private boolean checkIfIndexablePredicate(EvalNode evalNode) { + return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode); + } + + public static boolean isIndexableOperator(EvalNode expr) { + return expr.getType() == EvalType.EQUAL || + expr.getType() == EvalType.LEQ || + expr.getType() == EvalType.LTH || + expr.getType() == EvalType.GEQ || + expr.getType() == EvalType.GTH || + expr.getType() == EvalType.BETWEEN; + } + + public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping, + Set<EvalNode> evalNodes) { + Datum startDatum = null; + Datum endDatum = null; + for (EvalNode evalNode: evalNodes) { + if (evalNode instanceof BinaryEval) { + BinaryEval binaryEval = (BinaryEval) evalNode; + EvalNode left = binaryEval.getLeftExpr(); + EvalNode right = binaryEval.getRightExpr(); + + Datum constValue = null; + if (left.getType() == EvalType.CONST) { + constValue = ((ConstEval) left).getValue(); + } else if (right.getType() == EvalType.CONST) { + constValue = ((ConstEval) right).getValue(); + } + + if (constValue != null) { + if (evalNode.getType() == EvalType.EQUAL || + evalNode.getType() == EvalType.GEQ || + evalNode.getType() == EvalType.GTH) { + if (startDatum != null) { + if (constValue.compareTo(startDatum) > 0) { + startDatum = constValue; + } + } else { + startDatum = constValue; + } + } + + if (evalNode.getType() == EvalType.EQUAL || + evalNode.getType() == EvalType.LEQ || + evalNode.getType() == EvalType.LTH) { + if (endDatum != null) { + if (constValue.compareTo(endDatum) < 0) { + endDatum = constValue; + } + } else { + endDatum = constValue; + } + } + } + } else if (evalNode instanceof BetweenPredicateEval) { + BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode; + if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) { + Datum value = ((ConstEval) betweenEval.getBegin()).getValue(); + if (startDatum != null) { + if (value.compareTo(startDatum) > 0) { + startDatum = value; + } + } else { + startDatum = value; + } + + value = ((ConstEval) betweenEval.getEnd()).getValue(); + if (endDatum != null) { + if (value.compareTo(endDatum) < 0) { + endDatum = value; + } + } else { + endDatum = value; + } + } + } + } + + if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) { + endDatum = new TextDatum(endDatum.asChars() + + new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE})); + } + if (startDatum != null || endDatum != null) { + return new Pair<Datum, Datum>(startDatum, endDatum); + } else { + return null; + } + } + + @Override + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException { + if (tableDesc == null) { + throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); + } + Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null); + + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + + Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); + hbaseConf.set("hbase.loadincremental.threads.max", "2"); + + JobContextImpl jobContext = new JobContextImpl(hbaseConf, + new JobID(finalEbId.getQueryId().toString(), finalEbId.getId())); + + FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext); + Path jobAttemptPath = committer.getJobAttemptPath(jobContext); + FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf()); + if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) { + LOG.warn("No query attempt file in " + jobAttemptPath); + return stagingResultDir; + } + committer.commitJob(jobContext); + + // insert into table + String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY); + + HTable htable = new HTable(hbaseConf, tableName); + try { + LoadIncrementalHFiles loadIncrementalHFiles = null; + try { + loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } + loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable); + + return stagingResultDir; + } finally { + htable.close(); + } + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + try { + int[] sortKeyIndexes = new int[sortSpecs.length]; + for (int i = 0; i < sortSpecs.length; i++) { + sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName()); + } + + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); + + HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName()); + try { + byte[][] endKeys = htable.getEndKeys(); + if (endKeys.length == 1) { + return new TupleRange[]{dataRange}; + } + List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length); + + TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs); + Tuple previousTuple = dataRange.getStart(); + + for (byte[] eachEndKey : endKeys) { + Tuple endTuple = new VTuple(sortSpecs.length); + byte[][] rowKeyFields; + if (sortSpecs.length > 1) { + byte[][] splitValues = BytesUtils.splitPreserveAllTokens( + eachEndKey, columnMapping.getRowKeyDelimiter(), columnMapping.getNumColumns()); + if (splitValues.length == sortSpecs.length) { + rowKeyFields = splitValues; + } else { + rowKeyFields = new byte[sortSpecs.length][]; + for (int j = 0; j < sortSpecs.length; j++) { + if (j < splitValues.length) { + rowKeyFields[j] = splitValues[j]; + } else { + rowKeyFields[j] = null; + } + } + } + + } else { + rowKeyFields = new byte[1][]; + rowKeyFields[0] = eachEndKey; + } + + for (int i = 0; i < sortSpecs.length; i++) { + if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) { + endTuple.put(i, + HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); + } else { + endTuple.put(i, + HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); + } + } + tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple)); + previousTuple = endTuple; + } + + // Last region endkey is empty. Tajo ignores empty key, so endkey is replaced with max data value. + if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) { + tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd()); + } else { + tupleRanges.remove(tupleRanges.size() - 1); + } + return tupleRanges.toArray(new TupleRange[]{}); + } finally { + htable.close(); + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new IOException(t.getMessage(), t); + } + } + + public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { + List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>(); + rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc))); + return rules; + } else { + return null; + } + } + + private Column[] getIndexColumns(TableDesc tableDesc) throws IOException { + List<Column> indexColumns = new ArrayList<Column>(); + + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + + boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); + for (int i = 0; i < isRowKeys.length; i++) { + if (isRowKeys[i]) { + indexColumns.add(tableDesc.getSchema().getColumn(i)); + } + } + + return indexColumns.toArray(new Column[]{}); + } + + @Override + public StorageProperty getStorageProperty() { + StorageProperty storageProperty = new StorageProperty(); + storageProperty.setSortedInsert(true); + storageProperty.setSupportsInsertInto(true); + return storageProperty; + } + + public void beforeInsertOrCATS(LogicalNode node) throws IOException { + if (node.getType() == NodeType.CREATE_TABLE) { + CreateTableNode cNode = (CreateTableNode)node; + if (!cNode.isExternal()) { + TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); + createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists()); + } + } + } + + @Override + public void rollbackOutputCommit(LogicalNode node) throws IOException { + if (node.getType() == NodeType.CREATE_TABLE) { + CreateTableNode cNode = (CreateTableNode)node; + if (cNode.isExternal()) { + return; + } + TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); + HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta)); + + try { + HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema()); + LOG.info("Delete table cause query failed:" + new String(hTableDesc.getName())); + hAdmin.disableTable(hTableDesc.getName()); + hAdmin.deleteTable(hTableDesc.getName()); + } finally { + hAdmin.close(); + } + } + } + + @Override + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + if (tableDesc != null) { + Schema tableSchema = tableDesc.getSchema(); + if (tableSchema.size() != outSchema.size()) { + throw new IOException("The number of table columns is different from SELECT columns"); + } + + for (int i = 0; i < tableSchema.size(); i++) { + if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) { + throw new IOException(outSchema.getColumn(i).getQualifiedName() + + "(" + outSchema.getColumn(i).getDataType().getType() + ")" + + " is different column type with " + tableSchema.getColumn(i).getSimpleName() + + "(" + tableSchema.getColumn(i).getDataType().getType() + ")"); + } + } + } + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java index 39ccf44..b59fe7b 100644 --- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java +++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java @@ -46,8 +46,8 @@ public class TestHBaseStorageManager { EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); scanNode.setQual(evalNodeA); - HBaseStorageManager storageManager = - (HBaseStorageManager) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE"); + HBaseTablespace storageManager = + (HBaseTablespace) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE"); List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); assertNotNull(indexEvals); assertEquals(1, indexEvals.size()); http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java index c041771..5c8242f 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -57,7 +57,7 @@ public abstract class FileAppender implements Appender { throw new IllegalArgumentException("Configuration must be an instance of TajoConf"); } - this.path = ((FileStorageManager) TableSpaceManager.getFileStorageManager((TajoConf) conf)) + this.path = ((FileTablespace) TableSpaceManager.getFileStorageManager((TajoConf) conf)) .getAppenderFilePath(taskAttemptId, workDir); } else { this.path = workDir;
