/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.util.BytesUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Parses the HBase mapping table properties ("columns", the HBase table name,
 * and the rowkey delimiter) and exposes, for each Tajo schema column, how that
 * column is mapped onto the HBase table:
 * <ul>
 *   <li>the whole rowkey, or one delimited field of a composite rowkey
 *       (e.g. {@code ":key"} or {@code "2:key"}),</li>
 *   <li>the set of qualifiers of a column family ({@code "<cf>:key:"}),</li>
 *   <li>the set of values of a column family ({@code "<cf>:value:"}),</li>
 *   <li>an ordinary {@code cf:qualifier} cell.</li>
 * </ul>
 * A trailing {@code "#b"} on a mapping marks the column as binary-encoded.
 */
public class ColumnMapping {
  private TableMeta tableMeta;
  private Schema schema;

  // Delimiter that splits a composite rowkey into fields; '\0' means "not configured".
  private char rowKeyDelimiter;

  private String hbaseTableName;

  // Per schema column: index of the rowkey field it maps to, or -1 when the
  // column maps the whole rowkey (or is not a rowkey mapping at all).
  private int[] rowKeyFieldIndexes;
  private boolean[] isRowKeyMappings;
  private boolean[] isBinaryColumns;
  private boolean[] isColumnKeys;
  private boolean[] isColumnValues;

  // schema order -> 0: column family name bytes, 1: qualifier name bytes (or null)
  private byte[][][] mappingColumns;

  // Number of schema columns mapped to the rowkey.
  private int numRowKeys;

  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
    this.schema = schema;
    this.tableMeta = tableMeta;

    init();
  }

  /**
   * Parses the "columns" property (one comma-separated mapping token per Tajo
   * column) and fills all the per-column mapping arrays.
   *
   * @throws IOException if the property is missing, a token is malformed, or
   *         the number of tokens differs from the number of schema columns
   */
  public void init() throws IOException {
    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
    if (delim.length() > 0) {
      rowKeyDelimiter = delim.charAt(0);
    }
    isRowKeyMappings = new boolean[schema.size()];
    rowKeyFieldIndexes = new int[schema.size()];
    isBinaryColumns = new boolean[schema.size()];
    isColumnKeys = new boolean[schema.size()];
    isColumnValues = new boolean[schema.size()];

    mappingColumns = new byte[schema.size()][][];

    for (int i = 0; i < schema.size(); i++) {
      rowKeyFieldIndexes[i] = -1;
    }

    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
    if (columnMapping == null || columnMapping.isEmpty()) {
      throw new IOException("'columns' property is required.");
    }

    String[] columnMappingTokens = columnMapping.split(",");

    if (columnMappingTokens.length != schema.getColumns().size()) {
      throw new IOException("The number of mapped HBase columns (" + columnMappingTokens.length +
          ") does not match the number of Tajo table columns (" + schema.getColumns().size() + ")");
    }

    int index = 0;
    for (String eachToken: columnMappingTokens) {
      mappingColumns[index] = new byte[2][];

      byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');

      if (mappingTokens.length == 3) {
        // <cfname>:key:[#b] or <cfname>:value:[#b]
        if (mappingTokens[0].length == 0) {
          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
              "or '<cfname>:value:' or '<cfname>:value:#b'");
        }
        if (mappingTokens[2].length != 0) {
          // The only allowed third token is the binary marker "#b".
          String binaryOption = new String(mappingTokens[2]);
          if ("#b".equals(binaryOption)) {
            isBinaryColumns[index] = true;
          } else {
            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
                "or '<cfname>:value:' or '<cfname>:value:#b'");
          }
        }
        mappingColumns[index][0] = mappingTokens[0];
        String keyOrValue = new String(mappingTokens[1]);
        if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
          isColumnKeys[index] = true;
        } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
          isColumnValues[index] = true;
        } else {
          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
        }
      } else if (mappingTokens.length == 2) {
        // <cfname>: or <cfname>:<qualifier> or :key or <fieldIndex>:key
        String cfName = new String(mappingTokens[0]);
        String columnName = new String(mappingTokens[1]);
        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
        if (rowKeyMapping != null) {
          isRowKeyMappings[index] = true;
          numRowKeys++;
          isBinaryColumns[index] = rowKeyMapping.isBinary();
          if (!cfName.isEmpty()) {
            // "<n>:key" maps this column to the n-th field of a composite
            // rowkey, which requires a delimiter to split on.
            if (rowKeyDelimiter == 0) {
              throw new IOException("hbase.rowkey.delimiter is required.");
            }
            rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
          } else {
            rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column.
          }
        } else {
          if (cfName.isEmpty()) {
            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
          }
          // cfName comes from new String(...) and is never null here.
          mappingColumns[index][0] = Bytes.toBytes(cfName);

          if (!columnName.isEmpty()) {
            // Qualifier may carry the binary marker: "<qualifier>#b".
            String[] columnNameTokens = columnName.split("#");
            if (columnNameTokens[0].isEmpty()) {
              mappingColumns[index][1] = null;
            } else {
              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
            }
            if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
              isBinaryColumns[index] = true;
            }
          }
        }
      } else {
        throw new IOException(eachToken + " 'column' attribute '[cfname]:[qualfier]:'");
      }

      index++;
    } // for loop
  }

  /**
   * @return the distinct column family names referenced by this mapping,
   *         in first-appearance order
   */
  public List<String> getColumnFamilyNames() {
    List<String> cfNames = new ArrayList<String>();

    for (byte[][] eachCfName: mappingColumns) {
      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
        String cfName = new String(eachCfName[0]);
        if (!cfNames.contains(cfName)) {
          cfNames.add(cfName);
        }
      }
    }

    return cfNames;
  }

  /**
   * Interprets a two-part mapping token as a rowkey mapping.
   *
   * @param cfName     first token; for rowkey mappings it is either empty or a
   *                   numeric field index into a composite rowkey
   * @param columnName second token; must be "key" (optionally "key#b") for a
   *                   rowkey mapping
   * @return the parsed mapping, or {@code null} if this token is not a rowkey mapping
   */
  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
    if (columnName == null || columnName.isEmpty()) {
      return null;
    }

    String[] tokens = columnName.split("#");
    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
      return null;
    }

    RowKeyMapping rowKeyMapping = new RowKeyMapping();

    if (tokens.length == 2 && "b".equals(tokens[1])) {
      rowKeyMapping.setBinary(true);
    }

    if (cfName != null && !cfName.isEmpty()) {
      rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
    }
    return rowKeyMapping;
  }

  public char getRowKeyDelimiter() {
    return rowKeyDelimiter;
  }

  public int[] getRowKeyFieldIndexes() {
    return rowKeyFieldIndexes;
  }

  public boolean[] getIsRowKeyMappings() {
    return isRowKeyMappings;
  }

  public byte[][][] getMappingColumns() {
    return mappingColumns;
  }

  public Schema getSchema() {
    return schema;
  }

  public boolean[] getIsBinaryColumns() {
    return isBinaryColumns;
  }

  public String getHbaseTableName() {
    return hbaseTableName;
  }

  public boolean[] getIsColumnKeys() {
    return isColumnKeys;
  }

  public int getNumRowKeys() {
    return numRowKeys;
  }

  public boolean[] getIsColumnValues() {
    return isColumnValues;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.util.Bytes;

import java.io.IOException;

/**
 * Converts between Tajo {@link Datum}s and HBase binary cell values using the
 * fixed-width big-endian encodings of {@link Bytes}.
 */
public class HBaseBinarySerializerDeserializer {

  /**
   * Decodes a binary HBase cell value into a Datum of the column's type.
   * Null or empty input decodes to {@link NullDatum} for numeric types;
   * for TEXT only null decodes to NullDatum (empty bytes are an empty string).
   *
   * @param col   target Tajo column (determines the decoding)
   * @param bytes raw cell value; may be null
   */
  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
    Datum datum;
    switch (col.getDataType().getType()) {
      case INT1:
      case INT2:
        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
        break;
      case INT4:
        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
        break;
      case INT8:
        // Check for null BEFORE touching bytes.length (the original dereferenced
        // bytes.length first and would NPE on null input). A 4-byte value is
        // accepted and widened, so INT4-encoded cells can feed an INT8 column.
        if (bytes == null || bytes.length == 0) {
          datum = NullDatum.get();
        } else if (bytes.length == 4) {
          datum = DatumFactory.createInt8(Bytes.toInt(bytes));
        } else {
          datum = DatumFactory.createInt8(Bytes.toLong(bytes));
        }
        break;
      case FLOAT4:
        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
        break;
      case FLOAT8:
        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
        break;
      case TEXT:
        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
        break;
      default:
        // Unsupported types decode to null rather than failing the scan.
        datum = NullDatum.get();
        break;
    }
    return datum;
  }

  /**
   * Encodes a Datum into the fixed-width binary form for its column type.
   *
   * @return the encoded bytes, or {@code null} for null datums and
   *         unsupported types
   */
  public static byte[] serialize(Column col, Datum datum) throws IOException {
    if (datum == null || datum instanceof NullDatum) {
      return null;
    }

    byte[] bytes;
    switch (col.getDataType().getType()) {
      case INT1:
      case INT2:
        bytes = Bytes.toBytes(datum.asInt2());
        break;
      case INT4:
        bytes = Bytes.toBytes(datum.asInt4());
        break;
      case INT8:
        bytes = Bytes.toBytes(datum.asInt8());
        break;
      case FLOAT4:
        bytes = Bytes.toBytes(datum.asFloat4());
        break;
      case FLOAT8:
        bytes = Bytes.toBytes(datum.asFloat8());
        break;
      case TEXT:
        bytes = Bytes.toBytes(datum.asChars());
        break;
      default:
        bytes = null;
        break;
    }

    return bytes;
  }
}

