/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Scanner that reads one {@link HBaseFragment} (a row-key range of an HBase table)
 * and converts each HBase {@link Result} into a Tajo {@link Tuple} according to the
 * table's {@link ColumnMapping}.
 *
 * <p>Results are fetched in batches of {@code fetch.rownum} rows (default
 * {@value #DEFAULT_FETCH_SIZE}) and buffered in {@code scanResults}.
 * Not thread-safe; intended to be driven by a single task.
 */
public class HBaseScanner implements Scanner {
  private static final Log LOG = LogFactory.getLog(HBaseScanner.class);
  private static final int DEFAULT_FETCH_SIZE = 1000;
  // Maximum number of entries rendered into a JSON-like list/object datum before truncating.
  private static final int MAX_LIST_SIZE = 100;

  protected boolean inited = false;
  private TajoConf conf;
  private Schema schema;
  private TableMeta meta;
  private HBaseFragment fragment;
  private Scan scan;
  private HTableInterface htable;
  private Configuration hbaseConf;
  private Column[] targets;
  private TableStats tableStats;
  private ResultScanner scanner;
  private AtomicBoolean finished = new AtomicBoolean(false);
  private float progress = 0.0f;
  private int scanFetchSize;
  private Result[] scanResults;
  private int scanResultIndex = -1;
  private Column[] schemaColumns;

  private ColumnMapping columnMapping;
  // Schema column ids of the projected (target) columns.
  private int[] targetIndexes;

  private int numRows = 0;

  // Per schema-column mapping info, indexed by schema column id.
  private byte[][][] mappingColumnFamilies;
  private boolean[] isRowKeyMappings;
  private boolean[] isBinaryColumns;
  private boolean[] isColumnKeys;
  private boolean[] isColumnValues;

  // For multi-field rowkeys: position of each column within the delimited rowkey, or -1.
  private int[] rowKeyFieldIndexes;
  private char rowKeyDelimiter;

  public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
    this.conf = (TajoConf) conf;
    this.schema = schema;
    this.meta = meta;
    this.fragment = (HBaseFragment) fragment;
    this.tableStats = new TableStats();
  }

  /**
   * Resolves the column mapping, decides the projection, and opens the underlying
   * HBase scanner. Must be called before {@link #next()}.
   */
  @Override
  public void init() throws IOException {
    inited = true;
    schemaColumns = schema.toArray();
    if (fragment != null) {
      tableStats.setNumBytes(0);
      tableStats.setNumBlocks(1);
    }
    if (schema != null) {
      for (Column eachColumn : schema.getColumns()) {
        ColumnStats columnStats = new ColumnStats(eachColumn);
        tableStats.addColumnStat(columnStats);
      }
    }

    scanFetchSize = Integer.parseInt(
        meta.getOption(HBaseStorageConstants.META_FETCH_ROWNUM_KEY, "" + DEFAULT_FETCH_SIZE));
    // No explicit projection means "select all columns".
    if (targets == null) {
      targets = schema.toArray();
    }

    columnMapping = new ColumnMapping(schema, meta);
    targetIndexes = new int[targets.length];
    int index = 0;
    for (Column eachTargetColumn : targets) {
      targetIndexes[index++] = schema.getColumnId(eachTargetColumn.getQualifiedName());
    }

    mappingColumnFamilies = columnMapping.getMappingColumns();
    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
    isBinaryColumns = columnMapping.getIsBinaryColumns();
    isColumnKeys = columnMapping.getIsColumnKeys();
    isColumnValues = columnMapping.getIsColumnValues();

    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();

    hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);

    initScanner();
  }

  /**
   * Builds the {@link Scan} for this fragment's key range (restricted to the projected
   * column families/qualifiers) and opens a {@link ResultScanner} on it.
   */
  private void initScanner() throws IOException {
    scan = new Scan();
    scan.setBatch(scanFetchSize);
    scan.setCacheBlocks(false);
    scan.setCaching(scanFetchSize);

    FilterList filters = null;
    if (targetIndexes == null || targetIndexes.length == 0) {
      // No projected column: fetch only the first key of each row, keys only,
      // so rows can still be counted cheaply.
      filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
      filters.addFilter(new FirstKeyOnlyFilter());
      filters.addFilter(new KeyOnlyFilter());
    } else {
      boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
      for (int eachIndex : targetIndexes) {
        // Rowkey-mapped columns are decoded from the row key itself; no cell needed.
        if (isRowKeyMappings[eachIndex]) {
          continue;
        }
        byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex];
        if (mappingColumn[1] == null) {
          scan.addFamily(mappingColumn[0]);
        } else {
          scan.addColumn(mappingColumn[0], mappingColumn[1]);
        }
      }
    }

    scan.setStartRow(fragment.getStartRow());
    if (fragment.isLast() && fragment.getStopRow() != null &&
        fragment.getStopRow().length > 0) {
      // Last fragment with a non-empty stop row: the stop row itself must be included.
      if (filters == null) {
        filters = new FilterList();
      }
      filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
    } else {
      scan.setStopRow(fragment.getStopRow());
    }

    if (filters != null) {
      scan.setFilter(filters);
    }

    if (htable == null) {
      HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager(conf, StoreType.HBASE))
          .getConnection(hbaseConf);
      htable = hconn.getTable(fragment.getHbaseTableName());
    }
    scanner = htable.getScanner(scan);
  }

  /**
   * Returns the next tuple, fetching a new batch from HBase when the buffered batch
   * is exhausted, or {@code null} when the fragment is fully consumed.
   */
  @Override
  public Tuple next() throws IOException {
    if (finished.get()) {
      return null;
    }

    if (scanResults == null || scanResultIndex >= scanResults.length) {
      scanResults = scanner.next(scanFetchSize);
      if (scanResults == null || scanResults.length == 0) {
        finished.set(true);
        progress = 1.0f;
        return null;
      }
      scanResultIndex = 0;
    }

    Result result = scanResults[scanResultIndex++];
    Tuple resultTuple = new VTuple(schema.size());
    for (int i = 0; i < targetIndexes.length; i++) {
      resultTuple.put(targetIndexes[i], getDatum(result, targetIndexes[i]));
    }
    numRows++;
    return resultTuple;
  }

  /**
   * Converts the HBase cell(s) mapped to schema column {@code fieldId} into a Datum.
   * Dispatches on the mapping type: rowkey (whole or one delimited field), ":key:"
   * qualifier list, ":value:" cell list, whole column family, or a single cell.
   */
  private Datum getDatum(Result result, int fieldId) throws IOException {
    byte[] value = null;
    if (isRowKeyMappings[fieldId]) {
      value = result.getRow();
      if (!isBinaryColumns[fieldId] && rowKeyFieldIndexes[fieldId] >= 0) {
        int rowKeyFieldIndex = rowKeyFieldIndexes[fieldId];
        byte[][] rowKeyFields = BytesUtils.splitPreserveAllTokens(value, rowKeyDelimiter);
        // BUGFIX: original test was "rowKeyFields.length < rowKeyFieldIndex", which let
        // length == index fall through to rowKeyFields[rowKeyFieldIndex] and throw
        // ArrayIndexOutOfBoundsException. The field index must be strictly in range.
        if (rowKeyFieldIndex >= rowKeyFields.length) {
          return NullDatum.get();
        }
        value = rowKeyFields[rowKeyFieldIndex];
      }
    } else if (isColumnKeys[fieldId]) {
      // "<cf>:key:" mapping -> all qualifiers of the family.
      NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
      if (cfMap != null) {
        return toListDatum(fieldId, cfMap.keySet());
      }
    } else if (isColumnValues[fieldId]) {
      // "<cf>:value:" mapping -> all cell values of the family.
      NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
      if (cfMap != null) {
        return toListDatum(fieldId, cfMap.values());
      }
    } else if (mappingColumnFamilies[fieldId][1] == null) {
      // "<cf>:" mapping without a qualifier -> whole family.
      NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
      if (cfMap != null && !cfMap.isEmpty()) {
        return toFamilyDatum(fieldId, cfMap);
      }
      // empty/missing family falls through with value == null -> NullDatum
    } else {
      value = result.getValue(mappingColumnFamilies[fieldId][0], mappingColumnFamilies[fieldId][1]);
    }

    if (value == null) {
      return NullDatum.get();
    }
    return deserializeCell(fieldId, value);
  }

  /**
   * Renders a set of qualifiers (":key:") or cell values (":value:") as a datum:
   * a single element is deserialized directly; multiple elements become a JSON-like
   * list string, truncated after MAX_LIST_SIZE entries.
   */
  private Datum toListDatum(int fieldId, Collection<byte[]> items) throws IOException {
    if (items.size() == 1) {
      try {
        return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], items.iterator().next());
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new RuntimeException(e.getMessage(), e);
      }
    }
    StringBuilder sb = new StringBuilder("[");
    int count = 0;
    for (byte[] each : items) {
      if (count > 0) {
        sb.append(", ");
      }
      Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], each);
      sb.append("\"").append(datum.asChars()).append("\"");
      count++;
      if (count > MAX_LIST_SIZE) {
        break;
      }
    }
    sb.append("]");
    return new TextDatum(sb.toString());
  }

  /**
   * Renders a whole column family (mapping like "cf1:") as a datum. If the family
   * holds exactly one cell with an empty qualifier, the value is returned flat
   * (not JSON); otherwise a JSON-like object string of qualifier/value pairs is
   * built, truncated after MAX_LIST_SIZE entries. The caller guarantees cfMap is
   * non-empty, so the original dead "size() == 0" branch has been removed.
   */
  private Datum toFamilyDatum(int fieldId, NavigableMap<byte[], byte[]> cfMap) throws IOException {
    if (cfMap.size() == 1) {
      Map.Entry<byte[], byte[]> single = cfMap.entrySet().iterator().next();
      byte[] entryKey = single.getKey();
      if (entryKey == null || entryKey.length == 0) {
        return deserializeCell(fieldId, single.getValue());
      }
    }

    StringBuilder sb = new StringBuilder("{");
    String delim = "";
    int count = 0;
    for (Map.Entry<byte[], byte[]> entry : cfMap.entrySet()) {
      String keyText = new String(entry.getKey());
      String valueText = null;
      if (entry.getValue() != null) {
        valueText = deserializeCell(fieldId, entry.getValue()).asChars();
      }
      sb.append(delim).append("\"").append(keyText).append("\":\"").append(valueText).append("\"");
      delim = ", ";
      count++;
      if (count > MAX_LIST_SIZE) {
        break;
      }
    }
    sb.append("}");
    return new TextDatum(sb.toString());
  }

  /**
   * Deserializes one cell using the column's configured codec (binary or text),
   * wrapping any failure in a RuntimeException as the original code did.
   */
  private Datum deserializeCell(int fieldId, byte[] value) {
    try {
      if (isBinaryColumns[fieldId]) {
        return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], value);
      } else {
        return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], value);
      }
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new RuntimeException(e.getMessage(), e);
    }
  }

  /** Rewinds the scanner to the beginning of the fragment and resets statistics. */
  @Override
  public void reset() throws IOException {
    progress = 0.0f;
    scanResultIndex = -1;
    scanResults = null;
    finished.set(false);
    tableStats = new TableStats();

    if (scanner != null) {
      scanner.close();
      scanner = null;
    }

    initScanner();
  }

  /** Closes the result scanner and the table handle; safe to call more than once. */
  @Override
  public void close() throws IOException {
    progress = 1.0f;
    finished.set(true);
    if (scanner != null) {
      try {
        scanner.close();
        scanner = null;
      } catch (Exception e) {
        LOG.warn("Error while closing hbase scanner: " + e.getMessage(), e);
      }
    }
    if (htable != null) {
      htable.close();
      htable = null;
    }
  }

  @Override
  public boolean isProjectable() {
    return true;
  }

  /** Sets the projection; must be called before {@link #init()}. */
  @Override
  public void setTarget(Column[] targets) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
    this.targets = targets;
  }

  @Override
  public boolean isSelectable() {
    return false;
  }

  @Override
  public void setSearchCondition(Object expr) {
    // TODO implements adding column filter to scanner.
  }

  @Override
  public boolean isSplittable() {
    return true;
  }

  @Override
  public float getProgress() {
    return progress;
  }

  @Override
  public TableStats getInputStats() {
    tableStats.setNumRows(numRows);
    return tableStats;
  }

  @Override
  public Schema getSchema() {
    return schema;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Table-property keys understood by the HBase storage handler.
 * Interface constants are implicitly {@code public static final}.
 */
public interface HBaseStorageConstants {
  /** Column-mapping token selecting the qualifier ("key") side of a cell. */
  String KEY_COLUMN_MAPPING = "key";
  /** Column-mapping token selecting the value side of a cell. */
  String VALUE_COLUMN_MAPPING = "value";

  /** Rows fetched per scanner batch. */
  String META_FETCH_ROWNUM_KEY = "fetch.rownum";
  /** Name of the mapped HBase table. */
  String META_TABLE_KEY = "table";
  /** Tajo-column to HBase-column mapping specification. */
  String META_COLUMNS_KEY = "columns";

  /** Comma-separated initial region split keys. */
  String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
  /** Path to a file holding one split key per line. */
  String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
  /** ZooKeeper quorum used to reach the HBase cluster. */
  String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
  /** Delimiter between fields of a composite row key. */
  String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";

  /** Query-context flag: write via Put operations instead of bulk-loading HFiles. */
  String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.hbase; - -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.tajo.*; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.expr.*; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.rewrite.RewriteRule; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.*; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.*; - -/** - * StorageManager for HBase table. 
- */ -public class HBaseStorageManager extends StorageManager { - private static final Log LOG = LogFactory.getLog(HBaseStorageManager.class); - - private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>(); - - public HBaseStorageManager (StoreType storeType) { - super(storeType); - } - - @Override - public void storageInit() throws IOException { - } - - @Override - public void closeStorageManager() { - synchronized (connMap) { - for (HConnection eachConn: connMap.values()) { - try { - eachConn.close(); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - } - } - - @Override - public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { - createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists); - TableStats stats = new TableStats(); - stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); - tableDesc.setStats(stats); - } - - private void createTable(TableMeta tableMeta, Schema schema, - boolean isExternal, boolean ifNotExists) throws IOException { - String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); - if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { - throw new IOException("HBase mapped table is required a '" + - HBaseStorageConstants.META_TABLE_KEY + "' attribute."); - } - TableName hTableName = TableName.valueOf(hbaseTableName); - - String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); - if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) { - throw new IOException("Columns property has more entry than Tajo table columns"); - } - - ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); - int numRowKeys = 0; - boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); - for (int i = 0; i < isRowKeyMappings.length; i++) { - if (isRowKeyMappings[i]) { - numRowKeys++; - } - } - if (numRowKeys > 1) { - for (int i = 0; i < 
isRowKeyMappings.length; i++) { - if (isRowKeyMappings[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { - throw new IOException("Key field type should be TEXT type."); - } - } - } - - for (int i = 0; i < isRowKeyMappings.length; i++) { - if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { - throw new IOException("Column key field('<cfname>:key:') type should be TEXT type."); - } - if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { - throw new IOException("Column value field(('<cfname>:value:') type should be TEXT type."); - } - } - - Configuration hConf = getHBaseConfiguration(conf, tableMeta); - HBaseAdmin hAdmin = new HBaseAdmin(hConf); - - try { - if (isExternal) { - // If tajo table is external table, only check validation. - if (mappedColumns == null || mappedColumns.isEmpty()) { - throw new IOException("HBase mapped table is required a '" + - HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); - } - if (!hAdmin.tableExists(hTableName)) { - throw new IOException("HBase table [" + hbaseTableName + "] not exists. 
" + - "External table should be a existed table."); - } - HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName); - Set<String> tableColumnFamilies = new HashSet<String>(); - for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) { - tableColumnFamilies.add(eachColumn.getNameAsString()); - } - - Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames(); - if (mappingColumnFamilies.isEmpty()) { - throw new IOException("HBase mapped table is required a '" + - HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); - } - - for (String eachMappingColumnFamily : mappingColumnFamilies) { - if (!tableColumnFamilies.contains(eachMappingColumnFamily)) { - throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName); - } - } - } else { - if (hAdmin.tableExists(hbaseTableName)) { - if (ifNotExists) { - return; - } else { - throw new IOException("HBase table [" + hbaseTableName + "] already exists."); - } - } - // Creating hbase table - HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema); - - byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta); - if (splitKeys == null) { - hAdmin.createTable(hTableDescriptor); - } else { - hAdmin.createTable(hTableDescriptor, splitKeys); - } - } - } finally { - hAdmin.close(); - } - } - - /** - * Returns initial region split keys. 
- * - * @param conf - * @param schema - * @param meta - * @return - * @throws IOException - */ - private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException { - String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, ""); - String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, ""); - - if ((splitRowKeys == null || splitRowKeys.isEmpty()) && - (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) { - return null; - } - - ColumnMapping columnMapping = new ColumnMapping(schema, meta); - boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns(); - boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); - - boolean rowkeyBinary = false; - int numRowKeys = 0; - Column rowKeyColumn = null; - for (int i = 0; i < isBinaryColumns.length; i++) { - if (isBinaryColumns[i] && isRowKeys[i]) { - rowkeyBinary = true; - } - if (isRowKeys[i]) { - numRowKeys++; - rowKeyColumn = schema.getColumn(i); - } - } - - if (rowkeyBinary && numRowKeys > 1) { - throw new IOException("If rowkey is mapped to multi column and a rowkey is binary, " + - "Multiple region for creation is not support."); - } - - if (splitRowKeys != null && !splitRowKeys.isEmpty()) { - String[] splitKeyTokens = splitRowKeys.split(","); - byte[][] splitKeys = new byte[splitKeyTokens.length][]; - for (int i = 0; i < splitKeyTokens.length; i++) { - if (numRowKeys == 1 && rowkeyBinary) { - splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); - } else { - splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); - } - } - return splitKeys; - } - - if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) { - // If there is many split keys, Tajo allows to define in the file. 
- Path path = new Path(splitRowKeysFile); - FileSystem fs = path.getFileSystem(conf); - if (!fs.exists(path)) { - throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " not exists."); - } - - SortedSet<String> splitKeySet = new TreeSet<String>(); - BufferedReader reader = null; - try { - reader = new BufferedReader(new InputStreamReader(fs.open(path))); - String line = null; - while ( (line = reader.readLine()) != null ) { - if (line.isEmpty()) { - continue; - } - splitKeySet.add(line); - } - } finally { - if (reader != null) { - reader.close(); - } - } - - if (splitKeySet.isEmpty()) { - return null; - } - - byte[][] splitKeys = new byte[splitKeySet.size()][]; - int index = 0; - for (String eachKey: splitKeySet) { - if (numRowKeys == 1 && rowkeyBinary) { - splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); - } else { - splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); - } - } - - return splitKeys; - } - - return null; - } - - /** - * Creates Configuration instance and sets with hbase connection options. - * - * @param conf - * @param tableMeta - * @return - * @throws IOException - */ - public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException { - String zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, ""); - if (zkQuorum == null || zkQuorum.trim().isEmpty()) { - throw new IOException("HBase mapped table is required a '" + - HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute."); - } - - Configuration hbaseConf = (conf == null) ? 
HBaseConfiguration.create() : HBaseConfiguration.create(conf); - hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); - - for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) { - String key = eachOption.getKey(); - if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { - hbaseConf.set(key, eachOption.getValue()); - } - } - return hbaseConf; - } - - /** - * Creates HTableDescription using table meta data. - * - * @param tableMeta - * @param schema - * @return - * @throws IOException - */ - public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException { - String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); - if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { - throw new IOException("HBase mapped table is required a '" + - HBaseStorageConstants.META_TABLE_KEY + "' attribute."); - } - TableName hTableName = TableName.valueOf(hbaseTableName); - - ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); - - HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName); - - Collection<String> columnFamilies = columnMapping.getColumnFamilyNames(); - //If 'columns' attribute is empty, Tajo table columns are mapped to all HBase table column. 
- if (columnFamilies.isEmpty()) { - for (Column eachColumn: schema.getColumns()) { - columnFamilies.add(eachColumn.getSimpleName()); - } - } - - for (String eachColumnFamily: columnFamilies) { - hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily)); - } - - return hTableDescriptor; - } - - @Override - public void purgeTable(TableDesc tableDesc) throws IOException { - HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta())); - - try { - HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema()); - LOG.info("Deleting hbase table: " + new String(hTableDesc.getName())); - hAdmin.disableTable(hTableDesc.getName()); - hAdmin.deleteTable(hTableDesc.getName()); - } finally { - hAdmin.close(); - } - } - - /** - * Returns columns which are mapped to the rowkey of the hbase table. - * - * @param tableDesc - * @return - * @throws IOException - */ - private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException { - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); - boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); - int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes(); - - Column indexColumn = null; - for (int i = 0; i < isRowKeyMappings.length; i++) { - if (isRowKeyMappings[i]) { - if (columnMapping.getNumRowKeys() == 1 || - rowKeyIndexes[i] == 0) { - indexColumn = tableDesc.getSchema().getColumn(i); - } - } - } - return new Column[]{indexColumn}; - } - - @Override - public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException { - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); - - List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode); - Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); - HTable htable = null; - HBaseAdmin hAdmin = null; - - try { - htable = 
new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); - - org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); - if (null == regLoc) { - throw new IOException("Expecting at least one region."); - } - List<Fragment> fragments = new ArrayList<Fragment>(1); - Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), - HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); - fragments.add(fragment); - return fragments; - } - - List<byte[]> startRows; - List<byte[]> stopRows; - - if (indexPredications != null && !indexPredications.isEmpty()) { - // indexPredications is Disjunctive set - startRows = new ArrayList<byte[]>(); - stopRows = new ArrayList<byte[]>(); - for (IndexPredication indexPredication: indexPredications) { - byte[] startRow; - byte[] stopRow; - if (indexPredication.getStartValue() != null) { - startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); - } else { - startRow = HConstants.EMPTY_START_ROW; - } - if (indexPredication.getStopValue() != null) { - stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); - } else { - stopRow = HConstants.EMPTY_END_ROW; - } - startRows.add(startRow); - stopRows.add(stopRow); - } - } else { - startRows = TUtil.newList(HConstants.EMPTY_START_ROW); - stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); - } - - hAdmin = new HBaseAdmin(hconf); - Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); - - // region startkey -> HBaseFragment - Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>(); - for (int i = 0; i < keys.getFirst().length; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], 
false); - - byte[] regionStartKey = keys.getFirst()[i]; - byte[] regionStopKey = keys.getSecond()[i]; - - int startRowsSize = startRows.size(); - for (int j = 0; j < startRowsSize; j++) { - byte[] startRow = startRows.get(j); - byte[] stopRow = stopRows.get(j); - // determine if the given start an stop key fall into the region - if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) - && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { - byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? - regionStartKey : startRow; - - byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && - regionStopKey.length > 0 ? regionStopKey : stopRow; - - String regionName = location.getRegionInfo().getRegionNameAsString(); - - ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); - if (serverLoad == null) { - serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); - serverLoadMap.put(location.getServerName(), serverLoad); - } - - if (fragmentMap.containsKey(regionStartKey)) { - HBaseFragment prevFragment = fragmentMap.get(regionStartKey); - if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { - prevFragment.setStartRow(fragmentStart); - } - if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { - prevFragment.setStopRow(fragmentStop); - } - } else { - HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), - fragmentStart, fragmentStop, location.getHostname()); - - // get region size - boolean foundLength = false; - for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { - if (regionName.equals(Bytes.toString(entry.getKey()))) { - RegionLoad regionLoad = entry.getValue(); - long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; - 
fragment.setLength(storeFileSize); - foundLength = true; - break; - } - } - - if (!foundLength) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } - - fragmentMap.put(regionStartKey, fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); - } - } - } - } - } - - List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values()); - Collections.sort(fragments); - if (!fragments.isEmpty()) { - fragments.get(fragments.size() - 1).setLast(true); - } - return (ArrayList<Fragment>) (ArrayList) fragments; - } finally { - if (htable != null) { - htable.close(); - } - if (hAdmin != null) { - hAdmin.close(); - } - } - } - - private byte[] serialize(ColumnMapping columnMapping, - IndexPredication indexPredication, Datum datum) throws IOException { - if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { - return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); - } else { - return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum); - } - } - - @Override - public Appender getAppender(OverridableConf queryContext, - QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) - throws IOException { - if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { - return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir); - } else { - return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir); - } - } - - @Override - public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException { - Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); - HTable htable = null; - HBaseAdmin hAdmin = null; - try { - htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); - - org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = 
htable.getStartEndKeys(); - if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { - return new ArrayList<Fragment>(1); - } - hAdmin = new HBaseAdmin(hconf); - Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); - - List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length); - - int start = currentPage * numFragments; - if (start >= keys.getFirst().length) { - return new ArrayList<Fragment>(1); - } - int end = (currentPage + 1) * numFragments; - if (end > keys.getFirst().length) { - end = keys.getFirst().length; - } - for (int i = start; i < end; i++) { - HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); - - String regionName = location.getRegionInfo().getRegionNameAsString(); - ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); - if (serverLoad == null) { - serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); - serverLoadMap.put(location.getServerName(), serverLoad); - } - - HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(), - location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname()); - - // get region size - boolean foundLength = false; - for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { - if (regionName.equals(Bytes.toString(entry.getKey()))) { - RegionLoad regionLoad = entry.getValue(); - long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; - if (storeLength == 0) { - // If store size is smaller than 1 MB, storeLength is zero - storeLength = 1 * 1024 * 1024; //default 1MB - } - fragment.setLength(storeLength); - foundLength = true; - break; - } - } - - if (!foundLength) { - fragment.setLength(TajoConstants.UNKNOWN_LENGTH); - } - - fragments.add(fragment); - if (LOG.isDebugEnabled()) { - LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); 
- } - } - - if (!fragments.isEmpty()) { - ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); - } - return fragments; - } finally { - if (htable != null) { - htable.close(); - } - if (hAdmin != null) { - hAdmin.close(); - } - } - } - - public HConnection getConnection(Configuration hbaseConf) throws IOException { - synchronized(connMap) { - HConnectionKey key = new HConnectionKey(hbaseConf); - HConnection conn = connMap.get(key); - if (conn == null) { - conn = HConnectionManager.createConnection(hbaseConf); - connMap.put(key, conn); - } - - return conn; - } - } - - static class HConnectionKey { - final static String[] CONNECTION_PROPERTIES = new String[] { - HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.ZOOKEEPER_CLIENT_PORT, - HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, - HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.HBASE_CLIENT_PREFETCH_LIMIT, - HConstants.HBASE_META_SCANNER_CACHING, - HConstants.HBASE_CLIENT_INSTANCE_ID, - HConstants.RPC_CODEC_CONF_KEY }; - - private Map<String, String> properties; - private String username; - - HConnectionKey(Configuration conf) { - Map<String, String> m = new HashMap<String, String>(); - if (conf != null) { - for (String property : CONNECTION_PROPERTIES) { - String value = conf.get(property); - if (value != null) { - m.put(property, value); - } - } - } - this.properties = Collections.unmodifiableMap(m); - - try { - UserProvider provider = UserProvider.instantiate(conf); - User currentUser = provider.getCurrent(); - if (currentUser != null) { - username = currentUser.getName(); - } - } catch (IOException ioe) { - LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe); - } - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - if (username != null) { - result = username.hashCode(); - } - for (String property : CONNECTION_PROPERTIES) { - String value 
= properties.get(property); - if (value != null) { - result = prime * result + value.hashCode(); - } - } - - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) - return true; - if (obj == null) - return false; - if (getClass() != obj.getClass()) - return false; - HConnectionKey that = (HConnectionKey) obj; - if (this.username != null && !this.username.equals(that.username)) { - return false; - } else if (this.username == null && that.username != null) { - return false; - } - if (this.properties == null) { - if (that.properties != null) { - return false; - } - } else { - if (that.properties == null) { - return false; - } - for (String property : CONNECTION_PROPERTIES) { - String thisValue = this.properties.get(property); - String thatValue = that.properties.get(property); - //noinspection StringEquality - if (thisValue == thatValue) { - continue; - } - if (thisValue == null || !thisValue.equals(thatValue)) { - return false; - } - } - } - return true; - } - - @Override - public String toString() { - return "HConnectionKey{" + - "properties=" + properties + - ", username='" + username + '\'' + - '}'; - } - } - - public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping, - TableDesc tableDesc, ScanNode scanNode) throws IOException { - List<IndexPredication> indexPredications = new ArrayList<IndexPredication>(); - Column[] indexableColumns = getIndexableColumns(tableDesc); - if (indexableColumns != null && indexableColumns.length == 1) { - // Currently supports only single index column. 
- List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns); - for (Set<EvalNode> eachEvalSet: indexablePredicateList) { - Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet); - if (indexPredicationValues != null) { - IndexPredication indexPredication = new IndexPredication(); - indexPredication.setColumn(indexableColumns[0]); - indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName())); - indexPredication.setStartValue(indexPredicationValues.getFirst()); - indexPredication.setStopValue(indexPredicationValues.getSecond()); - - indexPredications.add(indexPredication); - } - } - } - return indexPredications; - } - - public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException { - List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>(); - - // if a query statement has a search condition, try to find indexable predicates - if (indexableColumns != null && scanNode.getQual() != null) { - EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual()); - - // add qualifier to schema for qual - for (Column column : indexableColumns) { - for (EvalNode disjunctiveExpr : disjunctiveForms) { - EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr); - Set<EvalNode> indexablePredicateSet = Sets.newHashSet(); - for (EvalNode conjunctiveExpr : conjunctiveForms) { - if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) { - indexablePredicateSet.add(conjunctiveExpr); - } - } - if (!indexablePredicateSet.isEmpty()) { - indexablePredicateList.add(indexablePredicateSet); - } - } - } - } - - return indexablePredicateList; - } - - private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) { - if (checkIfIndexablePredicate(evalNode) || 
checkIfConjunctiveButOneVariable(evalNode)) { - Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode); - // if it contains only single variable matched to a target column - return variables.size() == 1 && variables.contains(targetColumn); - } else { - return false; - } - } - - /** - * - * @param evalNode The expression to be checked - * @return true if an conjunctive expression, consisting of indexable expressions - */ - private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) { - if (evalNode.getType() == EvalType.AND) { - BinaryEval orEval = (BinaryEval) evalNode; - boolean indexable = - checkIfIndexablePredicate(orEval.getLeftExpr()) && - checkIfIndexablePredicate(orEval.getRightExpr()); - - boolean sameVariable = - EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr()) - .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr())); - - return indexable && sameVariable; - } else { - return false; - } - } - - /** - * Check if an expression consists of one variable and one constant and - * the expression is a comparison operator. - * - * @param evalNode The expression to be checked - * @return true if an expression consists of one variable and one constant - * and the expression is a comparison operator. Other, false. 
- */ - private boolean checkIfIndexablePredicate(EvalNode evalNode) { - return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode); - } - - public static boolean isIndexableOperator(EvalNode expr) { - return expr.getType() == EvalType.EQUAL || - expr.getType() == EvalType.LEQ || - expr.getType() == EvalType.LTH || - expr.getType() == EvalType.GEQ || - expr.getType() == EvalType.GTH || - expr.getType() == EvalType.BETWEEN; - } - - public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping, - Set<EvalNode> evalNodes) { - Datum startDatum = null; - Datum endDatum = null; - for (EvalNode evalNode: evalNodes) { - if (evalNode instanceof BinaryEval) { - BinaryEval binaryEval = (BinaryEval) evalNode; - EvalNode left = binaryEval.getLeftExpr(); - EvalNode right = binaryEval.getRightExpr(); - - Datum constValue = null; - if (left.getType() == EvalType.CONST) { - constValue = ((ConstEval) left).getValue(); - } else if (right.getType() == EvalType.CONST) { - constValue = ((ConstEval) right).getValue(); - } - - if (constValue != null) { - if (evalNode.getType() == EvalType.EQUAL || - evalNode.getType() == EvalType.GEQ || - evalNode.getType() == EvalType.GTH) { - if (startDatum != null) { - if (constValue.compareTo(startDatum) > 0) { - startDatum = constValue; - } - } else { - startDatum = constValue; - } - } - - if (evalNode.getType() == EvalType.EQUAL || - evalNode.getType() == EvalType.LEQ || - evalNode.getType() == EvalType.LTH) { - if (endDatum != null) { - if (constValue.compareTo(endDatum) < 0) { - endDatum = constValue; - } - } else { - endDatum = constValue; - } - } - } - } else if (evalNode instanceof BetweenPredicateEval) { - BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode; - if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) { - Datum value = ((ConstEval) betweenEval.getBegin()).getValue(); - if (startDatum != null) { - if 
(value.compareTo(startDatum) > 0) { - startDatum = value; - } - } else { - startDatum = value; - } - - value = ((ConstEval) betweenEval.getEnd()).getValue(); - if (endDatum != null) { - if (value.compareTo(endDatum) < 0) { - endDatum = value; - } - } else { - endDatum = value; - } - } - } - } - - if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) { - endDatum = new TextDatum(endDatum.asChars() + - new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE})); - } - if (startDatum != null || endDatum != null) { - return new Pair<Datum, Datum>(startDatum, endDatum); - } else { - return null; - } - } - - @Override - public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, - LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { - if (tableDesc == null) { - throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId); - } - Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - - Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); - hbaseConf.set("hbase.loadincremental.threads.max", "2"); - - JobContextImpl jobContext = new JobContextImpl(hbaseConf, - new JobID(finalEbId.getQueryId().toString(), finalEbId.getId())); - - FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext); - Path jobAttemptPath = committer.getJobAttemptPath(jobContext); - FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf()); - if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) { - LOG.warn("No query attempt file in " + jobAttemptPath); - return stagingResultDir; - } - committer.commitJob(jobContext); - - if (tableDesc.getName() == null && tableDesc.getPath() != null) { - - // insert into location - return super.commitOutputData(queryContext, 
finalEbId, plan, schema, tableDesc, false); - } else { - // insert into table - String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY); - - HTable htable = new HTable(hbaseConf, tableName); - try { - LoadIncrementalHFiles loadIncrementalHFiles = null; - try { - loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - throw new IOException(e.getMessage(), e); - } - loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable); - - return stagingResultDir; - } finally { - htable.close(); - } - } - } - - @Override - public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, - Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) - throws IOException { - try { - int[] sortKeyIndexes = new int[sortSpecs.length]; - for (int i = 0; i < sortSpecs.length; i++) { - sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName()); - } - - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); - Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta()); - - HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName()); - try { - byte[][] endKeys = htable.getEndKeys(); - if (endKeys.length == 1) { - return new TupleRange[]{dataRange}; - } - List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length); - - TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs); - Tuple previousTuple = dataRange.getStart(); - - for (byte[] eachEndKey : endKeys) { - Tuple endTuple = new VTuple(sortSpecs.length); - byte[][] rowKeyFields; - if (sortSpecs.length > 1) { - byte[][] splitValues = BytesUtils.splitPreserveAllTokens(eachEndKey, columnMapping.getRowKeyDelimiter()); - if (splitValues.length == sortSpecs.length) { - rowKeyFields = splitValues; - } else { - rowKeyFields = new 
byte[sortSpecs.length][]; - for (int j = 0; j < sortSpecs.length; j++) { - if (j < splitValues.length) { - rowKeyFields[j] = splitValues[j]; - } else { - rowKeyFields[j] = null; - } - } - } - - } else { - rowKeyFields = new byte[1][]; - rowKeyFields[0] = eachEndKey; - } - - for (int i = 0; i < sortSpecs.length; i++) { - if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) { - endTuple.put(i, - HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), - rowKeyFields[i])); - } else { - endTuple.put(i, - HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]), - rowKeyFields[i])); - } - } - tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple)); - previousTuple = endTuple; - } - - // Last region endkey is empty. Tajo ignores empty key, so endkey is replaced with max data value. - if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) { - tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd()); - } else { - tupleRanges.remove(tupleRanges.size() - 1); - } - return tupleRanges.toArray(new TupleRange[]{}); - } finally { - htable.close(); - } - } catch (Throwable t) { - LOG.error(t.getMessage(), t); - throw new IOException(t.getMessage(), t); - } - } - - public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { - if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { - List<RewriteRule> rules = new ArrayList<RewriteRule>(); - rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc))); - return rules; - } else { - return null; - } - } - - private Column[] getIndexColumns(TableDesc tableDesc) throws IOException { - List<Column> indexColumns = new ArrayList<Column>(); - - ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); - - boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); - 
for (int i = 0; i < isRowKeys.length; i++) { - if (isRowKeys[i]) { - indexColumns.add(tableDesc.getSchema().getColumn(i)); - } - } - - return indexColumns.toArray(new Column[]{}); - } - - @Override - public StorageProperty getStorageProperty() { - StorageProperty storageProperty = new StorageProperty(); - storageProperty.setSortedInsert(true); - storageProperty.setSupportsInsertInto(true); - return storageProperty; - } - - public void beforeInsertOrCATS(LogicalNode node) throws IOException { - if (node.getType() == NodeType.CREATE_TABLE) { - CreateTableNode cNode = (CreateTableNode)node; - if (!cNode.isExternal()) { - TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); - createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists()); - } - } - } - - @Override - public void rollbackOutputCommit(LogicalNode node) throws IOException { - if (node.getType() == NodeType.CREATE_TABLE) { - CreateTableNode cNode = (CreateTableNode)node; - if (cNode.isExternal()) { - return; - } - TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions()); - HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta)); - - try { - HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema()); - LOG.info("Delete table cause query failed:" + hTableDesc.getName()); - hAdmin.disableTable(hTableDesc.getName()); - hAdmin.deleteTable(hTableDesc.getName()); - } finally { - hAdmin.close(); - } - } - } - - @Override - public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { - if (tableDesc != null) { - Schema tableSchema = tableDesc.getSchema(); - if (tableSchema.size() != outSchema.size()) { - throw new IOException("The number of table columns is different from SELECT columns"); - } - - for (int i = 0; i < tableSchema.size(); i++) { - if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) { - throw new 
IOException(outSchema.getColumn(i).getQualifiedName() + - "(" + outSchema.getColumn(i).getDataType().getType() + ")" + - " is different column type with " + tableSchema.getColumn(i).getSimpleName() + - "(" + tableSchema.getColumn(i).getDataType().getType() + ")"); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java deleted file mode 100644 index a0ad492..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.hbase; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.util.NumberUtil; - -import java.io.IOException; - -public class HBaseTextSerializerDeserializer { - public static Datum deserialize(Column col, byte[] bytes) throws IOException { - Datum datum; - switch (col.getDataType().getType()) { - case INT1: - case INT2: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : - DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length)); - break; - case INT4: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : - DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length)); - break; - case INT8: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : - DatumFactory.createInt8(new String(bytes, 0, bytes.length)); - break; - case FLOAT4: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : - DatumFactory.createFloat4(new String(bytes, 0, bytes.length)); - break; - case FLOAT8: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : - DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length)); - break; - case TEXT: - datum = bytes == null ? 
NullDatum.get() : DatumFactory.createText(bytes); - break; - default: - datum = NullDatum.get(); - break; - } - return datum; - } - - public static byte[] serialize(Column col, Datum datum) throws IOException { - if (datum == null || datum instanceof NullDatum) { - return null; - } - - return datum.asChars().getBytes(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java deleted file mode 100644 index b9425f9..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java +++ /dev/null @@ -1,167 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.hbase; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.util.Bytes; - -import java.io.IOException; -import java.util.TreeSet; - -public class HFileAppender extends AbstractHBaseAppender { - private static final Log LOG = LogFactory.getLog(HFileAppender.class); - - private RecordWriter<ImmutableBytesWritable, Cell> writer; - private TaskAttemptContext writerContext; - private Path workingFilePath; - private FileOutputCommitter committer; - - public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, - Schema schema, TableMeta meta, Path stagingDir) { - super(conf, taskAttemptId, schema, meta, stagingDir); - } - - @Override - public void init() throws IOException { - super.init(); - - Configuration taskConf = new Configuration(); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString()); - - ExecutionBlockId ebId 
= taskAttemptId.getQueryUnitId().getExecutionBlockId(); - writerContext = new TaskAttemptContextImpl(taskConf, - new TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP, - taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId())); - - HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2(); - try { - writer = hFileOutputFormat2.getRecordWriter(writerContext); - - committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext); - workingFilePath = committer.getWorkPath(); - } catch (InterruptedException e) { - throw new IOException(e.getMessage(), e); - } - - LOG.info("Created hbase file writer: " + workingFilePath); - } - - long totalNumBytes = 0; - ImmutableBytesWritable keyWritable = new ImmutableBytesWritable(); - boolean first = true; - TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR); - - - @Override - public void addTuple(Tuple tuple) throws IOException { - Datum datum; - - byte[] rowkey = getRowKeyBytes(tuple); - - if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) { - try { - for (KeyValue kv : kvSet) { - writer.write(keyWritable, kv); - totalNumBytes += keyWritable.getLength() + kv.getLength(); - } - kvSet.clear(); - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } - } - - first = false; - - keyWritable.set(rowkey); - - readKeyValues(tuple, rowkey); - if (keyValues != null) { - for (KeyValue eachKeyVal: keyValues) { - kvSet.add(eachKeyVal); - } - } - } - - @Override - public void flush() throws IOException { - } - - @Override - public long getEstimatedOutputSize() throws IOException { - // StoreTableExec uses this value as rolling file length - // Not rolling - return 0; - } - - @Override - public void close() throws IOException { - if (!kvSet.isEmpty()) { - try { - for (KeyValue kv : kvSet) { - writer.write(keyWritable, 
kv); - totalNumBytes += keyWritable.getLength() + keyWritable.getLength(); - } - kvSet.clear(); - // Statistical section - if (enabledStats) { - stats.incrementRow(); - } - } catch (InterruptedException e) { - LOG.error(e.getMessage(), e); - } - } - - if (enabledStats) { - stats.setNumBytes(totalNumBytes); - } - if (writer != null) { - try { - writer.close(writerContext); - committer.commitTask(writerContext); - } catch (InterruptedException e) { - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java deleted file mode 100644 index 3a58e50..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.hbase; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; - -public class IndexPredication { - private Column column; - private int columnId; - private Datum startValue; - private Datum stopValue; - - public Column getColumn() { - return column; - } - - public void setColumn(Column column) { - this.column = column; - } - - public int getColumnId() { - return columnId; - } - - public void setColumnId(int columnId) { - this.columnId = columnId; - } - - public Datum getStartValue() { - return startValue; - } - - public void setStartValue(Datum startValue) { - this.startValue = startValue; - } - - public Datum getStopValue() { - return stopValue; - } - - public void setStopValue(Datum stopValue) { - this.stopValue = stopValue; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java deleted file mode 100644 index 4577703..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
/**
 * Mutable holder describing how a table column maps onto the HBase row key:
 * whether the mapped key field uses a binary encoding, and the index of the
 * key field the column corresponds to.
 */
