http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java new file mode 100644 index 0000000..d143e58 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -0,0 +1,445 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.*; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.BytesUtils; + +import java.io.IOException; +import java.util.Collection; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HBaseScanner implements Scanner { + private static final Log LOG = LogFactory.getLog(HBaseScanner.class); + private static final int DEFAULT_FETCH_SIZE = 1000; + private static final int MAX_LIST_SIZE = 100; + + protected boolean inited = false; + private TajoConf conf; + private Schema schema; + private TableMeta meta; + private HBaseFragment fragment; + private Scan scan; + private HTableInterface htable; + private Configuration hbaseConf; + private Column[] targets; + private TableStats tableStats; + private ResultScanner scanner; + private AtomicBoolean finished = new AtomicBoolean(false); + private float progress = 0.0f; + private int scanFetchSize; + private Result[] scanResults; + private int scanResultIndex = -1; + private Column[] schemaColumns; + + private ColumnMapping columnMapping; + private int[] targetIndexes; + + private int numRows = 0; + + private byte[][][] mappingColumnFamilies; + private boolean[] isRowKeyMappings; + private boolean[] isBinaryColumns; + private boolean[] isColumnKeys; + private boolean[] 
isColumnValues; + + private int[] rowKeyFieldIndexes; + private char rowKeyDelimiter; + + public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { + this.conf = (TajoConf)conf; + this.schema = schema; + this.meta = meta; + this.fragment = (HBaseFragment)fragment; + this.tableStats = new TableStats(); + } + + @Override + public void init() throws IOException { + inited = true; + schemaColumns = schema.toArray(); + if (fragment != null) { + tableStats.setNumBytes(0); + tableStats.setNumBlocks(1); + } + if (schema != null) { + for(Column eachColumn: schema.getColumns()) { + ColumnStats columnStats = new ColumnStats(eachColumn); + tableStats.addColumnStat(columnStats); + } + } + + scanFetchSize = Integer.parseInt(meta.getOption(HBaseStorageConstants.META_FETCH_ROWNUM_KEY, "" + DEFAULT_FETCH_SIZE)); + if (targets == null) { + targets = schema.toArray(); + } + + columnMapping = new ColumnMapping(schema, meta); + targetIndexes = new int[targets.length]; + int index = 0; + for (Column eachTargetColumn: targets) { + targetIndexes[index++] = schema.getColumnId(eachTargetColumn.getQualifiedName()); + } + + mappingColumnFamilies = columnMapping.getMappingColumns(); + isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + isBinaryColumns = columnMapping.getIsBinaryColumns(); + isColumnKeys = columnMapping.getIsColumnKeys(); + isColumnValues = columnMapping.getIsColumnValues(); + + rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); + rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); + + hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta); + + initScanner(); + } + + private void initScanner() throws IOException { + scan = new Scan(); + scan.setBatch(scanFetchSize); + scan.setCacheBlocks(false); + scan.setCaching(scanFetchSize); + + FilterList filters = null; + if (targetIndexes == null || targetIndexes.length == 0) { + filters = new FilterList(FilterList.Operator.MUST_PASS_ALL); + filters.addFilter(new FirstKeyOnlyFilter()); + filters.addFilter(new KeyOnlyFilter()); + } else { + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + for (int eachIndex : targetIndexes) { + if (isRowKeyMappings[eachIndex]) { + continue; + } + byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex]; + if (mappingColumn[1] == null) { + scan.addFamily(mappingColumn[0]); + } else { + scan.addColumn(mappingColumn[0], mappingColumn[1]); + } + } + } + + scan.setStartRow(fragment.getStartRow()); + if (fragment.isLast() && fragment.getStopRow() != null && + fragment.getStopRow().length > 0) { + // last and stopRow is not empty + if (filters == null) { + filters = new FilterList(); + } + filters.addFilter(new InclusiveStopFilter(fragment.getStopRow())); + } else { + scan.setStopRow(fragment.getStopRow()); + } + + if (filters != null) { + scan.setFilter(filters); + } + + if (htable == null) { + HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE)) + .getConnection(hbaseConf); + htable = hconn.getTable(fragment.getHbaseTableName()); + } + scanner = htable.getScanner(scan); + } + + @Override + public Tuple next() throws IOException { + if (finished.get()) { + return null; + } + + if (scanResults == null || scanResultIndex >= scanResults.length) { + scanResults = scanner.next(scanFetchSize); + if (scanResults == null || scanResults.length == 0) { + finished.set(true); + progress = 1.0f; + return null; + } + scanResultIndex = 0; + } + + Result result = scanResults[scanResultIndex++]; 
+ Tuple resultTuple = new VTuple(schema.size()); + for (int i = 0; i < targetIndexes.length; i++) { + resultTuple.put(targetIndexes[i], getDatum(result, targetIndexes[i])); + } + numRows++; + return resultTuple; + } + + private Datum getDatum(Result result, int fieldId) throws IOException { + byte[] value = null; + if (isRowKeyMappings[fieldId]) { + value = result.getRow(); + if (!isBinaryColumns[fieldId] && rowKeyFieldIndexes[fieldId] >= 0) { + int rowKeyFieldIndex = rowKeyFieldIndexes[fieldId]; + + byte[][] rowKeyFields = BytesUtils.splitPreserveAllTokens(value, rowKeyDelimiter); + + if (rowKeyFields.length <= rowKeyFieldIndex) { + return NullDatum.get(); + } else { + value = rowKeyFields[rowKeyFieldIndex]; + } + } + } else { + if (isColumnKeys[fieldId]) { + NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]); + if (cfMap != null) { + Set<byte[]> keySet = cfMap.keySet(); + if (keySet.size() == 1) { + try { + return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], keySet.iterator().next()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } else { + StringBuilder sb = new StringBuilder(); + sb.append("["); + int count = 0; + for (byte[] eachKey : keySet) { + if (count > 0) { + sb.append(", "); + } + Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachKey); + sb.append("\"").append(datum.asChars()).append("\""); + count++; + if (count > MAX_LIST_SIZE) { + break; + } + } + sb.append("]"); + return new TextDatum(sb.toString()); + } + } + } else if (isColumnValues[fieldId]) { + NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]); + if (cfMap != null) { + Collection<byte[]> valueList = cfMap.values(); + if (valueList.size() == 1) { + try { + return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], valueList.iterator().next()); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } else { + StringBuilder sb = new StringBuilder(); + sb.append("["); + int count = 0; + for (byte[] eachValue : valueList) { + if (count > 0) { + sb.append(", "); + } + Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachValue); + sb.append("\"").append(datum.asChars()).append("\""); + count++; + if (count > MAX_LIST_SIZE) { + break; + } + } + sb.append("]"); + return new TextDatum(sb.toString()); + } + } + } else { + if (mappingColumnFamilies[fieldId][1] == null) { + NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]); + if (cfMap != null && !cfMap.isEmpty()) { + int count = 0; + String delim = ""; + + if (cfMap.size() == 0) { + return NullDatum.get(); + } else if (cfMap.size() == 1) { + // If a column family is mapped without a column name like "cf1:" and the number of cells is one, + // the return value is in flat format, not JSON format.
+ NavigableMap.Entry<byte[], byte[]> entry = cfMap.entrySet().iterator().next(); + byte[] entryKey = entry.getKey(); + byte[] entryValue = entry.getValue(); + if (entryKey == null || entryKey.length == 0) { + try { + if (isBinaryColumns[fieldId]) { + return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue); + } else { + return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + } + StringBuilder sb = new StringBuilder(); + sb.append("{"); + for (NavigableMap.Entry<byte[], byte[]> entry : cfMap.entrySet()) { + byte[] entryKey = entry.getKey(); + byte[] entryValue = entry.getValue(); + + String keyText = new String(entryKey); + String valueText = null; + if (entryValue != null) { + try { + if (isBinaryColumns[fieldId]) { + valueText = HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars(); + } else { + valueText = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars(); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + sb.append(delim).append("\"").append(keyText).append("\":\"").append(valueText).append("\""); + delim = ", "; + count++; + if (count > MAX_LIST_SIZE) { + break; + } + } //end of for + sb.append("}"); + return new TextDatum(sb.toString()); + } else { + value = null; + } + } else { + value = result.getValue(mappingColumnFamilies[fieldId][0], mappingColumnFamilies[fieldId][1]); + } + } + } + + if (value == null) { + return NullDatum.get(); + } else { + try { + if (isBinaryColumns[fieldId]) { + return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], value); + } else { + return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], value); + } + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + } + + @Override + public void reset() throws IOException { + progress = 0.0f; + scanResultIndex = -1; + scanResults = null; + finished.set(false); + tableStats = new TableStats(); + + if (scanner != null) { + scanner.close(); + scanner = null; + } + + initScanner(); + } + + @Override + public void close() throws IOException { + progress = 1.0f; + finished.set(true); + if (scanner != null) { + try { + scanner.close(); + scanner = null; + } catch (Exception e) { + LOG.warn("Error while closing hbase scanner: " + e.getMessage(), e); + } + } + if (htable != null) { + htable.close(); + htable = null; + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public void setTarget(Column[] targets) { + if (inited) { + throw new IllegalStateException("Should be called before init()"); + } + this.targets = targets; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + // TODO implements adding column filter to scanner. + } + + @Override + public boolean isSplittable() { + return true; + } + + @Override + public float getProgress() { + return progress; + } + + @Override + public TableStats getInputStats() { + tableStats.setNumRows(numRows); + return tableStats; + } + + @Override + public Schema getSchema() { + return schema; + } +}
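For orientation: HBaseScanner follows Tajo's usual Scanner life cycle, where setTarget() must be called before init(), init() builds the HBase Scan from the column mapping, and next() returns one VTuple per HBase row until the fragment's row range is exhausted. The following driver loop is an illustrative sketch only, not part of the patch; the conf, schema, meta, and fragment objects and the process() callback are assumed to be supplied by the caller.

    // Hedged sketch of a caller driving HBaseScanner over a single fragment.
    Scanner scanner = new HBaseScanner(conf, schema, meta, fragment);
    scanner.setTarget(schema.toArray()); // optional projection; must precede init()
    scanner.init();                      // builds the Scan and opens the ResultScanner
    try {
      Tuple tuple;
      while ((tuple = scanner.next()) != null) {
        process(tuple);                  // one tuple per HBase row, mapped via ColumnMapping
      }
    } finally {
      scanner.close();                   // closes the ResultScanner and the HTableInterface
    }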
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java new file mode 100644 index 0000000..2c525a1 --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +public interface HBaseStorageConstants { + public static final String KEY_COLUMN_MAPPING = "key"; + public static final String VALUE_COLUMN_MAPPING = "value"; + public static final String META_FETCH_ROWNUM_KEY = "fetch.rownum"; + public static final String META_TABLE_KEY = "table"; + public static final String META_COLUMNS_KEY = "columns"; + public static final String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys"; + public static final String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file"; + public static final String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum"; + public static final String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter"; + + public static final String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode"; +} http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java new file mode 100644 index 0000000..b47b98c --- /dev/null +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java @@ -0,0 +1,1126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.RewriteRule; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.*; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.*; + +/** + * StorageManager for HBase tables. + */ +public class HBaseStorageManager extends StorageManager { + private static final Log LOG = LogFactory.getLog(HBaseStorageManager.class); + + private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>(); + + public HBaseStorageManager (StoreType storeType) { + super(storeType); + } + + @Override + public void storageInit() throws IOException { + } + + @Override + public void closeStorageManager() { + synchronized (connMap) { + for (HConnection eachConn: connMap.values()) { + try { + eachConn.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists); + TableStats stats = new TableStats(); + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + tableDesc.setStats(stats); + } + + private void createTable(TableMeta tableMeta, Schema schema, + boolean isExternal, boolean ifNotExists) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table requires a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); + if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) { + throw new IOException("Columns property has more entries than Tajo table columns"); + } + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + int numRowKeys = 0; + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + numRowKeys++; + } + } + if (numRowKeys > 1) { + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]
&& schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Key field type should be TEXT type."); + } + } + } + + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column key field('<cfname>:key:') type should be TEXT type."); + } + if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column value field('<cfname>:value:') type should be TEXT type."); + } + } + + Configuration hConf = getHBaseConfiguration(conf, tableMeta); + HBaseAdmin hAdmin = new HBaseAdmin(hConf); + + try { + if (isExternal) { + // If the Tajo table is an external table, only validate the mapping. + if (mappedColumns == null || mappedColumns.isEmpty()) { + throw new IOException("HBase mapped table requires a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + if (!hAdmin.tableExists(hTableName)) { + throw new IOException("HBase table [" + hbaseTableName + "] does not exist. " + + "An external table must map to an existing table."); + } + HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName); + Set<String> tableColumnFamilies = new HashSet<String>(); + for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) { + tableColumnFamilies.add(eachColumn.getNameAsString()); + } + + Collection<String> mappingColumnFamilies = columnMapping.getColumnFamilyNames(); + if (mappingColumnFamilies.isEmpty()) { + throw new IOException("HBase mapped table requires a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + + for (String eachMappingColumnFamily : mappingColumnFamilies) { + if (!tableColumnFamilies.contains(eachMappingColumnFamily)) { + throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName); + } + } + } else { + if (hAdmin.tableExists(hbaseTableName)) { + if (ifNotExists) { + return; + } else { + throw new IOException("HBase table [" + hbaseTableName + "] already exists."); + } + } + // Creating hbase table + HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema); + + byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta); + if (splitKeys == null) { + hAdmin.createTable(hTableDescriptor); + } else { + hAdmin.createTable(hTableDescriptor, splitKeys); + } + } + } finally { + hAdmin.close(); + } + } + + /** + * Returns initial region split keys.
+ * + * @param conf + * @param schema + * @param meta + * @return + * @throws IOException + */ + private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException { + String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, ""); + String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, ""); + + if ((splitRowKeys == null || splitRowKeys.isEmpty()) && + (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) { + return null; + } + + ColumnMapping columnMapping = new ColumnMapping(schema, meta); + boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns(); + boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); + + boolean rowkeyBinary = false; + int numRowKeys = 0; + Column rowKeyColumn = null; + for (int i = 0; i < isBinaryColumns.length; i++) { + if (isBinaryColumns[i] && isRowKeys[i]) { + rowkeyBinary = true; + } + if (isRowKeys[i]) { + numRowKeys++; + rowKeyColumn = schema.getColumn(i); + } + } + + if (rowkeyBinary && numRowKeys > 1) { + throw new IOException("If the rowkey is mapped to multiple columns and is binary, " + + "creating multiple regions is not supported."); + } + + if (splitRowKeys != null && !splitRowKeys.isEmpty()) { + String[] splitKeyTokens = splitRowKeys.split(","); + byte[][] splitKeys = new byte[splitKeyTokens.length][]; + for (int i = 0; i < splitKeyTokens.length; i++) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } else { + splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } + } + return splitKeys; + } + + if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) { + // If there are many split keys, Tajo allows them to be defined in a file. + Path path = new Path(splitRowKeysFile); + FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " does not exist."); + } + + SortedSet<String> splitKeySet = new TreeSet<String>(); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(fs.open(path))); + String line = null; + while ( (line = reader.readLine()) != null ) { + if (line.isEmpty()) { + continue; + } + splitKeySet.add(line); + } + } finally { + if (reader != null) { + reader.close(); + } + } + + if (splitKeySet.isEmpty()) { + return null; + } + + byte[][] splitKeys = new byte[splitKeySet.size()][]; + int index = 0; + for (String eachKey: splitKeySet) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } else { + splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } + } + + return splitKeys; + } + + return null; + } + + /** + * Creates a Configuration instance and sets the hbase connection options. + * + * @param conf + * @param tableMeta + * @return + * @throws IOException + */ + public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException { + String zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, ""); + if (zkQuorum == null || zkQuorum.trim().isEmpty()) { + throw new IOException("HBase mapped table requires a '" + + HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute."); + } + + Configuration hbaseConf = (conf == null) ?
HBaseConfiguration.create() : HBaseConfiguration.create(conf); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); + + for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) { + String key = eachOption.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + hbaseConf.set(key, eachOption.getValue()); + } + } + return hbaseConf; + } + + /** + * Creates an HTableDescriptor using table meta data. + * + * @param tableMeta + * @param schema + * @return + * @throws IOException + */ + public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table requires a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + + HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName); + + Collection<String> columnFamilies = columnMapping.getColumnFamilyNames(); + // If the 'columns' attribute is empty, Tajo table columns are mapped to all HBase table columns. + if (columnFamilies.isEmpty()) { + for (Column eachColumn: schema.getColumns()) { + columnFamilies.add(eachColumn.getSimpleName()); + } + } + + for (String eachColumnFamily: columnFamilies) { + hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily)); + } + + return hTableDescriptor; + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta())); + + try { + HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema()); + LOG.info("Deleting hbase table: " + new String(hTableDesc.getName())); + hAdmin.disableTable(hTableDesc.getName()); + hAdmin.deleteTable(hTableDesc.getName()); + } finally { + hAdmin.close(); + } + } + + /** + * Returns columns which are mapped to the rowkey of the hbase table.
+ * + * @param tableDesc + * @return + * @throws IOException + */ + private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes(); + + Column indexColumn = null; + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + if (columnMapping.getNumRowKeys() == 1 || + rowKeyIndexes[i] == 0) { + indexColumn = tableDesc.getSchema().getColumn(i); + } + } + } + return new Column[]{indexColumn}; + } + + @Override + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + + List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode); + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + + try { + htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List<Fragment> fragments = new ArrayList<Fragment>(1); + Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); + fragments.add(fragment); + return fragments; + } + + List<byte[]> startRows; + List<byte[]> stopRows; + + if (indexPredications != null && !indexPredications.isEmpty()) { + // indexPredications is Disjunctive set + startRows = new ArrayList<byte[]>(); + stopRows = new ArrayList<byte[]>(); + for (IndexPredication indexPredication: indexPredications) { + byte[] startRow; + byte[] stopRow; + if (indexPredication.getStartValue() != null) { + startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); + } else { + startRow = HConstants.EMPTY_START_ROW; + } + if (indexPredication.getStopValue() != null) { + stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); + } else { + stopRow = HConstants.EMPTY_END_ROW; + } + startRows.add(startRow); + stopRows.add(stopRow); + } + } else { + startRows = TUtil.newList(HConstants.EMPTY_START_ROW); + stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); + } + + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + // region startkey -> HBaseFragment + Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>(); + for (int i = 0; i < keys.getFirst().length; i++) { + HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + + byte[] regionStartKey = keys.getFirst()[i]; + byte[] regionStopKey = keys.getSecond()[i]; + + int startRowsSize = startRows.size(); + for (int j = 0; j < startRowsSize; j++) { + byte[] startRow = startRows.get(j); + byte[] stopRow = stopRows.get(j); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, 
regionStopKey) < 0) + && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { + byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? + regionStartKey : startRow; + + byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && + regionStopKey.length > 0 ? regionStopKey : stopRow; + + String regionName = location.getRegionInfo().getRegionNameAsString(); + + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + if (fragmentMap.containsKey(regionStartKey)) { + HBaseFragment prevFragment = fragmentMap.get(regionStartKey); + if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + prevFragment.setStartRow(fragmentStart); + } + if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + prevFragment.setStopRow(fragmentStop); + } + } else { + HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + fragmentStart, fragmentStop, location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; + fragment.setLength(storeFileSize); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragmentMap.put(regionStartKey, fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); + } + } + } + } + } + + List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values()); + Collections.sort(fragments); + if (!fragments.isEmpty()) { + fragments.get(fragments.size() - 1).setLast(true); + } + return (ArrayList<Fragment>) (ArrayList) fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + private byte[] serialize(ColumnMapping columnMapping, + IndexPredication indexPredication, Datum datum) throws IOException { + if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { + return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } else { + return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } + } + + @Override + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { + return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir); + } else { + return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir); + } + } + + @Override + public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException { + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + try { + htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = 
htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + return new ArrayList<Fragment>(1); + } + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length); + + int start = currentPage * numFragments; + if (start >= keys.getFirst().length) { + return new ArrayList<Fragment>(1); + } + int end = (currentPage + 1) * numFragments; + if (end > keys.getFirst().length) { + end = keys.getFirst().length; + } + for (int i = start; i < end; i++) { + HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + + String regionName = location.getRegionInfo().getRegionNameAsString(); + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(), + location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; + if (storeLength == 0) { + // If store size is smaller than 1 MB, storeLength is zero + storeLength = 1 * 1024 * 1024; //default 1MB + } + fragment.setLength(storeLength); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragments.add(fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); + } + } + + if (!fragments.isEmpty()) { + ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); + } + return fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + public HConnection getConnection(Configuration hbaseConf) throws IOException { + synchronized(connMap) { + HConnectionKey key = new HConnectionKey(hbaseConf); + HConnection conn = connMap.get(key); + if (conn == null) { + conn = HConnectionManager.createConnection(hbaseConf); + connMap.put(key, conn); + } + + return conn; + } + } + + static class HConnectionKey { + final static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID, + HConstants.RPC_CODEC_CONF_KEY }; + + private Map<String, String> properties; + private String username; + + HConnectionKey(Configuration conf) { + Map<String, String> m = new HashMap<String, String>(); + if (conf != null) { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + m.put(property, value); + } + } + } + this.properties = Collections.unmodifiableMap(m); + + try { + UserProvider provider = UserProvider.instantiate(conf); 
+ User currentUser = provider.getCurrent(); + if (currentUser != null) { + username = currentUser.getName(); + } + } catch (IOException ioe) { + LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + if (username != null) { + result = username.hashCode(); + } + for (String property : CONNECTION_PROPERTIES) { + String value = properties.get(property); + if (value != null) { + result = prime * result + value.hashCode(); + } + } + + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + HConnectionKey that = (HConnectionKey) obj; + if (this.username != null && !this.username.equals(that.username)) { + return false; + } else if (this.username == null && that.username != null) { + return false; + } + if (this.properties == null) { + if (that.properties != null) { + return false; + } + } else { + if (that.properties == null) { + return false; + } + for (String property : CONNECTION_PROPERTIES) { + String thisValue = this.properties.get(property); + String thatValue = that.properties.get(property); + //noinspection StringEquality + if (thisValue == thatValue) { + continue; + } + if (thisValue == null || !thisValue.equals(thatValue)) { + return false; + } + } + } + return true; + } + + @Override + public String toString() { + return "HConnectionKey{" + + "properties=" + properties + + ", username='" + username + '\'' + + '}'; + } + } + + public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping, + TableDesc tableDesc, ScanNode scanNode) throws IOException { + List<IndexPredication> indexPredications = new ArrayList<IndexPredication>(); + Column[] indexableColumns = getIndexableColumns(tableDesc); + if (indexableColumns != null && indexableColumns.length == 1) { + // Currently supports only single index column. 
+ List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns); + for (Set<EvalNode> eachEvalSet: indexablePredicateList) { + Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet); + if (indexPredicationValues != null) { + IndexPredication indexPredication = new IndexPredication(); + indexPredication.setColumn(indexableColumns[0]); + indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName())); + indexPredication.setStartValue(indexPredicationValues.getFirst()); + indexPredication.setStopValue(indexPredicationValues.getSecond()); + + indexPredications.add(indexPredication); + } + } + } + return indexPredications; + } + + public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException { + List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); + + // if a query statement has a search condition, try to find indexable predicates + if (indexableColumns != null && scanNode.getQual() != null) { + EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual()); + + // add qualifier to schema for qual + for (Column column : indexableColumns) { + for (EvalNode disjunctiveExpr : disjunctiveForms) { + EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr); + Set<EvalNode> indexablePredicateSet = Sets.newHashSet(); + for (EvalNode conjunctiveExpr : conjunctiveForms) { + if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) { + indexablePredicateSet.add(conjunctiveExpr); + } + } + if (!indexablePredicateSet.isEmpty()) { + indexablePredicateList.add(indexablePredicateSet); + } + } + } + } + + return indexablePredicateList; + } + + private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { + if (checkIfIndexablePredicate(evalNode) || checkIfConjunctiveButOneVariable(evalNode)) { + Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode); + // if it contains only single variable matched to a target column + return variables.size() == 1 && variables.contains(targetColumn); + } else { + return false; + } + } + + /** + * + * @param evalNode The expression to be checked + * @return true if an conjunctive expression, consisting of indexable expressions + */ + private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) { + if (evalNode.getType() == EvalType.AND) { + BinaryEval orEval = (BinaryEval) evalNode; + boolean indexable = + checkIfIndexablePredicate(orEval.getLeftExpr()) && + checkIfIndexablePredicate(orEval.getRightExpr()); + + boolean sameVariable = + EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr()) + .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr())); + + return indexable && sameVariable; + } else { + return false; + } + } + + /** + * Check if an expression consists of one variable and one constant and + * the expression is a comparison operator. + * + * @param evalNode The expression to be checked + * @return true if an expression consists of one variable and one constant + * and the expression is a comparison operator. Other, false. 
+ */ + private boolean checkIfIndexablePredicate(EvalNode evalNode) { + return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode); + } + + public static boolean isIndexableOperator(EvalNode expr) { + return expr.getType() == EvalType.EQUAL || + expr.getType() == EvalType.LEQ || + expr.getType() == EvalType.LTH || + expr.getType() == EvalType.GEQ || + expr.getType() == EvalType.GTH || + expr.getType() == EvalType.BETWEEN; + } + + public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping, + Set<EvalNode> evalNodes) { + Datum startDatum = null; + Datum endDatum = null; + for (EvalNode evalNode: evalNodes) { + if (evalNode instanceof BinaryEval) { + BinaryEval binaryEval = (BinaryEval) evalNode; + EvalNode left = binaryEval.getLeftExpr(); + EvalNode right = binaryEval.getRightExpr(); + + Datum constValue = null; + if (left.getType() == EvalType.CONST) { + constValue = ((ConstEval) left).getValue(); + } else if (right.getType() == EvalType.CONST) { + constValue = ((ConstEval) right).getValue(); + } + + if (constValue != null) { + if (evalNode.getType() == EvalType.EQUAL || + evalNode.getType() == EvalType.GEQ || + evalNode.getType() == EvalType.GTH) { + if (startDatum != null) { + if (constValue.compareTo(startDatum) > 0) { + startDatum = constValue; + } + } else { + startDatum = constValue; + } + } + + if (evalNode.getType() == EvalType.EQUAL || + evalNode.getType() == EvalType.LEQ || + evalNode.getType() == EvalType.LTH) { + if (endDatum != null) { + if (constValue.compareTo(endDatum) < 0) { + endDatum = constValue; + } + } else { + endDatum = constValue; + } + } + } + } else if (evalNode instanceof BetweenPredicateEval) { + BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode; + if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) { + Datum value = ((ConstEval) betweenEval.getBegin()).getValue(); + if (startDatum != null) { + if (value.compareTo(startDatum) > 0) { + startDatum = value; + } + } else { + startDatum = value; + } + + value = ((ConstEval) betweenEval.getEnd()).getValue(); + if (endDatum != null) { + if (value.compareTo(endDatum) < 0) { + endDatum = value; + } + } else { + endDatum = value; + } + } + } + } + + if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) { + endDatum = new TextDatum(endDatum.asChars() + + new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE})); + } + if (startDatum != null || endDatum != null) { + return new Pair<Datum, Datum>(startDatum, endDatum); + } else { + return null; + } + } + + @Override + public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, + LogicalPlan plan, Schema schema, + TableDesc tableDesc) throws IOException { + if (tableDesc == null) { + throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); + } + Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + + Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); + hbaseConf.set("hbase.loadincremental.threads.max", "2"); + + JobContextImpl jobContext = new JobContextImpl(hbaseConf, + new JobID(finalEbId.getQueryId().toString(), finalEbId.getId())); + + FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext); + Path jobAttemptPath = committer.getJobAttemptPath(jobContext); + 
FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf()); + if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) { + LOG.warn("No query attempt file in " + jobAttemptPath); + return stagingResultDir; + } + committer.commitJob(jobContext); + + if (tableDesc.getName() == null && tableDesc.getPath() != null) { + + // insert into location + return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false); + } else { + // insert into table + String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY); + + HTable htable = new HTable(hbaseConf, tableName); + try { + LoadIncrementalHFiles loadIncrementalHFiles = null; + try { + loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e.getMessage(), e); + } + loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable); + + return stagingResultDir; + } finally { + htable.close(); + } + } + } + + @Override + public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, + Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) + throws IOException { + try { + int[] sortKeyIndexes = new int[sortSpecs.length]; + for (int i = 0; i < sortSpecs.length; i++) { + sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName()); + } + + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); + + HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName()); + try { + byte[][] endKeys = htable.getEndKeys(); + if (endKeys.length == 1) { + return new TupleRange[]{dataRange}; + } + List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length); + + TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs); + Tuple previousTuple = dataRange.getStart(); + + for (byte[] eachEndKey : endKeys) { + Tuple endTuple = new VTuple(sortSpecs.length); + byte[][] rowKeyFields; + if (sortSpecs.length > 1) { + byte[][] splitValues = BytesUtils.splitPreserveAllTokens(eachEndKey, columnMapping.getRowKeyDelimiter()); + if (splitValues.length == sortSpecs.length) { + rowKeyFields = splitValues; + } else { + rowKeyFields = new byte[sortSpecs.length][]; + for (int j = 0; j < sortSpecs.length; j++) { + if (j < splitValues.length) { + rowKeyFields[j] = splitValues[j]; + } else { + rowKeyFields[j] = null; + } + } + } + + } else { + rowKeyFields = new byte[1][]; + rowKeyFields[0] = eachEndKey; + } + + for (int i = 0; i < sortSpecs.length; i++) { + if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) { + endTuple.put(i, + HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); + } else { + endTuple.put(i, + HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), + rowKeyFields[i])); + } + } + tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple)); + previousTuple = endTuple; + } + + // Last region endkey is empty. Tajo ignores empty key, so endkey is replaced with max data value. 
+ if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) { + tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd()); + } else { + tupleRanges.remove(tupleRanges.size() - 1); + } + return tupleRanges.toArray(new TupleRange[]{}); + } finally { + htable.close(); + } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + throw new IOException(t.getMessage(), t); + } + } + + public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { + if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { + List<RewriteRule> rules = new ArrayList<RewriteRule>(); + rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc))); + return rules; + } else { + return null; + } + } + + private Column[] getIndexColumns(TableDesc tableDesc) throws IOException { + List<Column> indexColumns = new ArrayList<Column>(); + + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + + boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); + for (int i = 0; i < isRowKeys.length; i++) { + if (isRowKeys[i]) { + indexColumns.add(tableDesc.getSchema().getColumn(i)); + } + } + + return indexColumns.toArray(new Column[]{}); + } + + @Override + public StorageProperty getStorageProperty() { + StorageProperty storageProperty = new StorageProperty(); + storageProperty.setSortedInsert(true); + storageProperty.setSupportsInsertInto(true); + return storageProperty; + } + + public void beforeInsertOrCATS(LogicalNode node) throws IOException { + if (node.getType() == NodeType.CREATE_TABLE) { + CreateTableNode cNode = (CreateTableNode)node; + if (!cNode.isExternal()) { + TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); + createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists()); + } + } + } + + @Override + public void rollbackOutputCommit(LogicalNode node) throws IOException { + if (node.getType() == NodeType.CREATE_TABLE) { + CreateTableNode cNode = (CreateTableNode)node; + if (cNode.isExternal()) { + return; + } + TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); + HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta)); + + try { + HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema()); + LOG.info("Deleting table because the query failed: " + hTableDesc.getName()); + hAdmin.disableTable(hTableDesc.getName()); + hAdmin.deleteTable(hTableDesc.getName()); + } finally { + hAdmin.close(); + } + } + } + + @Override + public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { + if (tableDesc != null) { + Schema tableSchema = tableDesc.getSchema(); + if (tableSchema.size() != outSchema.size()) { + throw new IOException("The number of table columns is different from SELECT columns"); + } + + for (int i = 0; i < tableSchema.size(); i++) { + if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) { + throw new IOException(outSchema.getColumn(i).getQualifiedName() + + "(" + outSchema.getColumn(i).getDataType().getType() + ")" + + " has a different column type than " + tableSchema.getColumn(i).getSimpleName() + + "(" + tableSchema.getColumn(i).getDataType().getType() + ")"); + } + } + } + } +}
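A note on the fragment construction in getSplits() above: a predicate-derived (startRow, stopRow) range is matched against each region's (startKey, endKey), and a fragment is emitted only for regions the range overlaps. Restated as a stand-alone helper for readability (a sketch, not code from the patch; empty arrays mean "unbounded" as in HBase, and Bytes.compareTo is the same comparator used above):

    // Sketch of the overlap test inlined in getSplits(): the range must
    // start before the region ends and end after the region starts.
    static boolean overlaps(byte[] startRow, byte[] stopRow,
                            byte[] regionStartKey, byte[] regionStopKey) {
      boolean startsBeforeRegionEnd =
          startRow.length == 0 || regionStopKey.length == 0 ||
          Bytes.compareTo(startRow, regionStopKey) < 0;
      boolean endsAfterRegionStart =
          stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0;
      return startsBeforeRegionEnd && endsAfterRegionStart;
    }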
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
new file mode 100644
index 0000000..a0ad492
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.NumberUtil;
+
+import java.io.IOException;
+
+public class HBaseTextSerializerDeserializer {
+  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length));
+        break;
+      case INT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length));
+        break;
+      case INT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createInt8(new String(bytes, 0, bytes.length));
+        break;
+      case FLOAT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createFloat4(new String(bytes, 0, bytes.length));
+        break;
+      case FLOAT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() :
+            DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length));
+        break;
+      case TEXT:
+        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+        break;
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  public static byte[] serialize(Column col, Datum datum) throws IOException {
+    if (datum == null || datum instanceof NullDatum) {
+      return null;
+    }
+
+    return datum.asChars().getBytes();
+  }
+}
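A hedged usage sketch for the serializer above; the column name is hypothetical, and Column/Type are the Tajo catalog classes already used elsewhere in this patch:

    package org.apache.tajo.storage.hbase;

    import org.apache.tajo.catalog.Column;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.datum.Datum;

    public class TextSerDeSketch {
      public static void main(String[] args) throws Exception {
        Column col = new Column("score", Type.INT4);  // hypothetical column
        Datum datum = HBaseTextSerializerDeserializer.deserialize(col, "42".getBytes());
        System.out.println(datum.asInt4());           // 42
        byte[] bytes = HBaseTextSerializerDeserializer.serialize(col, datum); // "42" as bytes
        // Empty cell bytes deserialize to NullDatum, and NullDatum serializes
        // back to null, so missing HBase cells round-trip as SQL NULLs.
      }
    }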
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
new file mode 100644
index 0000000..b9425f9
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+public class HFileAppender extends AbstractHBaseAppender {
+  private static final Log LOG = LogFactory.getLog(HFileAppender.class);
+
+  private RecordWriter<ImmutableBytesWritable, Cell> writer;
+  private TaskAttemptContext writerContext;
+  private Path workingFilePath;
+  private FileOutputCommitter committer;
+
+  public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                       Schema schema, TableMeta meta, Path stagingDir) {
+    super(conf, taskAttemptId, schema, meta, stagingDir);
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    Configuration taskConf = new Configuration();
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString());
+
+    ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId();
+    writerContext = new TaskAttemptContextImpl(taskConf,
+        new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP,
+            taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId()));
+
+    HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2();
+    try {
+      writer = hFileOutputFormat2.getRecordWriter(writerContext);
+
+      committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext);
+      workingFilePath = committer.getWorkPath();
+    } catch (InterruptedException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+
+    LOG.info("Created hbase file writer: " + workingFilePath);
+  }
+
+  long totalNumBytes = 0;
+  ImmutableBytesWritable keyWritable = new ImmutableBytesWritable();
+  boolean first = true;
+  TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+
+  @Override
+  public void addTuple(Tuple tuple) throws IOException {
+    byte[] rowkey = getRowKeyBytes(tuple);
+
+    if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) {
+      try {
+        for (KeyValue kv : kvSet) {
+          writer.write(keyWritable, kv);
+          totalNumBytes += keyWritable.getLength() + kv.getLength();
+        }
+        kvSet.clear();
+        // update statistics
+        if (enabledStats) {
+          stats.incrementRow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    first = false;
+
+    keyWritable.set(rowkey);
+
+    readKeyValues(tuple, rowkey);
+    if (keyValues != null) {
+      for (KeyValue eachKeyVal : keyValues) {
+        kvSet.add(eachKeyVal);
+      }
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+  }
+
+  @Override
+  public long getEstimatedOutputSize() throws IOException {
+    // StoreTableExec uses this value as the rolling file length;
+    // HFileAppender never rolls files.
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!kvSet.isEmpty()) {
+      try {
+        for (KeyValue kv : kvSet) {
+          writer.write(keyWritable, kv);
+          totalNumBytes += keyWritable.getLength() + kv.getLength();
+        }
+        kvSet.clear();
+        // update statistics
+        if (enabledStats) {
+          stats.incrementRow();
+        }
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+
+    if (enabledStats) {
+      stats.setNumBytes(totalNumBytes);
+    }
+    if (writer != null) {
+      try {
+        writer.close(writerContext);
+        committer.commitTask(writerContext);
+      } catch (InterruptedException e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+  }
+}
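The heart of HFileAppender is a flush-on-row-key-change pattern: HFileOutputFormat2 requires cells in sorted order, so the cells of the current row are buffered in a TreeSet and written as one batch whenever the incoming row key changes; input tuples already arrive sorted thanks to the AddSortForInsertRewriter rule. A stand-alone sketch of that pattern over hypothetical string rows:

    import java.util.TreeSet;

    public class RowBufferSketch {
      public static void main(String[] args) {
        // Pre-sorted (rowkey, cell) pairs, as addTuple() receives them.
        String[][] cells = {{"row1", "a"}, {"row1", "b"}, {"row2", "c"}};
        String currentKey = null;
        TreeSet<String> buffer = new TreeSet<String>();

        for (String[] cell : cells) {
          if (currentKey != null && !currentKey.equals(cell[0])) {
            System.out.println(currentKey + " -> " + buffer); // flush one row
            buffer.clear();
          }
          currentKey = cell[0];
          buffer.add(cell[1]);
        }
        if (!buffer.isEmpty()) {
          System.out.println(currentKey + " -> " + buffer);   // final row, as in close()
        }
      }
    }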
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
new file mode 100644
index 0000000..3a58e50
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+public class IndexPredication {
+  private Column column;
+  private int columnId;
+  private Datum startValue;
+  private Datum stopValue;
+
+  public Column getColumn() {
+    return column;
+  }
+
+  public void setColumn(Column column) {
+    this.column = column;
+  }
+
+  public int getColumnId() {
+    return columnId;
+  }
+
+  public void setColumnId(int columnId) {
+    this.columnId = columnId;
+  }
+
+  public Datum getStartValue() {
+    return startValue;
+  }
+
+  public void setStartValue(Datum startValue) {
+    this.startValue = startValue;
+  }
+
+  public Datum getStopValue() {
+    return stopValue;
+  }
+
+  public void setStopValue(Datum stopValue) {
+    this.stopValue = stopValue;
+  }
+}
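IndexPredication is a plain value holder; its start/stop datums are what ultimately bound an HBase Scan over the row key. A simplified, hedged sketch of that translation (a real mapping must also handle binary and multi-field row keys):

    package org.apache.tajo.storage.hbase;

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.tajo.datum.TextDatum;

    public class PredicationScanSketch {
      public static void main(String[] args) {
        // Hypothetical predication over a TEXT row key.
        IndexPredication pred = new IndexPredication();
        pred.setStartValue(new TextDatum("row-100"));
        pred.setStopValue(new TextDatum("row-200"));

        Scan scan = new Scan();
        scan.setStartRow(pred.getStartValue().asChars().getBytes()); // inclusive lower bound
        scan.setStopRow(pred.getStopValue().asChars().getBytes());   // exclusive upper bound
      }
    }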
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
new file mode 100644
index 0000000..4577703
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+public class RowKeyMapping {
+  private boolean isBinary;
+  private int keyFieldIndex;
+
+  public boolean isBinary() {
+    return isBinary;
+  }
+
+  public void setBinary(boolean isBinary) {
+    this.isBinary = isBinary;
+  }
+
+  public int getKeyFieldIndex() {
+    return keyFieldIndex;
+  }
+
+  public void setKeyFieldIndex(int keyFieldIndex) {
+    this.keyFieldIndex = keyFieldIndex;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 3a3bb57..b10d423 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.storage.StorageConstants;
 import parquet.hadoop.ParquetOutputFormat;
 import parquet.hadoop.metadata.CompressionCodecName;
@@ -51,11 +52,11 @@ public class ParquetAppender extends FileAppender {
    * @param conf Configuration properties.
    * @param schema The table schema.
    * @param meta The table metadata.
-   * @param path The path of the Parquet file to write to.
+   * @param workDir The directory to write the Parquet file to.
    */
-  public ParquetAppender(Configuration conf, Schema schema, TableMeta meta,
-                         Path path) throws IOException {
-    super(conf, schema, meta, path);
+  public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
+                         Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
     this.blockSize = Integer.parseInt(
         meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE));
     this.pageSize = Integer.parseInt(

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 0e5c0e9..23815d9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -710,8 +711,9 @@ public class RCFile {
     return out.getPos();
   }
 
-  public RCFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
-    super(conf, schema, meta, path);
+  public RCFileAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId,
+                        final Schema schema, final TableMeta meta, final Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
     RECORD_INTERVAL = conf.getInt(RECORD_INTERVAL_CONF_STR, RECORD_INTERVAL);
     COLUMNS_BUFFER_SIZE = conf.getInt(COLUMNS_BUFFER_SIZE_CONF_STR,
         COLUMNS_BUFFER_SIZE);
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index f5cef62..14e0f26 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -73,8 +74,9 @@ public class SequenceFileAppender extends FileAppender {
 
   private Writable EMPTY_KEY;
 
-  public SequenceFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-    super(conf, schema, meta, path);
+  public SequenceFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                              Schema schema, TableMeta meta, Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
     this.meta = meta;
     this.schema = schema;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index aad97bc..29c6987 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -41,7 +42,6 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
@@ -80,9 +80,10 @@ public class DelimitedTextFile {
   private NonSyncByteArrayOutputStream os;
   private FieldSerializerDeserializer serde;
 
-  public DelimitedTextFileAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path)
+  public DelimitedTextFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                                   final Schema schema, final TableMeta meta, final Path path)
       throws IOException {
-    super(conf, schema, meta, path);
+    super(conf, taskAttemptId, schema, meta, path);
     this.fs = path.getFileSystem(conf);
     this.meta = meta;
     this.schema = schema;
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniAppender.java
index a17fbf3..0681c2c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniAppender.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -47,8 +48,9 @@ public class TrevniAppender extends FileAppender {
   private TableStatistics stats = null;
   private boolean flushed = false;
 
-  public TrevniAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
-    super(conf, schema, meta, path);
+  public TrevniAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                        Schema schema, TableMeta meta, Path workDir) throws IOException {
+    super(conf, taskAttemptId, schema, meta, workDir);
   }
 
   public void init() throws IOException {
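Every file appender touched by this commit gains the same new constructor argument: the QueryUnitAttemptId of the task that owns the appender. A hedged sketch of the resulting call shape (the helper is illustrative; in Tajo the id is supplied by the executing task):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.QueryUnitAttemptId;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.Appender;
    import org.apache.tajo.storage.parquet.ParquetAppender;

    import java.io.IOException;

    public class AppenderCallSketch {
      // The same five-argument shape applies to RCFileAppender,
      // SequenceFileAppender, DelimitedTextFileAppender, and TrevniAppender.
      static Appender open(Configuration conf, QueryUnitAttemptId taskAttemptId,
                           Schema schema, TableMeta meta, Path workDir) throws IOException {
        Appender appender = new ParquetAppender(conf, taskAttemptId, schema, meta, workDir);
        appender.init();
        return appender;
      }
    }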
http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/src/main/proto/StorageFragmentProtos.proto
new file mode 100644
index 0000000..dd79d74
--- /dev/null
+++ b/tajo-storage/src/main/proto/StorageFragmentProtos.proto
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.storage.fragment";
+option java_outer_classname = "StorageFragmentProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message HBaseFragmentProto {
+  required string tableName = 1;
+  required string hbaseTableName = 2;
+  required bytes startRow = 3;
+  required bytes stopRow = 4;
+  required bool last = 5;
+  required int64 length = 6;
+  optional string regionLocation = 7;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/69373878/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index f262585..0f05536 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -35,7 +35,7 @@
   <!--- Registered Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
-    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,hbase</value>
   </property>
 
   <!--- Fragment Class Configurations -->
@@ -75,6 +75,10 @@
     <name>tajo.storage.fragment.avro.class</name>
     <value>org.apache.tajo.storage.fragment.FileFragment</value>
   </property>
+  <property>
+    <name>tajo.storage.fragment.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+  </property>
 
   <!--- Scanner Handler -->
   <property>
@@ -122,12 +126,17 @@
     <value>org.apache.tajo.storage.avro.AvroScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>
-    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+    <value>textfile,csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,hbase,hfile</value>
   </property>
-
+
   <property>
     <name>tajo.storage.appender-handler.textfile.class</name>
     <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
@@ -172,4 +181,14 @@
     <name>tajo.storage.appender-handler.avro.class</name>
     <value>org.apache.tajo.storage.avro.AvroAppender</value>
   </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hfile.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
 </configuration>
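The handler properties above follow one scheme: tajo.storage.<role>-handler.<type>.class names the implementation for each type listed in the corresponding handler registry. An illustrative sketch of resolving such an entry (this is not Tajo's actual loader code):

    import org.apache.hadoop.conf.Configuration;

    public class HandlerLookupSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.addResource("storage-default.xml");

        // "hbase" is now listed in tajo.storage.scanner-handler, so its
        // scanner class is resolvable through the per-type property:
        String className = conf.get("tajo.storage.scanner-handler.hbase.class");
        System.out.println(className); // org.apache.tajo.storage.hbase.HBaseScanner
        Class<?> scannerClass = Class.forName(className); // ready to instantiate
      }
    }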