// ---------------------------------------------------------------------------
// HBaseFragment.java
// ---------------------------------------------------------------------------

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import com.google.common.base.Objects;
import com.google.gson.annotations.Expose;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.hbase.StorageFragmentProtos.*;

/**
 * A scan unit over an HBase table: a [startRow, stopRow) row range hosted at a
 * single region location. Fragments order by start row, and the last fragment
 * of a table is flagged so the scanner can include its stop row.
 */
public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
  @Expose
  private String tableName;
  @Expose
  private String hbaseTableName;
  @Expose
  private byte[] startRow;
  @Expose
  private byte[] stopRow;
  @Expose
  private String regionLocation;
  @Expose
  private boolean last;
  @Expose
  private long length;

  public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
    this.tableName = tableName;
    this.hbaseTableName = hbaseTableName;
    this.startRow = startRow;
    this.stopRow = stopRow;
    this.regionLocation = regionLocation;
    this.last = false;
  }

  /** Reconstructs a fragment from its serialized {@code HBaseFragmentProto}. */
  public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
    builder.mergeFrom(raw);
    // Build the proto exactly once (the original built it twice and threw the
    // first result away).
    init(builder.build());
  }

  private void init(HBaseFragmentProto proto) {
    this.tableName = proto.getTableName();
    this.hbaseTableName = proto.getHbaseTableName();
    this.startRow = proto.getStartRow().toByteArray();
    this.stopRow = proto.getStopRow().toByteArray();
    this.regionLocation = proto.getRegionLocation();
    this.length = proto.getLength();
    this.last = proto.getLast();
  }

  @Override
  public int compareTo(HBaseFragment t) {
    return Bytes.compareTo(startRow, t.startRow);
  }

  @Override
  public String getTableName() {
    return tableName;
  }

  @Override
  public String getKey() {
    return new String(startRow);
  }

  @Override
  public boolean isEmpty() {
    return startRow == null || stopRow == null;
  }

  @Override
  public long getLength() {
    return length;
  }

  public void setLength(long length) {
    this.length = length;
  }

  @Override
  public String[] getHosts() {
    return new String[] {regionLocation};
  }

  public Object clone() throws CloneNotSupportedException {
    // super.clone() already performs a field-by-field shallow copy, which is
    // all the original's redundant re-assignments achieved.
    return super.clone();
  }

  @Override
  public boolean equals(Object o) {
    // Equality ignores hbaseTableName/length/last: a fragment is identified by
    // its Tajo table and row range.
    if (o instanceof HBaseFragment) {
      HBaseFragment t = (HBaseFragment) o;
      if (tableName.equals(t.tableName)
          && Bytes.equals(startRow, t.startRow)
          && Bytes.equals(stopRow, t.stopRow)) {
        return true;
      }
    }
    return false;
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow);
  }

  @Override
  public String toString() {
    // Fixed: the original was missing the opening quote before "hbaseTableName".
    return "\"fragment\": {\"tableName\": \""+ tableName + "\", \"hbaseTableName\": \"" + hbaseTableName + "\"" +
        ", \"startRow\": \"" + new String(startRow) + "\"" +
        ", \"stopRow\": \"" + new String(stopRow) + "\"" +
        ", \"length\": \"" + length + "\"}" ;
  }

  @Override
  public FragmentProto getProto() {
    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
    builder.setTableName(tableName)
        .setHbaseTableName(hbaseTableName)
        .setStartRow(ByteString.copyFrom(startRow))
        .setStopRow(ByteString.copyFrom(stopRow))
        .setLast(last)
        .setLength(length)
        .setRegionLocation(regionLocation);

    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
    fragmentBuilder.setId(this.tableName);
    fragmentBuilder.setContents(builder.buildPartial().toByteString());
    fragmentBuilder.setStoreType(StoreType.HBASE.name());
    return fragmentBuilder.build();
  }

  public byte[] getStartRow() {
    return startRow;
  }

  public byte[] getStopRow() {
    return stopRow;
  }

  public String getRegionLocation() {
    return regionLocation;
  }

  public boolean isLast() {
    return last;
  }

  public void setLast(boolean last) {
    this.last = last;
  }

  public String getHbaseTableName() {
    return hbaseTableName;
  }

  public void setHbaseTableName(String hbaseTableName) {
    this.hbaseTableName = hbaseTableName;
  }

  public void setStartRow(byte[] startRow) {
    this.startRow = startRow;
  }

  public void setStopRow(byte[] stopRow) {
    this.stopRow = stopRow;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;

/**
 * Appender that writes tuples into an HBase table with buffered {@link Put}s.
 * Row layout (rowkey, column family/qualifier mapping, binary encoding) comes
 * from the {@code ColumnMapping} prepared by {@code AbstractHBaseAppender}.
 */
public class HBasePutAppender extends AbstractHBaseAppender {
  /** Client-side write buffer size for the HTable (bytes). */
  private static final long WRITE_BUFFER_SIZE = 5 * 1024 * 1024;

  private HTableInterface htable;
  private long totalNumBytes;

  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                          Schema schema, TableMeta meta, Path stagingDir) {
    super(conf, taskAttemptId, schema, meta, stagingDir);
  }

  @Override
  public void init() throws IOException {
    super.init();

    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
        .getConnection(hbaseConf);
    htable = hconn.getTable(columnMapping.getHbaseTableName());
    // Buffer puts client-side; they are flushed explicitly in flush()/close().
    htable.setAutoFlushTo(false);
    htable.setWriteBufferSize(WRITE_BUFFER_SIZE);
  }

  /**
   * Serializes one tuple into a single {@link Put}: rowkey-mapped columns are
   * skipped (already encoded in the rowkey), cf:key/cf:value-mapped columns are
   * staged into the columnKey/columnValue buffers, and everything else becomes
   * an ordinary cell.
   */
  @Override
  public void addTuple(Tuple tuple) throws IOException {
    byte[] rowkey = getRowKeyBytes(tuple);
    totalNumBytes += rowkey.length;
    Put put = new Put(rowkey);
    readKeyValues(tuple, rowkey);

    for (int i = 0; i < columnNum; i++) {
      if (isRowKeyMappings[i]) {
        continue;
      }
      Datum datum = tuple.get(i);
      byte[] value;
      if (isBinaryColumns[i]) {
        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
      } else {
        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
      }

      if (isColumnKeys[i]) {
        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (isColumnValues[i]) {
        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (value != null) {
        // The binary serializer returns null for null datums; the original
        // dereferenced value.length unconditionally and would NPE. Skip the
        // cell instead of writing a null value.
        put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
        totalNumBytes += value.length;
      }
    }

    // NOTE(review): assumes cf:key/cf:value pairs are never null here — a null
    // datum in a binary-encoded key/value column would still NPE; confirm
    // upstream validation.
    for (int i = 0; i < columnKeyDatas.length; i++) {
      put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
      totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
    }

    htable.put(put);

    if (enabledStats) {
      stats.incrementRow();
      stats.setNumBytes(totalNumBytes);
    }
  }

  @Override
  public void flush() throws IOException {
    htable.flushCommits();
  }

  @Override
  public long getEstimatedOutputSize() throws IOException {
    // Output goes straight to HBase; there is no staged output size to report.
    return 0;
  }

  @Override
  public void close() throws IOException {
    if (htable != null) {
      htable.flushCommits();
      htable.close();
    }
    if (enabledStats) {
      stats.setNumBytes(totalNumBytes);
    }
  }
}
100644 index 0000000..5cae077 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java @@ -0,0 +1,449 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.filter.InclusiveStopFilter; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.ColumnStats; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.storage.Scanner; +import org.apache.tajo.storage.StorageManager; +import 
org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.VTuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.BytesUtils; + +import java.io.IOException; +import java.util.Collection; +import java.util.NavigableMap; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HBaseScanner implements Scanner { + private static final Log LOG = LogFactory.getLog(HBaseScanner.class); + private static final int DEFAULT_FETCH_SIZE = 1000; + private static final int MAX_LIST_SIZE = 100; + + protected boolean inited = false; + private TajoConf conf; + private Schema schema; + private TableMeta meta; + private HBaseFragment fragment; + private Scan scan; + private HTableInterface htable; + private Configuration hbaseConf; + private Column[] targets; + private TableStats tableStats; + private ResultScanner scanner; + private AtomicBoolean finished = new AtomicBoolean(false); + private float progress = 0.0f; + private int scanFetchSize; + private Result[] scanResults; + private int scanResultIndex = -1; + private Column[] schemaColumns; + + private ColumnMapping columnMapping; + private int[] targetIndexes; + + private int numRows = 0; + + private byte[][][] mappingColumnFamilies; + private boolean[] isRowKeyMappings; + private boolean[] isBinaryColumns; + private boolean[] isColumnKeys; + private boolean[] isColumnValues; + + private int[] rowKeyFieldIndexes; + private char rowKeyDelimiter; + + public HBaseScanner (Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException { + this.conf = (TajoConf)conf; + this.schema = schema; + this.meta = meta; + this.fragment = (HBaseFragment)fragment; + this.tableStats = new TableStats(); + } + + @Override + public void init() throws IOException { + inited = true; + schemaColumns = schema.toArray(); + if (fragment != null) { + tableStats.setNumBytes(0); + tableStats.setNumBlocks(1); + } + if (schema != null) { + for(Column eachColumn: 
schema.getColumns()) { + ColumnStats columnStats = new ColumnStats(eachColumn); + tableStats.addColumnStat(columnStats); + } + } + + scanFetchSize = Integer.parseInt( + meta.getOption(HBaseStorageConstants.META_FETCH_ROWNUM_KEY, "" + DEFAULT_FETCH_SIZE)); + if (targets == null) { + targets = schema.toArray(); + } + + columnMapping = new ColumnMapping(schema, meta); + targetIndexes = new int[targets.length]; + int index = 0; + for (Column eachTargetColumn: targets) { + targetIndexes[index++] = schema.getColumnId(eachTargetColumn.getQualifiedName()); + } + + mappingColumnFamilies = columnMapping.getMappingColumns(); + isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + isBinaryColumns = columnMapping.getIsBinaryColumns(); + isColumnKeys = columnMapping.getIsColumnKeys(); + isColumnValues = columnMapping.getIsColumnValues(); + + rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); + rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); + + hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta); + + initScanner(); + } + + private void initScanner() throws IOException { + scan = new Scan(); + scan.setBatch(scanFetchSize); + scan.setCacheBlocks(false); + scan.setCaching(scanFetchSize); + + FilterList filters = null; + if (targetIndexes == null || targetIndexes.length == 0) { + filters = new FilterList(FilterList.Operator.MUST_PASS_ALL); + filters.addFilter(new FirstKeyOnlyFilter()); + filters.addFilter(new KeyOnlyFilter()); + } else { + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + for (int eachIndex : targetIndexes) { + if (isRowKeyMappings[eachIndex]) { + continue; + } + byte[][] mappingColumn = columnMapping.getMappingColumns()[eachIndex]; + if (mappingColumn[1] == null) { + scan.addFamily(mappingColumn[0]); + } else { + scan.addColumn(mappingColumn[0], mappingColumn[1]); + } + } + } + + scan.setStartRow(fragment.getStartRow()); + if (fragment.isLast() && fragment.getStopRow() != null && + fragment.getStopRow().length > 0) 
{
      // last fragment and stopRow is not empty: use an InclusiveStopFilter so the
      // final stop row itself is returned (Scan#setStopRow is exclusive).
      if (filters == null) {
        filters = new FilterList();
      }
      filters.addFilter(new InclusiveStopFilter(fragment.getStopRow()));
    } else {
      scan.setStopRow(fragment.getStopRow());
    }

    if (filters != null) {
      scan.setFilter(filters);
    }

    if (htable == null) {
      // Reuse a pooled HConnection keyed by the hbase configuration (see
      // HBaseStorageManager#getConnection) instead of opening a new one per scanner.
      HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
          .getConnection(hbaseConf);
      htable = hconn.getTable(fragment.getHbaseTableName());
    }
    scanner = htable.getScanner(scan);
  }

  /**
   * Returns the next tuple from the scan, or {@code null} when the scan is exhausted.
   * Results are fetched from HBase in batches of {@code scanFetchSize} and buffered
   * in {@code scanResults}; only the projected columns ({@code targetIndexes}) are
   * materialized into the output tuple.
   */
  @Override
  public Tuple next() throws IOException {
    if (finished.get()) {
      return null;
    }

    // Refill the local buffer when it is empty or fully consumed.
    if (scanResults == null || scanResultIndex >= scanResults.length) {
      scanResults = scanner.next(scanFetchSize);
      if (scanResults == null || scanResults.length == 0) {
        finished.set(true);
        progress = 1.0f;
        return null;
      }
      scanResultIndex = 0;
    }

    Result result = scanResults[scanResultIndex++];
    Tuple resultTuple = new VTuple(schema.size());
    for (int i = 0; i < targetIndexes.length; i++) {
      resultTuple.put(targetIndexes[i], getDatum(result, targetIndexes[i]));
    }
    numRows++;
    return resultTuple;
  }

  /**
   * Converts one mapped field of an HBase {@link Result} into a Tajo {@link Datum}.
   *
   * The mapping mode for the field (decided at init time from the 'columns' table
   * property) selects one of four paths:
   * <ul>
   *   <li>rowkey mapping: the row key (or one delimiter-separated part of it),</li>
   *   <li>'cf:key:' mapping: the column qualifier(s) of a family,</li>
   *   <li>'cf:value:' mapping: the cell value(s) of a family,</li>
   *   <li>plain 'cf:qualifier' mapping: a single cell value.</li>
   * </ul>
   * Multi-cell results for key/value mappings are rendered as a JSON-like text list;
   * a family mapped without a qualifier is rendered as a JSON-like object.
   *
   * @param result  the HBase row to read from
   * @param fieldId index of the target column in the Tajo schema
   * @return the decoded datum, or {@link NullDatum} when the cell is absent
   */
  private Datum getDatum(Result result, int fieldId) throws IOException {
    byte[] value = null;
    if (isRowKeyMappings[fieldId]) {
      value = result.getRow();
      // A non-binary rowkey may be a composite of fields joined by rowKeyDelimiter;
      // extract only the part this column is mapped to.
      if (!isBinaryColumns[fieldId] && rowKeyFieldIndexes[fieldId] >= 0) {
        int rowKeyFieldIndex = rowKeyFieldIndexes[fieldId];

        byte[][] rowKeyFields = BytesUtils.splitPreserveAllTokens(value, rowKeyDelimiter);

        // NOTE(review): this guard looks off by one — when rowKeyFields.length ==
        // rowKeyFieldIndex the index below is out of range; '<=' seems intended. TODO confirm.
        if (rowKeyFields.length < rowKeyFieldIndex) {
          return NullDatum.get();
        } else {
          value = rowKeyFields[rowKeyFieldIndex];
        }
      }
    } else {
      if (isColumnKeys[fieldId]) {
        // '<cf>:key:' mapping — project the column qualifiers of the family.
        NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
        if (cfMap != null) {
          Set<byte[]> keySet = cfMap.keySet();
          if (keySet.size() == 1) {
            // Single qualifier: return it as a scalar datum.
            try {
              return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId],
                  keySet.iterator().next());
            } catch (Exception e) {
              LOG.error(e.getMessage(), e);
              throw new RuntimeException(e.getMessage(), e);
            }
          } else {
            // Multiple qualifiers: render as a JSON-like list, capped at MAX_LIST_SIZE.
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            int count = 0;
            for (byte[] eachKey : keySet) {
              if (count > 0) {
                sb.append(", ");
              }
              Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachKey);
              sb.append("\"").append(datum.asChars()).append("\"");
              count++;
              if (count > MAX_LIST_SIZE) {
                break;
              }
            }
            sb.append("]");
            return new TextDatum(sb.toString());
          }
        }
      } else if (isColumnValues[fieldId]) {
        // '<cf>:value:' mapping — project the cell values of the family.
        NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
        if (cfMap != null) {
          Collection<byte[]> valueList = cfMap.values();
          if (valueList.size() == 1) {
            try {
              return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], valueList.iterator().next());
            } catch (Exception e) {
              LOG.error(e.getMessage(), e);
              throw new RuntimeException(e.getMessage(), e);
            }
          } else {
            // Multiple values: render as a JSON-like list, capped at MAX_LIST_SIZE.
            StringBuilder sb = new StringBuilder();
            sb.append("[");
            int count = 0;
            for (byte[] eachValue : valueList) {
              if (count > 0) {
                sb.append(", ");
              }
              Datum datum = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], eachValue);
              sb.append("\"").append(datum.asChars()).append("\"");
              count++;
              if (count > MAX_LIST_SIZE) {
                break;
              }
            }
            sb.append("]");
            return new TextDatum(sb.toString());
          }
        }
      } else {
        if (mappingColumnFamilies[fieldId][1] == null) {
          // Family mapped without a qualifier ('cf1:'): aggregate the whole family.
          NavigableMap<byte[], byte[]> cfMap = result.getFamilyMap(mappingColumnFamilies[fieldId][0]);
          if (cfMap != null && !cfMap.isEmpty()) {
            int count = 0;
            String delim = "";

            if (cfMap.size() == 0) {
              return NullDatum.get();
            } else if (cfMap.size() == 1) {
              // If a column family is mapped without column name like "cf1:" and the number of cells is one,
              // return value is flat format not json format.
              NavigableMap.Entry<byte[], byte[]> entry = cfMap.entrySet().iterator().next();
              byte[] entryKey = entry.getKey();
              byte[] entryValue = entry.getValue();
              if (entryKey == null || entryKey.length == 0) {
                try {
                  if (isBinaryColumns[fieldId]) {
                    return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
                  } else {
                    return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue);
                  }
                } catch (Exception e) {
                  LOG.error(e.getMessage(), e);
                  throw new RuntimeException(e.getMessage(), e);
                }
              }
            }
            // Multiple cells (or a single named cell): render a JSON-like object,
            // capped at MAX_LIST_SIZE entries.
            StringBuilder sb = new StringBuilder();
            sb.append("{");
            for (NavigableMap.Entry<byte[], byte[]> entry : cfMap.entrySet()) {
              byte[] entryKey = entry.getKey();
              byte[] entryValue = entry.getValue();

              // NOTE(review): new String(byte[]) uses the platform default charset here —
              // presumably qualifiers are UTF-8/ASCII; verify.
              String keyText = new String(entryKey);
              String valueText = null;
              if (entryValue != null) {
                try {
                  if (isBinaryColumns[fieldId]) {
                    valueText = HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
                  } else {
                    valueText = HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], entryValue).asChars();
                  }
                } catch (Exception e) {
                  LOG.error(e.getMessage(), e);
                  throw new RuntimeException(e.getMessage(), e);
                }
              }
              sb.append(delim).append("\"").append(keyText).append("\":\"").append(valueText).append("\"");
              delim = ", ";
              count++;
              if (count > MAX_LIST_SIZE) {
                break;
              }
            } //end of for
            sb.append("}");
            return new TextDatum(sb.toString());
          } else {
            value = null;
          }
        } else {
          // Plain 'cf:qualifier' mapping — a single cell.
          value = result.getValue(mappingColumnFamilies[fieldId][0], mappingColumnFamilies[fieldId][1]);
        }
      }
    }

    // Common tail: decode the raw bytes (or null) selected above.
    if (value == null) {
      return NullDatum.get();
    } else {
      try {
        if (isBinaryColumns[fieldId]) {
          return HBaseBinarySerializerDeserializer.deserialize(schemaColumns[fieldId], value);
        } else {
          return HBaseTextSerializerDeserializer.deserialize(schemaColumns[fieldId], value);
        }
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }

  /** Restarts the scan from the beginning of the fragment, resetting all progress state. */
  @Override
  public void reset() throws IOException {
    progress = 0.0f;
    scanResultIndex = -1;
    scanResults = null;
    finished.set(false);
    tableStats = new TableStats();

    if (scanner != null) {
      scanner.close();
      scanner = null;
    }

    initScanner();
  }

  /** Closes the scanner and the table handle; safe to call more than once. */
  @Override
  public void close() throws IOException {
    progress = 1.0f;
    finished.set(true);
    if (scanner != null) {
      try {
        scanner.close();
        scanner = null;
      } catch (Exception e) {
        // Best-effort close: log and continue so htable still gets closed below.
        LOG.warn("Error while closing hbase scanner: " + e.getMessage(), e);
      }
    }
    if (htable != null) {
      htable.close();
      htable = null;
    }
  }

  @Override
  public boolean isProjectable() {
    return true;
  }

  /**
   * Sets the projection columns. Must be called before init(); targets are baked
   * into the Scan at initialization time.
   */
  @Override
  public void setTarget(Column[] targets) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
    this.targets = targets;
  }

  @Override
  public boolean isSelectable() {
    return false;
  }

  @Override
  public void setSearchCondition(Object expr) {
    // TODO implements adding column filter to scanner.
  }

  @Override
  public boolean isSplittable() {
    return true;
  }

  @Override
  public float getProgress() {
    return progress;
  }

  @Override
  public TableStats getInputStats() {
    tableStats.setNumRows(numRows);
    return tableStats;
  }

  @Override
  public Schema getSchema() {
    return schema;
  }
}
/**
 * Table-property keys used to configure an HBase-mapped Tajo table, plus the
 * special tokens recognized inside the 'columns' mapping expression.
 *
 * All values are byte-identical to the original constants, so existing table
 * properties keep working.
 */