public class RowKeyMapping {
  // Whether the mapped row-key field is stored with a binary encoding.
  private boolean isBinary;
  // Position of the mapped field within the row key.
  private int keyFieldIndex;

  /** @param isBinary whether the mapped key field uses a binary encoding */
  public void setBinary(boolean isBinary) { this.isBinary = isBinary; }

  /** @param keyFieldIndex position of the mapped field within the row key */
  public void setKeyFieldIndex(int keyFieldIndex) { this.keyFieldIndex = keyFieldIndex; }

  /** @return whether the mapped key field uses a binary encoding */
  public boolean isBinary() { return isBinary; }

  /** @return position of the mapped field within the row key */
  public int getKeyFieldIndex() { return keyFieldIndex; }
}
package org.apache.tajo.storage.index;

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.storage.BaseTupleComparator;  // NOTE(review): appears unused here — candidate for removal
import org.apache.tajo.storage.TupleComparator;

import java.io.IOException;

/**
 * Factory for one index implementation: supplies the paired writer and reader
 * for an index file whose keys follow {@code keySchema} and are ordered by
 * {@code comparator}.
 */
public interface IndexMethod {

  /**
   * Returns a writer that builds an index file at {@code fileName}.
   *
   * @param fileName   path of the index file to write
   * @param level      index level — meaning is implementation-specific; confirm
   *                   against the concrete index method
   * @param keySchema  schema of the index key tuples
   * @param comparator ordering applied to the key tuples
   * @return a writer for the index file
   * @throws IOException if the writer cannot be created
   */
  IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
      TupleComparator comparator) throws IOException;

  /**
   * Returns a reader over the index file at {@code fileName}.
   *
   * @param fileName   path of the index file to read
   * @param keySchema  schema of the index key tuples
   * @param comparator ordering applied to the key tuples
   * @return a reader for the index file
   * @throws IOException if the reader cannot be created
   */
  IndexReader getIndexReader(final Path fileName, Schema keySchema,
      TupleComparator comparator) throws IOException;
}
package org.apache.tajo.storage.index;