public interface HBaseStorageConstants {
  // Interface fields are implicitly public static final (JLS 9.3);
  // the redundant modifiers have been dropped.

  /** Mapping token '<cf>:key:' — project the column qualifiers of a family. */
  String KEY_COLUMN_MAPPING = "key";
  /** Mapping token '<cf>:value:' — project the cell values of a family. */
  String VALUE_COLUMN_MAPPING = "value";
  /** Number of rows fetched per scanner RPC. */
  String META_FETCH_ROWNUM_KEY = "fetch.rownum";
  /** Name of the underlying HBase table (required). */
  String META_TABLE_KEY = "table";
  /** Column mapping expression, one token per Tajo column (required). */
  String META_COLUMNS_KEY = "columns";
  /** Comma-separated initial region split keys used at table creation. */
  String META_SPLIT_ROW_KEYS_KEY = "hbase.split.rowkeys";
  /** Path to a file listing initial region split keys, one per line. */
  String META_SPLIT_ROW_KEYS_FILE_KEY = "hbase.split.rowkeys.file";
  /** ZooKeeper quorum used to reach the HBase cluster (required). */
  String META_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
  /** Delimiter character for composite (multi-field) row keys. */
  String META_ROWKEY_DELIMITER = "hbase.rowkey.delimiter";

  /** Session/query flag: when "true", INSERT writes via Put instead of bulk-loading HFiles. */
  String INSERT_PUT_MODE = "tajo.hbase.insert.put.mode";
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.CreateTableNode; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.RewriteRule; +import 
org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.util.Bytes; +import org.apache.tajo.util.BytesUtils; +import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.*; + +/** + * StorageManager for HBase table. + */ +public class HBaseStorageManager extends StorageManager { + private static final Log LOG = LogFactory.getLog(HBaseStorageManager.class); + + private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>(); + + public HBaseStorageManager (StoreType storeType) { + super(storeType); + } + + @Override + public void storageInit() throws IOException { + } + + @Override + public void closeStorageManager() { + synchronized (connMap) { + for (HConnection eachConn: connMap.values()) { + try { + eachConn.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + } + + @Override + public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { + createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists); + TableStats stats = new TableStats(); + stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); + tableDesc.setStats(stats); + } + + private void createTable(TableMeta tableMeta, Schema schema, + boolean isExternal, boolean ifNotExists) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + String mappedColumns = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); + if (mappedColumns != null && mappedColumns.split(",").length > schema.size()) { + throw new 
IOException("Columns property has more entry than Tajo table columns"); + } + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + int numRowKeys = 0; + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + numRowKeys++; + } + } + if (numRowKeys > 1) { + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Key field type should be TEXT type."); + } + } + } + + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (columnMapping.getIsColumnKeys()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column key field('<cfname>:key:') type should be TEXT type."); + } + if (columnMapping.getIsColumnValues()[i] && schema.getColumn(i).getDataType().getType() != Type.TEXT) { + throw new IOException("Column value field(('<cfname>:value:') type should be TEXT type."); + } + } + + Configuration hConf = getHBaseConfiguration(conf, tableMeta); + HBaseAdmin hAdmin = new HBaseAdmin(hConf); + + try { + if (isExternal) { + // If tajo table is external table, only check validation. + if (mappedColumns == null || mappedColumns.isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + if (!hAdmin.tableExists(hTableName)) { + throw new IOException("HBase table [" + hbaseTableName + "] not exists. 
" + + "External table should be a existed table."); + } + HTableDescriptor hTableDescriptor = hAdmin.getTableDescriptor(hTableName); + Set<String> tableColumnFamilies = new HashSet<String>(); + for (HColumnDescriptor eachColumn : hTableDescriptor.getColumnFamilies()) { + tableColumnFamilies.add(eachColumn.getNameAsString()); + } + + Collection<String> mappingColumnFamilies =columnMapping.getColumnFamilyNames(); + if (mappingColumnFamilies.isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_COLUMNS_KEY + "' attribute."); + } + + for (String eachMappingColumnFamily : mappingColumnFamilies) { + if (!tableColumnFamilies.contains(eachMappingColumnFamily)) { + throw new IOException("There is no " + eachMappingColumnFamily + " column family in " + hbaseTableName); + } + } + } else { + if (hAdmin.tableExists(hbaseTableName)) { + if (ifNotExists) { + return; + } else { + throw new IOException("HBase table [" + hbaseTableName + "] already exists."); + } + } + // Creating hbase table + HTableDescriptor hTableDescriptor = parseHTableDescriptor(tableMeta, schema); + + byte[][] splitKeys = getSplitKeys(conf, schema, tableMeta); + if (splitKeys == null) { + hAdmin.createTable(hTableDescriptor); + } else { + hAdmin.createTable(hTableDescriptor, splitKeys); + } + } + } finally { + hAdmin.close(); + } + } + + /** + * Returns initial region split keys. 
+ * + * @param conf + * @param schema + * @param meta + * @return + * @throws java.io.IOException + */ + private byte[][] getSplitKeys(TajoConf conf, Schema schema, TableMeta meta) throws IOException { + String splitRowKeys = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_KEY, ""); + String splitRowKeysFile = meta.getOption(HBaseStorageConstants.META_SPLIT_ROW_KEYS_FILE_KEY, ""); + + if ((splitRowKeys == null || splitRowKeys.isEmpty()) && + (splitRowKeysFile == null || splitRowKeysFile.isEmpty())) { + return null; + } + + ColumnMapping columnMapping = new ColumnMapping(schema, meta); + boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns(); + boolean[] isRowKeys = columnMapping.getIsRowKeyMappings(); + + boolean rowkeyBinary = false; + int numRowKeys = 0; + Column rowKeyColumn = null; + for (int i = 0; i < isBinaryColumns.length; i++) { + if (isBinaryColumns[i] && isRowKeys[i]) { + rowkeyBinary = true; + } + if (isRowKeys[i]) { + numRowKeys++; + rowKeyColumn = schema.getColumn(i); + } + } + + if (rowkeyBinary && numRowKeys > 1) { + throw new IOException("If rowkey is mapped to multi column and a rowkey is binary, " + + "Multiple region for creation is not support."); + } + + if (splitRowKeys != null && !splitRowKeys.isEmpty()) { + String[] splitKeyTokens = splitRowKeys.split(","); + byte[][] splitKeys = new byte[splitKeyTokens.length][]; + for (int i = 0; i < splitKeyTokens.length; i++) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[i] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } else { + splitKeys[i] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(splitKeyTokens[i])); + } + } + return splitKeys; + } + + if (splitRowKeysFile != null && !splitRowKeysFile.isEmpty()) { + // If there is many split keys, Tajo allows to define in the file. 
+ Path path = new Path(splitRowKeysFile); + FileSystem fs = path.getFileSystem(conf); + if (!fs.exists(path)) { + throw new IOException("hbase.split.rowkeys.file=" + path.toString() + " not exists."); + } + + SortedSet<String> splitKeySet = new TreeSet<String>(); + BufferedReader reader = null; + try { + reader = new BufferedReader(new InputStreamReader(fs.open(path))); + String line = null; + while ( (line = reader.readLine()) != null ) { + if (line.isEmpty()) { + continue; + } + splitKeySet.add(line); + } + } finally { + if (reader != null) { + reader.close(); + } + } + + if (splitKeySet.isEmpty()) { + return null; + } + + byte[][] splitKeys = new byte[splitKeySet.size()][]; + int index = 0; + for (String eachKey: splitKeySet) { + if (numRowKeys == 1 && rowkeyBinary) { + splitKeys[index++] = HBaseBinarySerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } else { + splitKeys[index++] = HBaseTextSerializerDeserializer.serialize(rowKeyColumn, new TextDatum(eachKey)); + } + } + + return splitKeys; + } + + return null; + } + + /** + * Creates Configuration instance and sets with hbase connection options. + * + * @param conf + * @param tableMeta + * @return + * @throws java.io.IOException + */ + public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException { + String zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, ""); + if (zkQuorum == null || zkQuorum.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute."); + } + + Configuration hbaseConf = (conf == null) ? 
HBaseConfiguration.create() : HBaseConfiguration.create(conf); + hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum); + + for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) { + String key = eachOption.getKey(); + if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) { + hbaseConf.set(key, eachOption.getValue()); + } + } + return hbaseConf; + } + + /** + * Creates HTableDescription using table meta data. + * + * @param tableMeta + * @param schema + * @return + * @throws java.io.IOException + */ + public static HTableDescriptor parseHTableDescriptor(TableMeta tableMeta, Schema schema) throws IOException { + String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, ""); + if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) { + throw new IOException("HBase mapped table is required a '" + + HBaseStorageConstants.META_TABLE_KEY + "' attribute."); + } + TableName hTableName = TableName.valueOf(hbaseTableName); + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + + HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName); + + Collection<String> columnFamilies = columnMapping.getColumnFamilyNames(); + //If 'columns' attribute is empty, Tajo table columns are mapped to all HBase table column. 
+ if (columnFamilies.isEmpty()) { + for (Column eachColumn: schema.getColumns()) { + columnFamilies.add(eachColumn.getSimpleName()); + } + } + + for (String eachColumnFamily: columnFamilies) { + hTableDescriptor.addFamily(new HColumnDescriptor(eachColumnFamily)); + } + + return hTableDescriptor; + } + + @Override + public void purgeTable(TableDesc tableDesc) throws IOException { + HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta())); + + try { + HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema()); + LOG.info("Deleting hbase table: " + new String(hTableDesc.getName())); + hAdmin.disableTable(hTableDesc.getName()); + hAdmin.deleteTable(hTableDesc.getName()); + } finally { + hAdmin.close(); + } + } + + /** + * Returns columns which are mapped to the rowkey of the hbase table. + * + * @param tableDesc + * @return + * @throws java.io.IOException + */ + private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings(); + int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes(); + + Column indexColumn = null; + for (int i = 0; i < isRowKeyMappings.length; i++) { + if (isRowKeyMappings[i]) { + if (columnMapping.getNumRowKeys() == 1 || + rowKeyIndexes[i] == 0) { + indexColumn = tableDesc.getSchema().getColumn(i); + } + } + } + return new Column[]{indexColumn}; + } + + @Override + public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException { + ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta()); + + List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode); + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + + try { + 
htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + HRegionLocation regLoc = htable.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY, false); + if (null == regLoc) { + throw new IOException("Expecting at least one region."); + } + List<Fragment> fragments = new ArrayList<Fragment>(1); + Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname()); + fragments.add(fragment); + return fragments; + } + + List<byte[]> startRows; + List<byte[]> stopRows; + + if (indexPredications != null && !indexPredications.isEmpty()) { + // indexPredications is Disjunctive set + startRows = new ArrayList<byte[]>(); + stopRows = new ArrayList<byte[]>(); + for (IndexPredication indexPredication: indexPredications) { + byte[] startRow; + byte[] stopRow; + if (indexPredication.getStartValue() != null) { + startRow = serialize(columnMapping, indexPredication, indexPredication.getStartValue()); + } else { + startRow = HConstants.EMPTY_START_ROW; + } + if (indexPredication.getStopValue() != null) { + stopRow = serialize(columnMapping, indexPredication, indexPredication.getStopValue()); + } else { + stopRow = HConstants.EMPTY_END_ROW; + } + startRows.add(startRow); + stopRows.add(stopRow); + } + } else { + startRows = TUtil.newList(HConstants.EMPTY_START_ROW); + stopRows = TUtil.newList(HConstants.EMPTY_END_ROW); + } + + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + // region startkey -> HBaseFragment + Map<byte[], HBaseFragment> fragmentMap = new HashMap<byte[], HBaseFragment>(); + for (int i = 0; i < keys.getFirst().length; i++) { + HRegionLocation location = 
htable.getRegionLocation(keys.getFirst()[i], false); + + byte[] regionStartKey = keys.getFirst()[i]; + byte[] regionStopKey = keys.getSecond()[i]; + + int startRowsSize = startRows.size(); + for (int j = 0; j < startRowsSize; j++) { + byte[] startRow = startRows.get(j); + byte[] stopRow = stopRows.get(j); + // determine if the given start an stop key fall into the region + if ((startRow.length == 0 || regionStopKey.length == 0 || Bytes.compareTo(startRow, regionStopKey) < 0) + && (stopRow.length == 0 || Bytes.compareTo(stopRow, regionStartKey) > 0)) { + byte[] fragmentStart = (startRow.length == 0 || Bytes.compareTo(regionStartKey, startRow) >= 0) ? + regionStartKey : startRow; + + byte[] fragmentStop = (stopRow.length == 0 || Bytes.compareTo(regionStopKey, stopRow) <= 0) && + regionStopKey.length > 0 ? regionStopKey : stopRow; + + String regionName = location.getRegionInfo().getRegionNameAsString(); + + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + if (fragmentMap.containsKey(regionStartKey)) { + HBaseFragment prevFragment = fragmentMap.get(regionStartKey); + if (Bytes.compareTo(fragmentStart, prevFragment.getStartRow()) < 0) { + prevFragment.setStartRow(fragmentStart); + } + if (Bytes.compareTo(fragmentStop, prevFragment.getStopRow()) > 0) { + prevFragment.setStopRow(fragmentStop); + } + } else { + HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(), + fragmentStart, fragmentStop, location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeFileSize = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 
1024L * 1024L; + fragment.setLength(storeFileSize); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragmentMap.put(regionStartKey, fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); + } + } + } + } + } + + List<HBaseFragment> fragments = new ArrayList<HBaseFragment>(fragmentMap.values()); + Collections.sort(fragments); + if (!fragments.isEmpty()) { + fragments.get(fragments.size() - 1).setLast(true); + } + return (ArrayList<Fragment>) (ArrayList) fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + private byte[] serialize(ColumnMapping columnMapping, + IndexPredication indexPredication, Datum datum) throws IOException { + if (columnMapping.getIsBinaryColumns()[indexPredication.getColumnId()]) { + return HBaseBinarySerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } else { + return HBaseTextSerializerDeserializer.serialize(indexPredication.getColumn(), datum); + } + } + + @Override + public Appender getAppender(OverridableConf queryContext, + QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) + throws IOException { + if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) { + return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir); + } else { + return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir); + } + } + + @Override + public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) + throws IOException { + Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta()); + HTable htable = null; + HBaseAdmin hAdmin = null; + try { + htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY)); + + org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = 
htable.getStartEndKeys(); + if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { + return new ArrayList<Fragment>(1); + } + hAdmin = new HBaseAdmin(hconf); + Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>(); + + List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length); + + int start = currentPage * numFragments; + if (start >= keys.getFirst().length) { + return new ArrayList<Fragment>(1); + } + int end = (currentPage + 1) * numFragments; + if (end > keys.getFirst().length) { + end = keys.getFirst().length; + } + for (int i = start; i < end; i++) { + HRegionLocation location = htable.getRegionLocation(keys.getFirst()[i], false); + + String regionName = location.getRegionInfo().getRegionNameAsString(); + ServerLoad serverLoad = serverLoadMap.get(location.getServerName()); + if (serverLoad == null) { + serverLoad = hAdmin.getClusterStatus().getLoad(location.getServerName()); + serverLoadMap.put(location.getServerName(), serverLoad); + } + + HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(), + location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname()); + + // get region size + boolean foundLength = false; + for (Map.Entry<byte[], RegionLoad> entry : serverLoad.getRegionsLoad().entrySet()) { + if (regionName.equals(Bytes.toString(entry.getKey()))) { + RegionLoad regionLoad = entry.getValue(); + long storeLength = (regionLoad.getStorefileSizeMB() + regionLoad.getMemStoreSizeMB()) * 1024L * 1024L; + if (storeLength == 0) { + // If store size is smaller than 1 MB, storeLength is zero + storeLength = 1 * 1024 * 1024; //default 1MB + } + fragment.setLength(storeLength); + foundLength = true; + break; + } + } + + if (!foundLength) { + fragment.setLength(TajoConstants.UNKNOWN_LENGTH); + } + + fragments.add(fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments: fragment -> " + i + " -> " + fragment); 
+ } + } + + if (!fragments.isEmpty()) { + ((HBaseFragment) fragments.get(fragments.size() - 1)).setLast(true); + } + return fragments; + } finally { + if (htable != null) { + htable.close(); + } + if (hAdmin != null) { + hAdmin.close(); + } + } + } + + public HConnection getConnection(Configuration hbaseConf) throws IOException { + synchronized(connMap) { + HConnectionKey key = new HConnectionKey(hbaseConf); + HConnection conn = connMap.get(key); + if (conn == null) { + conn = HConnectionManager.createConnection(hbaseConf); + connMap.put(key, conn); + } + + return conn; + } + } + + static class HConnectionKey { + final static String[] CONNECTION_PROPERTIES = new String[] { + HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.ZOOKEEPER_CLIENT_PORT, + HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME, + HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.HBASE_CLIENT_PREFETCH_LIMIT, + HConstants.HBASE_META_SCANNER_CACHING, + HConstants.HBASE_CLIENT_INSTANCE_ID, + HConstants.RPC_CODEC_CONF_KEY }; + + private Map<String, String> properties; + private String username; + + HConnectionKey(Configuration conf) { + Map<String, String> m = new HashMap<String, String>(); + if (conf != null) { + for (String property : CONNECTION_PROPERTIES) { + String value = conf.get(property); + if (value != null) { + m.put(property, value); + } + } + } + this.properties = Collections.unmodifiableMap(m); + + try { + UserProvider provider = UserProvider.instantiate(conf); + User currentUser = provider.getCurrent(); + if (currentUser != null) { + username = currentUser.getName(); + } + } catch (IOException ioe) { + LOG.warn("Error obtaining current user, skipping username in HConnectionKey", ioe); + } + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + if (username != null) { + result = username.hashCode(); + } + for (String property : CONNECTION_PROPERTIES) { + String value 
= properties.get(property);
        // Only fold in properties that are actually set; absent keys do not
        // perturb the hash, matching the null-tolerant comparison in equals().
        if (value != null) {
          result = prime * result + value.hashCode();
        }
      }

      return result;
    }

    /**
     * Equality is based on the username plus the subset of configuration
     * entries listed in CONNECTION_PROPERTIES, so two keys compare equal
     * exactly when they would identify the same HBase connection.
     * Consistent with hashCode() above, which folds in the same properties.
     */
    @Override
    public boolean equals(Object obj) {
      if (this == obj)
        return true;
      if (obj == null)
        return false;
      if (getClass() != obj.getClass())
        return false;
      HConnectionKey that = (HConnectionKey) obj;
      // Username must match (both null, or equal).
      if (this.username != null && !this.username.equals(that.username)) {
        return false;
      } else if (this.username == null && that.username != null) {
        return false;
      }
      // Compare only the tracked connection properties, not the whole map.
      if (this.properties == null) {
        if (that.properties != null) {
          return false;
        }
      } else {
        if (that.properties == null) {
          return false;
        }
        for (String property : CONNECTION_PROPERTIES) {
          String thisValue = this.properties.get(property);
          String thatValue = that.properties.get(property);
          // Reference equality shortcut also covers the both-null case.
          //noinspection StringEquality
          if (thisValue == thatValue) {
            continue;
          }
          if (thisValue == null || !thisValue.equals(thatValue)) {
            return false;
          }
        }
      }
      return true;
    }

    /** Diagnostic representation; includes the full property map and username. */
    @Override
    public String toString() {
      return "HConnectionKey{" +
          "properties=" + properties +
          ", username='" + username + '\'' +
          '}';
    }
  }

  /**
   * Builds the list of index predications (start/stop row-key bounds) that can
   * be pushed down to an HBase scan for the given query.
   *
   * <p>NOTE(review): only a single indexable column is supported; when more
   * than one indexable column exists, no predication is produced at all.
   *
   * @param columnMapping column-to-HBase mapping of the target table
   * @param tableDesc     table descriptor of the scanned table
   * @param scanNode      scan node whose qualifier is inspected
   * @return possibly-empty list of pushdown predications (never null)
   * @throws IOException if predicate analysis fails
   */
  public List<IndexPredication> getIndexPredications(ColumnMapping columnMapping,
                                                     TableDesc tableDesc, ScanNode scanNode) throws IOException {
    List<IndexPredication> indexPredications = new ArrayList<IndexPredication>();
    Column[] indexableColumns = getIndexableColumns(tableDesc);
    if (indexableColumns != null && indexableColumns.length == 1) {
      // Currently supports only single index column.
      List<Set<EvalNode>> indexablePredicateList = findIndexablePredicateSet(scanNode, indexableColumns);
      for (Set<EvalNode> eachEvalSet: indexablePredicateList) {
        // Each set corresponds to one disjunct; a null value pair means the
        // disjunct had no usable constant bounds.
        Pair<Datum, Datum> indexPredicationValues = getIndexablePredicateValue(columnMapping, eachEvalSet);
        if (indexPredicationValues != null) {
          IndexPredication indexPredication = new IndexPredication();
          indexPredication.setColumn(indexableColumns[0]);
          indexPredication.setColumnId(tableDesc.getLogicalSchema().getColumnId(indexableColumns[0].getQualifiedName()));
          indexPredication.setStartValue(indexPredicationValues.getFirst());
          indexPredication.setStopValue(indexPredicationValues.getSecond());

          indexPredications.add(indexPredication);
        }
      }
    }
    return indexPredications;
  }

  /**
   * Scans the qualifier of {@code scanNode} in disjunctive normal form and
   * collects, per disjunct and per indexable column, the set of conjuncts that
   * are indexable predicates on that column.
   *
   * @return list of non-empty predicate sets; empty when there is no qualifier
   *         or nothing indexable
   */
  public List<Set<EvalNode>> findIndexablePredicateSet(ScanNode scanNode, Column[] indexableColumns) throws IOException {
    List<Set<EvalNode>> indexablePredicateList = new ArrayList<Set<EvalNode>>();

    // if a query statement has a search condition, try to find indexable predicates
    if (indexableColumns != null && scanNode.getQual() != null) {
      EvalNode[] disjunctiveForms = AlgebraicUtil.toDisjunctiveNormalFormArray(scanNode.getQual());

      // add qualifier to schema for qual
      for (Column column : indexableColumns) {
        for (EvalNode disjunctiveExpr : disjunctiveForms) {
          EvalNode[] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(disjunctiveExpr);
          Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
          for (EvalNode conjunctiveExpr : conjunctiveForms) {
            if (checkIfIndexablePredicateOnTargetColumn(conjunctiveExpr, column)) {
              indexablePredicateSet.add(conjunctiveExpr);
            }
          }
          // Keep only disjuncts that contributed at least one indexable conjunct.
          if (!indexablePredicateSet.isEmpty()) {
            indexablePredicateList.add(indexablePredicateSet);
          }
        }
      }
    }

    return indexablePredicateList;
  }

  /**
   * Returns true when {@code evalNode} is an indexable predicate (or an AND of
   * two indexable predicates over the same variable) whose single referenced
   * column is exactly {@code targetColumn}.
   */
  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
    if (checkIfIndexablePredicate(evalNode) ||
        checkIfConjunctiveButOneVariable(evalNode)) {
      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
      // if it contains only single variable matched to a target column
      return variables.size() == 1 && variables.contains(targetColumn);
    } else {
      return false;
    }
  }

  /**
   * Checks whether the expression is an AND of two indexable comparisons that
   * both reference the same single variable (e.g. {@code c > 1 AND c < 10}).
   *
   * @param evalNode The expression to be checked
   * @return true if a conjunctive expression, consisting of indexable expressions
   */
  private boolean checkIfConjunctiveButOneVariable(EvalNode evalNode) {
    if (evalNode.getType() == EvalType.AND) {
      // NOTE(review): variable is named orEval but this branch handles AND.
      BinaryEval orEval = (BinaryEval) evalNode;
      boolean indexable =
          checkIfIndexablePredicate(orEval.getLeftExpr()) &&
          checkIfIndexablePredicate(orEval.getRightExpr());

      // Both sides must constrain the same column set for a usable range.
      boolean sameVariable =
          EvalTreeUtil.findUniqueColumns(orEval.getLeftExpr())
          .equals(EvalTreeUtil.findUniqueColumns(orEval.getRightExpr()));

      return indexable && sameVariable;
    } else {
      return false;
    }
  }

  /**
   * Check if an expression consists of one variable and one constant and
   * the expression is a comparison operator.
   *
   * @param evalNode The expression to be checked
   * @return true if an expression consists of one variable and one constant
   * and the expression is a comparison operator. Otherwise, false.
   */
  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
    return AlgebraicUtil.containSingleVar(evalNode) && isIndexableOperator(evalNode);
  }

  /**
   * Operators translatable into an HBase row-key range scan:
   * =, &lt;=, &lt;, &gt;=, &gt; and BETWEEN.
   */
  public static boolean isIndexableOperator(EvalNode expr) {
    return expr.getType() == EvalType.EQUAL ||
        expr.getType() == EvalType.LEQ ||
        expr.getType() == EvalType.LTH ||
        expr.getType() == EvalType.GEQ ||
        expr.getType() == EvalType.GTH ||
        expr.getType() == EvalType.BETWEEN;
  }

  /**
   * Derives the tightest (start, end) constant bounds implied by a set of
   * comparison/BETWEEN predicates over a single column.
   *
   * <p>EQUAL/GEQ/GTH constants raise the start bound; EQUAL/LEQ/LTH constants
   * lower the end bound. For multi-part row keys the end bound is padded with
   * the delimiter plus {@link Character#MAX_VALUE} so the stop key covers all
   * suffixes of the prefix.
   *
   * @return a Pair of (start, end) where either side may be null, or null when
   *         no constant bound was found at all
   */
  public Pair<Datum, Datum> getIndexablePredicateValue(ColumnMapping columnMapping,
                                                       Set<EvalNode> evalNodes) {
    Datum startDatum = null;
    Datum endDatum = null;
    for (EvalNode evalNode: evalNodes) {
      if (evalNode instanceof BinaryEval) {
        BinaryEval binaryEval = (BinaryEval) evalNode;
        EvalNode left = binaryEval.getLeftExpr();
        EvalNode right = binaryEval.getRightExpr();

        // The constant may sit on either side of the comparison.
        Datum constValue = null;
        if (left.getType() == EvalType.CONST) {
          constValue = ((ConstEval) left).getValue();
        } else if (right.getType() == EvalType.CONST) {
          constValue = ((ConstEval) right).getValue();
        }

        if (constValue != null) {
          // Lower-bound operators: keep the greatest start value seen.
          if (evalNode.getType() == EvalType.EQUAL ||
              evalNode.getType() == EvalType.GEQ ||
              evalNode.getType() == EvalType.GTH) {
            if (startDatum != null) {
              if (constValue.compareTo(startDatum) > 0) {
                startDatum = constValue;
              }
            } else {
              startDatum = constValue;
            }
          }

          // Upper-bound operators: keep the smallest end value seen.
          if (evalNode.getType() == EvalType.EQUAL ||
              evalNode.getType() == EvalType.LEQ ||
              evalNode.getType() == EvalType.LTH) {
            if (endDatum != null) {
              if (constValue.compareTo(endDatum) < 0) {
                endDatum = constValue;
              }
            } else {
              endDatum = constValue;
            }
          }
        }
      } else if (evalNode instanceof BetweenPredicateEval) {
        BetweenPredicateEval betweenEval = (BetweenPredicateEval) evalNode;
        // Only a fully-constant BETWEEN can be folded into the range.
        if (betweenEval.getBegin().getType() == EvalType.CONST && betweenEval.getEnd().getType() == EvalType.CONST) {
          Datum value = ((ConstEval) betweenEval.getBegin()).getValue();
          if (startDatum != null) {
            if (value.compareTo(startDatum) > 0) {
              startDatum = value;
            }
          } else {
            startDatum = value;
          }

          value = ((ConstEval) betweenEval.getEnd()).getValue();
          if (endDatum != null) {
            if (value.compareTo(endDatum) < 0) {
              endDatum = value;
            }
          } else {
            endDatum = value;
          }
        }
      }
    }

    // For composite row keys, extend the stop key past every possible suffix
    // of the matched prefix (delimiter + U+FFFF).
    if (endDatum != null && columnMapping != null && columnMapping.getNumRowKeys() > 1) {
      endDatum = new TextDatum(endDatum.asChars() +
          new String(new char[]{columnMapping.getRowKeyDelimiter(), Character.MAX_VALUE}));
    }
    if (startDatum != null || endDatum != null) {
      return new Pair<Datum, Datum>(startDatum, endDatum);
    } else {
      return null;
    }
  }

  /**
   * Commits the HFiles produced in the staging directory into HBase via bulk
   * load (or defers to the parent for plain insert-into-location).
   *
   * @param queryContext query configuration; supplies the staging directory
   * @param finalEbId    final execution block, used for the MR JobID
   * @param plan         logical plan (passed through to super for location inserts)
   * @param schema       output schema (passed through to super)
   * @param tableDesc    target table; must not be null
   * @return the staging result directory that was committed
   * @throws IOException if tableDesc is null or the bulk load fails
   */
  @Override
  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
                               LogicalPlan plan, Schema schema,
                               TableDesc tableDesc) throws IOException {
    if (tableDesc == null) {
      throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
    }
    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);

    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
    // Cap bulk-load parallelism; presumably to limit load on the region servers.
    hbaseConf.set("hbase.loadincremental.threads.max", "2");

    JobContextImpl jobContext = new JobContextImpl(hbaseConf,
        new JobID(finalEbId.getQueryId().toString(), finalEbId.getId()));

    FileOutputCommitter committer = new FileOutputCommitter(stagingResultDir, jobContext);
    Path jobAttemptPath = committer.getJobAttemptPath(jobContext);
    FileSystem fs = jobAttemptPath.getFileSystem(queryContext.getConf());
    // Nothing was written for this attempt: nothing to commit or load.
    if (!fs.exists(jobAttemptPath) || fs.listStatus(jobAttemptPath) == null) {
      LOG.warn("No query attempt file in " + jobAttemptPath);
      return stagingResultDir;
    }
    committer.commitJob(jobContext);

    if (tableDesc.getName() == null && tableDesc.getPath() != null) {

      // insert into location
      return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false);
    } else {
      // insert into table
      String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);

      HTable htable = new HTable(hbaseConf, tableName);
      try {
        LoadIncrementalHFiles loadIncrementalHFiles = null;
        try {
          loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
        } catch (Exception e) {
          // LoadIncrementalHFiles construction can fail on bad config; rewrap
          // with the original cause preserved.
          LOG.error(e.getMessage(), e);
          throw new IOException(e.getMessage(), e);
        }
        loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);

        return stagingResultDir;
      } finally {
        htable.close();
      }
    }
  }

  /**
   * Computes sort ranges for an insert so that output partitions align with
   * the target table's HBase regions: each region end key becomes the end
   * tuple of one range.
   *
   * @return one TupleRange per region boundary; just {@code dataRange} when the
   *         table has a single region
   * @throws IOException wrapping any failure while reading region keys
   */
  @Override
  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
      throws IOException {
    try {
      // Map each sort key to its column index in the input schema.
      int[] sortKeyIndexes = new int[sortSpecs.length];
      for (int i = 0; i < sortSpecs.length; i++) {
        sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
      }

      ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
      Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());

      HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
      try {
        byte[][] endKeys = htable.getEndKeys();
        // Single region: the whole data range is one partition.
        if (endKeys.length == 1) {
          return new TupleRange[]{dataRange};
        }
        List<TupleRange> tupleRanges = new ArrayList<TupleRange>(endKeys.length);

        TupleComparator comparator = new BaseTupleComparator(inputSchema, sortSpecs);
        Tuple previousTuple = dataRange.getStart();

        for (byte[] eachEndKey : endKeys) {
          Tuple endTuple = new VTuple(sortSpecs.length);
          byte[][] rowKeyFields;
          if (sortSpecs.length > 1) {
            // Composite row key: split the region end key on the delimiter.
            byte[][] splitValues = BytesUtils.splitPreserveAllTokens(eachEndKey, columnMapping.getRowKeyDelimiter());
            if (splitValues.length == sortSpecs.length) {
              rowKeyFields = splitValues;
            } else {
              // Fewer parts than sort keys: pad missing trailing fields with null.
              rowKeyFields = new byte[sortSpecs.length][];
              for (int j = 0; j < sortSpecs.length; j++) {
                if (j < splitValues.length) {
                  rowKeyFields[j] = splitValues[j];
                } else {
                  rowKeyFields[j] = null;
                }
              }
            }

          } else {
            rowKeyFields = new byte[1][];
            rowKeyFields[0] = eachEndKey;
          }

          // Deserialize each key part with the column's configured encoding.
          for (int i = 0; i < sortSpecs.length; i++) {
            if (columnMapping.getIsBinaryColumns()[sortKeyIndexes[i]]) {
              endTuple.put(i,
                  HBaseBinarySerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
                      rowKeyFields[i]));
            } else {
              endTuple.put(i,
                  HBaseTextSerializerDeserializer.deserialize(inputSchema.getColumn(sortKeyIndexes[i]),
                      rowKeyFields[i]));
            }
          }
          tupleRanges.add(new TupleRange(sortSpecs, previousTuple, endTuple));
          previousTuple = endTuple;
        }

        // Last region endkey is empty. Tajo ignores empty key, so endkey is replaced with max data value.
        if (comparator.compare(dataRange.getEnd(), tupleRanges.get(tupleRanges.size() - 1).getStart()) >= 0) {
          tupleRanges.get(tupleRanges.size() - 1).setEnd(dataRange.getEnd());
        } else {
          tupleRanges.remove(tupleRanges.size() - 1);
        }
        return tupleRanges.toArray(new TupleRange[]{});
      } finally {
        htable.close();
      }
    } catch (Throwable t) {
      // Surface any failure (including runtime errors) as IOException,
      // preserving the cause.
      LOG.error(t.getMessage(), t);
      throw new IOException(t.getMessage(), t);
    }
  }

  /**
   * When bulk-load mode is active (INSERT_PUT_MODE is "false"), inserts must be
   * pre-sorted by row key, so a sort-injecting rewrite rule is added.
   *
   * @return rewrite rules for bulk-load inserts, or null in put mode
   */
  public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
    if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
      List<RewriteRule> rules = new ArrayList<RewriteRule>();
      rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
      return rules;
    } else {
      return null;
    }
  }

  /**
   * Returns the table columns mapped to HBase row-key parts, in schema order.
   */
  private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
    List<Column> indexColumns = new ArrayList<Column>();

    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());

    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
    for (int i = 0; i < isRowKeys.length; i++) {
      if (isRowKeys[i]) {
        indexColumns.add(tableDesc.getSchema().getColumn(i));
      }
    }

    return indexColumns.toArray(new Column[]{});
  }

  /**
   * HBase storage supports INSERT INTO and requires sorted input for inserts
   * (row keys must arrive in order for bulk load).
   */
  @Override
  public StorageProperty getStorageProperty() {
    StorageProperty storageProperty = new StorageProperty();
    storageProperty.setSortedInsert(true);
    storageProperty.setSupportsInsertInto(true);
    return storageProperty;
  }

  /**
   * Pre-creates the backing HBase table for a non-external CREATE TABLE (or
   * CTAS) before data is written. External tables are left untouched.
   */
  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
    if (node.getType() == NodeType.CREATE_TABLE) {
      CreateTableNode cNode = (CreateTableNode)node;
      if (!cNode.isExternal()) {
        TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
        createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
      }
    }
  }

  /**
   * Rolls back a failed CREATE TABLE query by disabling and deleting the HBase
   * table that was created in beforeInsertOrCATS. External tables are never
   * deleted.
   */
  @Override
  public void rollbackOutputCommit(LogicalNode node) throws IOException {
    if (node.getType() == NodeType.CREATE_TABLE) {
      CreateTableNode cNode = (CreateTableNode)node;
      if (cNode.isExternal()) {
        return;
      }
      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
      HBaseAdmin hAdmin = new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));

      try {
        HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
        LOG.info("Delete table cause query failed:" + hTableDesc.getName());
        // A table must be disabled before HBase will allow deletion.
        hAdmin.disableTable(hTableDesc.getName());
        hAdmin.deleteTable(hTableDesc.getName());
      } finally {
        hAdmin.close();
      }
    }
  }

  /**
   * Verifies that the SELECT output schema matches the target table schema
   * positionally: same column count and, per position, identical data types.
   *
   * @throws IOException on a count or type mismatch
   */
  @Override
  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
    if (tableDesc != null) {
      Schema tableSchema = tableDesc.getSchema();
      if (tableSchema.size() != outSchema.size()) {
        throw new IOException("The number of table columns is different from SELECT columns");
      }

      for (int i = 0; i < tableSchema.size(); i++) {
        if (!tableSchema.getColumn(i).getDataType().equals(outSchema.getColumn(i).getDataType())) {
          throw new IOException(outSchema.getColumn(i).getQualifiedName() +
              "(" + outSchema.getColumn(i).getDataType().getType() + ")" +
              " is different column type with " + tableSchema.getColumn(i).getSimpleName() +
              "(" + tableSchema.getColumn(i).getDataType().getType() + ")");
        }
      }
    }
  }
}