import org.apache.tajo.storage.Tuple;

import java.io.IOException;

/**
 * Read side of an index: maps a key tuple to an offset recorded for that key
 * (presumably an offset into the indexed data file — confirm against the
 * concrete implementation).
 */
public interface IndexReader {

  /**
   * Find the offset corresponding to key which is equal to a given key.
   *
   * @param key the key tuple to look up
   * @return the offset recorded for an entry equal to {@code key}; behavior
   *         when no equal key exists is implementation-defined — not
   *         specified here
   * @throws IOException if the index cannot be read
   */
  public long find(Tuple key) throws IOException;
}
package org.apache.tajo.storage.index;

import org.apache.tajo.storage.Tuple;

import java.io.IOException;

/**
 * Write side of an index: records, for each key tuple, an associated offset
 * (presumably an offset into the indexed data file — confirm against the
 * concrete implementation).
 */
public abstract class IndexWriter {

  /**
   * Writes one index entry associating {@code key} with {@code offset}.
   *
   * @param key    the key tuple to index
   * @param offset the offset to record for {@code key}
   * @throws IOException if the entry cannot be written
   */
  public abstract void write(Tuple key, long offset) throws IOException;

  /**
   * Closes this writer.
   *
   * @throws IOException if closing fails
   */
  public abstract void close() throws IOException;
}
package org.apache.tajo.storage.index;

import org.apache.tajo.storage.Tuple;

import java.io.IOException;

/**
 * An {@link IndexReader} whose keys are stored in sorted order, adding a
 * range-style lookup (first key equal to or greater than a probe key) and
 * sequential iteration over subsequent offsets.
 */
public interface OrderIndexReader extends IndexReader {
  /**
   * Find the offset corresponding to key which is equal to or greater than
   * a given key.
   *
   * @param key     to find
   * @param nextKey presumably, when true, selects the entry after the matching
   *                key rather than the match itself — confirm against
   *                implementations
   * @return the offset found for the lookup
   * @throws IOException if the index cannot be read
   */
  public long find(Tuple key, boolean nextKey) throws IOException;

  /**
   * Return the next offset from the latest find or next offset
   *
   * @return the next offset in iteration order
   * @throws IOException if the index cannot be read
   */
  public long next() throws IOException;
}
