http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java deleted file mode 100644 index a4d3140..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/TransactionAwareHTable.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.hbase96; - -import co.cask.tephra.AbstractTransactionAwareTable; -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionAware; -import co.cask.tephra.TxConstants; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.OperationWithAttributes; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; - -/** - * A Transaction Aware HTable implementation for HBase 0.96. Operations are committed as usual, - * but upon a failed or aborted transaction, they are rolled back to the state before the transaction - * was started. 
- */ -public class TransactionAwareHTable extends AbstractTransactionAwareTable - implements HTableInterface, TransactionAware { - - private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class); - private final HTableInterface hTable; - - /** - * Create a transactional aware instance of the passed HTable - * - * @param hTable underlying HBase table to use - */ - public TransactionAwareHTable(HTableInterface hTable) { - this(hTable, false); - } - - /** - * Create a transactional aware instance of the passed HTable - * - * @param hTable underlying HBase table to use - * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN}) - */ - public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) { - this(hTable, conflictLevel, false); - } - - /** - * Create a transactional aware instance of the passed HTable, with the option - * of allowing non-transactional operations. - * @param hTable underlying HBase table to use - * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete) - * will be available, though non-transactional - */ - public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) { - this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional); - } - - /** - * Create a transactional aware instance of the passed HTable, with the option - * of allowing non-transactional operations. - * @param hTable underlying HBase table to use - * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN}) - * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete) - * will be available, though non-transactional - */ - public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel, - boolean allowNonTransactional) { - super(conflictLevel, allowNonTransactional); - this.hTable = hTable; - } - - /* AbstractTransactionAwareTable implementation */ - - @Override - protected byte[] getTableKey() { - return getTableName(); - } - - @Override - protected boolean doCommit() throws IOException { - hTable.flushCommits(); - return true; - } - - @Override - protected boolean doRollback() throws Exception { - try { - // pre-size arraylist of deletes - int size = 0; - for (Set<ActionChange> cs : changeSets.values()) { - size += cs.size(); - } - List<Delete> rollbackDeletes = new ArrayList<>(size); - for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) { - long transactionTimestamp = entry.getKey(); - for (ActionChange change : entry.getValue()) { - byte[] row = change.getRow(); - byte[] family = change.getFamily(); - byte[] qualifier = change.getQualifier(); - Delete rollbackDelete = new Delete(row); - rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]); - switch (conflictLevel) { - case ROW: - case NONE: - // issue family delete for the tx write pointer - rollbackDelete.deleteFamilyVersion(change.getFamily(), transactionTimestamp); - break; - case COLUMN: - if (family != null && qualifier == null) { - rollbackDelete.deleteFamilyVersion(family, transactionTimestamp); - } else if (family != null && qualifier != null) { - rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp); - } - break; - default: - throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel); - } - rollbackDeletes.add(rollbackDelete); - } - } - hTable.delete(rollbackDeletes); - 
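// Note (illustrative, not from the original file): at this point `rollbackDeletes`
// holds one Delete per recorded change. For a COLUMN-level change to a hypothetical
// row "r", family "f", qualifier "q" written at write pointer 1234, the rollback
// issues the equivalent of:
//
//   Delete d = new Delete(Bytes.toBytes("r"));
//   d.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
//   d.deleteColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1234L);
//
// The rollback attribute is what lets TransactionProcessor.preDelete (later in this
// commit) apply these as real HBase deletes rather than translating them back into
// transactional delete markers.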
return true; - } finally { - try { - hTable.flushCommits(); - } catch (Exception e) { - LOG.error("Could not flush HTable commits", e); - } - tx = null; - changeSets.clear(); - } - } - - /* HTableInterface implementation */ - - @Override - public byte[] getTableName() { - return hTable.getTableName(); - } - - @Override - public TableName getName() { - return hTable.getName(); - } - - @Override - public Configuration getConfiguration() { - return hTable.getConfiguration(); - } - - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - return hTable.getTableDescriptor(); - } - - @Override - public boolean exists(Get get) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.exists(transactionalizeAction(get)); - } - - @Override - public Boolean[] exists(List<Get> gets) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Get> transactionalizedGets = new ArrayList<Get>(gets.size()); - for (Get get : gets) { - transactionalizedGets.add(transactionalizeAction(get)); - } - return hTable.exists(transactionalizedGets); - } - - @Override - public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.batch(transactionalizeActions(actions), results); - } - - @Override - public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.batch(transactionalizeActions(actions)); - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws - IOException, InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.batchCallback(transactionalizeActions(actions), results, callback); - } - - @Override - public <R> Object[] batchCallback(List<? 
extends Row> actions, Batch.Callback<R> callback) throws IOException, - InterruptedException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.batchCallback(transactionalizeActions(actions), callback); - } - - @Override - public Result get(Get get) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.get(transactionalizeAction(get)); - } - - @Override - public Result[] get(List<Get> gets) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - ArrayList<Get> transactionalizedGets = new ArrayList<Get>(); - for (Get get : gets) { - transactionalizedGets.add(transactionalizeAction(get)); - } - return hTable.get(transactionalizedGets); - } - - @Override - public Result getRowOrBefore(byte[] row, byte[] family) throws IOException { - if (allowNonTransactional) { - return hTable.getRowOrBefore(row, family); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Scan scan = new Scan(); - scan.addFamily(family); - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Scan scan = new Scan(); - scan.addColumn(family, qualifier); - return hTable.getScanner(transactionalizeAction(scan)); - } - - @Override - public void put(Put put) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - Put txPut = transactionalizeAction(put); - hTable.put(txPut); - } - - @Override - public void put(List<Put> puts) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size()); - for (Put put : puts) { - Put txPut = transactionalizeAction(put); - transactionalizedPuts.add(txPut); - } - hTable.put(transactionalizedPuts); - } - - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { - if (allowNonTransactional) { - return hTable.checkAndPut(row, family, qualifier, value, put); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public void delete(Delete delete) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - hTable.delete(transactionalizeAction(delete)); - } - - @Override - public void delete(List<Delete> deletes) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size()); - for (Delete delete : deletes) { - Delete txDelete = transactionalizeAction(delete); - transactionalizedDeletes.add(txDelete); - } - hTable.delete(transactionalizedDeletes); - } - - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) - throws IOException { - if (allowNonTransactional) 
{ - return hTable.checkAndDelete(row, family, qualifier, value, delete); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public void mutateRow(RowMutations rm) throws IOException { - if (tx == null) { - throw new IOException("Transaction not started"); - } - RowMutations transactionalMutations = new RowMutations(); - for (Mutation mutation : rm.getMutations()) { - if (mutation instanceof Put) { - transactionalMutations.add(transactionalizeAction((Put) mutation)); - } else if (mutation instanceof Delete) { - transactionalMutations.add(transactionalizeAction((Delete) mutation)); - } - } - hTable.mutateRow(transactionalMutations); - } - - @Override - public Result append(Append append) throws IOException { - if (allowNonTransactional) { - return hTable.append(append); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public Result increment(Increment increment) throws IOException { - if (allowNonTransactional) { - return hTable.increment(increment); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) - throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount, durability); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) - throws IOException { - if (allowNonTransactional) { - return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL); - } else { - throw new UnsupportedOperationException("Operation is not supported transactionally"); - } - } - - @Override - public boolean isAutoFlush() { - return hTable.isAutoFlush(); - } - - @Override - public void flushCommits() throws IOException { - hTable.flushCommits(); - } - - @Override - public void close() throws IOException { - hTable.close(); - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - return hTable.coprocessorService(row); - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable) - throws ServiceException, Throwable { - return hTable.coprocessorService(service, startKey, endKey, callable); - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable, Batch.Callback<R> callback) - throws ServiceException, Throwable { - hTable.coprocessorService(service, startKey, endKey, callable, callback); - } - - @Override - public void setAutoFlush(boolean autoFlush) { - setAutoFlushTo(autoFlush); - } - - @Override - public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) { - hTable.setAutoFlush(autoFlush, clearBufferOnFail); - } - - @Override - public void setAutoFlushTo(boolean autoFlush) { 
- hTable.setAutoFlushTo(autoFlush); - } - - @Override - public long getWriteBufferSize() { - return hTable.getWriteBufferSize(); - } - - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - hTable.setWriteBufferSize(writeBufferSize); - } - - // Helpers to get copies of objects with the timestamp set to the current transaction timestamp. - - private Get transactionalizeAction(Get get) throws IOException { - addToOperation(get, tx); - return get; - } - - private Scan transactionalizeAction(Scan scan) throws IOException { - addToOperation(scan, tx); - return scan; - } - - private Put transactionalizeAction(Put put) throws IOException { - Put txPut = new Put(put.getRow(), tx.getWritePointer()); - Set<Map.Entry<byte[], List<Cell>>> familyMap = put.getFamilyCellMap().entrySet(); - if (!familyMap.isEmpty()) { - for (Map.Entry<byte[], List<Cell>> family : familyMap) { - List<Cell> familyValues = family.getValue(); - if (!familyValues.isEmpty()) { - for (Cell value : familyValues) { - txPut.add(value.getFamily(), value.getQualifier(), tx.getWritePointer(), value.getValue()); - addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier()); - } - } - } - } - for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) { - txPut.setAttribute(entry.getKey(), entry.getValue()); - } - txPut.setDurability(put.getDurability()); - addToOperation(txPut, tx); - return txPut; - } - - private Delete transactionalizeAction(Delete delete) throws IOException { - long transactionTimestamp = tx.getWritePointer(); - - byte[] deleteRow = delete.getRow(); - Delete txDelete = new Delete(deleteRow, transactionTimestamp); - - Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap(); - if (familyToDelete.isEmpty()) { - // perform a row delete if we are using row-level conflict detection - if (conflictLevel == TxConstants.ConflictDetection.ROW || - conflictLevel == TxConstants.ConflictDetection.NONE) { - // Row delete leaves delete markers in all column families of the table - // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet - for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) { - // no need to identify individual columns deleted - addToChangeSet(deleteRow, columnDescriptor.getName(), null); - } - } else { - Result result = get(new Get(delete.getRow())); - // Delete everything - NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap(); - for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) { - NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey()); - for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) { - txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp); - addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey()); - } - } - } - } else { - for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) { - byte[] family = familyEntry.getKey(); - List<Cell> entries = familyEntry.getValue(); - boolean isFamilyDelete = false; - if (entries.size() == 1) { - Cell cell = entries.get(0); - isFamilyDelete = CellUtil.isDeleteFamily(cell); - } - if (isFamilyDelete) { - if (conflictLevel == TxConstants.ConflictDetection.ROW || - conflictLevel == TxConstants.ConflictDetection.NONE) { - // no need to identify individual columns deleted - // Older versions of HBase 0.96 lack HBASE-10964, so family deletes 
do not correctly - // inherit the common Delete timestamp, so we must explicitly set the timestamp here. - txDelete.deleteFamily(family, transactionTimestamp); - addToChangeSet(deleteRow, family, null); - } else { - Result result = get(new Get(delete.getRow()).addFamily(family)); - // Delete entire family - NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family); - for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) { - txDelete.deleteColumns(family, column.getKey(), transactionTimestamp); - addToChangeSet(deleteRow, family, column.getKey()); - } - } - } else { - for (Cell value : entries) { - txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp); - addToChangeSet(deleteRow, value.getFamily(), value.getQualifier()); - } - } - } - } - for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) { - txDelete.setAttribute(entry.getKey(), entry.getValue()); - } - txDelete.setDurability(delete.getDurability()); - return txDelete; - } - - private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException { - List<Row> transactionalizedActions = new ArrayList<>(actions.size()); - for (Row action : actions) { - if (action instanceof Get) { - transactionalizedActions.add(transactionalizeAction((Get) action)); - } else if (action instanceof Put) { - transactionalizedActions.add(transactionalizeAction((Put) action)); - } else if (action instanceof Delete) { - transactionalizedActions.add(transactionalizeAction((Delete) action)); - } else { - transactionalizedActions.add(action); - } - } - return transactionalizedActions; - } - - public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException { - op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx)); - } -}
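For context on what is being removed: this class was driven through Tephra's TransactionContext, which invokes the TransactionAware callbacks shown above (doCommit on success, doRollback on failure) around each transaction. A minimal usage sketch against this 0.96 API; the configuration, the transaction service client, and the table name "mytable" are assumptions made for illustration:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import co.cask.tephra.TransactionContext;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.distributed.TransactionServiceClient;
import co.cask.tephra.hbase96.TransactionAwareHTable;

public class TransactionAwareHTableSketch {
  public static void writeTransactionally(Configuration conf, TransactionServiceClient txClient)
      throws IOException, TransactionFailureException {
    TransactionAwareHTable txTable = new TransactionAwareHTable(new HTable(conf, "mytable"));
    TransactionContext txContext = new TransactionContext(txClient, txTable);

    txContext.start();                 // begins the transaction and calls txTable.startTx(...)
    try {
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      txTable.put(put);                // rewritten with the tx write pointer as the cell timestamp
      txContext.finish();              // conflict detection, then doCommit() above
    } catch (TransactionFailureException e) {
      txContext.abort();               // triggers doRollback() above: point deletes of the tx's writes
      throw e;
    }
  }
}

On abort, the rollback path issues targeted deletes of exactly the cells the transaction wrote, restoring the pre-transaction state.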
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/CellSkipFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/CellSkipFilter.java deleted file mode 100644 index 8bbf929..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/CellSkipFilter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96.coprocessor; - -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; - -import java.io.IOException; -import java.util.List; - -/** - * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue} - * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL}, - * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different - * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}. - * Please see TEPHRA-169 for more details. - */ -public class CellSkipFilter extends FilterBase { - private final Filter filter; - // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL - private KeyValue skipColumn = null; - - public CellSkipFilter(Filter filter) { - this.filter = filter; - } - - /** - * Determines whether the current cell should be skipped. The cell will be skipped - * if the previous keyvalue had the same key as the current cell. This means filter already responded - * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL. 
- * @param cell the {@link Cell} to be tested for skipping - * @return true if current cell should be skipped, false otherwise - */ - private boolean skipCellVersion(Cell cell) { - return skipColumn != null - && skipColumn.matchingRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) - && skipColumn.matchingColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - } - - @Override - public ReturnCode filterKeyValue(Cell cell) throws IOException { - if (skipCellVersion(cell)) { - return ReturnCode.NEXT_COL; - } - - ReturnCode code = filter.filterKeyValue(cell); - if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) { - // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL - skipColumn = KeyValue.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), - cell.getQualifierLength()); - } else { - skipColumn = null; - } - return code; - } - - @Override - public boolean filterRow() throws IOException { - return filter.filterRow(); - } - - @Override - public Cell transformCell(Cell cell) throws IOException { - return filter.transformCell(cell); - } - - @Override - public void reset() throws IOException { - filter.reset(); - } - - @Override - public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { - return filter.filterRowKey(buffer, offset, length); - } - - @Override - public boolean filterAllRemaining() throws IOException { - return filter.filterAllRemaining(); - } - - @Override - public void filterRowCells(List<Cell> kvs) throws IOException { - filter.filterRowCells(kvs); - } - - @Override - public boolean hasFilterRow() { - return filter.hasFilterRow(); - } - - @SuppressWarnings("deprecation") - @Override - public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { - return filter.getNextKeyHint(currentKV); - } - - @Override - public Cell getNextCellHint(Cell currentKV) throws IOException { - return filter.getNextCellHint(currentKV); - } - - @Override - public boolean isFamilyEssential(byte[] name) throws IOException { - return filter.isFamilyEssential(name); - } - - @Override - public byte[] toByteArray() throws IOException { - return filter.toByteArray(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionFilters.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionFilters.java deleted file mode 100644 index 22b0ae7..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionFilters.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
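The skip behavior of the CellSkipFilter removed just above is easiest to see with a counting stand-in for the wrapped filter. A self-contained sketch; all names are local to the example:

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.util.Bytes;
import co.cask.tephra.hbase96.coprocessor.CellSkipFilter;

public class CellSkipFilterSketch {
  public static void main(String[] args) throws IOException {
    final int[] calls = {0};
    Filter counting = new FilterBase() {
      @Override
      public ReturnCode filterKeyValue(Cell cell) {
        calls[0]++;
        return ReturnCode.INCLUDE_AND_NEXT_COL;   // "include this version, move to the next column"
      }
    };
    Filter skipping = new CellSkipFilter(counting);

    byte[] row = Bytes.toBytes("r"), fam = Bytes.toBytes("f"), qual = Bytes.toBytes("q");
    for (long ts = 3; ts >= 1; ts--) {            // HBase scans versions newest-first
      skipping.filterKeyValue(new KeyValue(row, fam, qual, ts, Bytes.toBytes("v" + ts)));
    }
    // calls[0] == 1: versions 2 and 1 were answered with NEXT_COL from the
    // remembered skipColumn, without consulting the wrapped filter again.
    System.out.println("wrapped filter consulted " + calls[0] + " time(s)");
  }
}

This is the optimization TEPHRA-169 describes: once the wrapped filter has answered NEXT_COL or INCLUDE_AND_NEXT_COL for a column, remaining versions of that column are skipped without re-evaluating it.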
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96.coprocessor; - -import co.cask.tephra.Transaction; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.regionserver.ScanType; - -import java.util.Map; -import javax.annotation.Nullable; - -/** - * Factory class for providing {@link Filter} instances. - */ -public class TransactionFilters { - /** - * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions. - * - * @param tx the current transaction to apply. Only data visible to this transaction will be returned. - * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name - * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} - * these will be interpreted as "delete" markers and the column will be filtered out - * @param scanType the type of scan operation being performed - */ - public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); - } - - /** - * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions. - * - * @param tx the current transaction to apply. Only data visible to this transaction will be returned. - * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name - * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} - * these will be interpreted as "delete" markers and the column will be filtered out - * @param scanType the type of scan operation being performed - * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by - * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then - * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. - */ - public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java deleted file mode 100644 index 8351d2e..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessor.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
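A quick illustration of the factory just removed: callers wrap any per-cell filter so that it only ever sees transactionally visible cell versions. This mirrors how the TransactionProcessor coprocessor (next file) builds its filter; the transaction, the scan, and the TTL value below are assumptions for illustration:

import java.util.Map;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.collect.Maps;
import co.cask.tephra.Transaction;
import co.cask.tephra.hbase96.coprocessor.TransactionFilters;

public class VisibilityFilterSketch {
  static void applyVisibility(Transaction tx, Scan scan) {
    Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
    ttlByFamily.put(Bytes.toBytes("f"), 86400000L);      // one day of TTL, in milliseconds
    Filter visibility = TransactionFilters.getVisibilityFilter(
        tx, ttlByFamily, false /* empty values read as delete markers */,
        ScanType.USER_SCAN, scan.getFilter());           // chain any pre-existing filter
    scan.setFilter(visibility);
  }
}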
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96.coprocessor; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TransactionCodec; -import co.cask.tephra.TxConstants; -import co.cask.tephra.coprocessor.TransactionStateCache; -import co.cask.tephra.coprocessor.TransactionStateCacheSupplier; -import co.cask.tephra.persist.TransactionVisibilityState; -import co.cask.tephra.util.TxUtils; -import com.google.common.base.Supplier; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.OperationWithAttributes; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; -import org.apache.hadoop.hbase.regionserver.wal.WALEdit; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; - -/** - * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that handles server-side processing - * for transactions: - * <ul> - * <li>applies filtering to exclude data from invalid and in-progress transactions</li> - * <li>overrides the scanner returned for flush and compaction to drop data written by invalidated transactions, - * or expired due to TTL.</li> - * </ul> - * - * <p>In order to use this coprocessor for transactions, configure the class on any table involved in transactions, - * or on all user tables by adding the following to hbase-site.xml: - * {@code - * <property> - * <name>hbase.coprocessor.region.classes</name> - * 
<value>co.cask.tephra.hbase96.coprocessor.TransactionProcessor</value> - * </property> - * } - * </p> - * - * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the operation - * as an attribute: - * {@code - * Transaction t = ...; - * Get get = new Get(...); - * TransactionCodec codec = new TransactionCodec(); - * codec.addToOperation(get, t); - * } - * </p> - */ -public class TransactionProcessor extends BaseRegionObserver { - private static final Log LOG = LogFactory.getLog(TransactionProcessor.class); - - private TransactionStateCache cache; - private final TransactionCodec txCodec; - protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT; - protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA; - - public TransactionProcessor() { - this.txCodec = new TransactionCodec(); - } - - /* RegionObserver implementation */ - - @Override - public void start(CoprocessorEnvironment e) throws IOException { - if (e instanceof RegionCoprocessorEnvironment) { - RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; - Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env); - this.cache = cacheSupplier.get(); - - HTableDescriptor tableDesc = env.getRegion().getTableDesc(); - for (HColumnDescriptor columnDesc : tableDesc.getFamilies()) { - String columnTTL = columnDesc.getValue(TxConstants.PROPERTY_TTL); - long ttl = 0; - if (columnTTL != null) { - try { - ttl = Long.parseLong(columnTTL); - LOG.info("Family " + columnDesc.getNameAsString() + " has TTL of " + columnTTL); - } catch (NumberFormatException nfe) { - LOG.warn("Invalid TTL value configured for column family " + columnDesc.getNameAsString() + - ", value = " + columnTTL); - } - } - ttlByFamily.put(columnDesc.getName(), ttl); - } - - this.allowEmptyValues = env.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY, - TxConstants.ALLOW_EMPTY_VALUES_DEFAULT); - this.readNonTxnData = Boolean.valueOf(tableDesc.getValue(TxConstants.READ_NON_TX_DATA)); - if (readNonTxnData) { - LOG.info("Reading pre-existing data enabled for table " + tableDesc.getNameAsString()); - } - } - } - - protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) { - return new TransactionStateCacheSupplier(env.getConfiguration()); - } - - @Override - public void stop(CoprocessorEnvironment e) throws IOException { - // nothing to do - } - - @Override - public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) - throws IOException { - Transaction tx = getFromOperation(get); - if (tx != null) { - projectFamilyDeletes(get); - get.setMaxVersions(); - get.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData), - TxUtils.getMaxVisibleTimestamp(tx)); - Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter()); - get.setFilter(newFilter); - } - } - - @Override - public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, - Durability durability) throws IOException { - // Translate deletes into our own delete tombstones - // Since HBase deletes cannot be undone, we need to translate deletes into special puts, which allows - // us to rollback the changes (by a real delete) if the transaction fails - - // Deletes that are part of a transaction rollback do not need special handling. 
- // They will never be rolled back, so are performed as normal HBase deletes. - if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) { - return; - } - - // Other deletes are client-initiated and need to be translated into our own tombstones - // TODO: this should delegate to the DeleteStrategy implementation. - Put deleteMarkers = new Put(delete.getRow(), delete.getTimeStamp()); - for (byte[] family : delete.getFamilyCellMap().keySet()) { - List<Cell> familyCells = delete.getFamilyCellMap().get(family); - if (isFamilyDelete(familyCells)) { - deleteMarkers.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, familyCells.get(0).getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); - } else { - int cellSize = familyCells.size(); - for (int i = 0; i < cellSize; i++) { - Cell cell = familyCells.get(i); - deleteMarkers.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(), - HConstants.EMPTY_BYTE_ARRAY); - } - } - } - for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) { - deleteMarkers.setAttribute(entry.getKey(), entry.getValue()); - } - e.getEnvironment().getRegion().put(deleteMarkers); - // skip normal delete handling - e.bypass(); - } - - private boolean isFamilyDelete(List<Cell> familyCells) { - return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0)); - } - - @Override - public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) - throws IOException { - Transaction tx = getFromOperation(scan); - if (tx != null) { - projectFamilyDeletes(scan); - scan.setMaxVersions(); - scan.setTimeRange(TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData), - TxUtils.getMaxVisibleTimestamp(tx)); - Filter newFilter = getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter()); - scan.setFilter(newFilter); - } - return s; - } - - /** - * Ensures that family delete markers are present in the columns requested for any scan operation. - * @param scan The original scan request - * @return The modified scan request with the family delete qualifiers represented - */ - private Scan projectFamilyDeletes(Scan scan) { - for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) { - NavigableSet<byte[]> columns = entry.getValue(); - // wildcard scans will automatically include the delete marker, so only need to add it when we have - // explicit columns listed - if (columns != null && !columns.isEmpty()) { - scan.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER); - } - } - return scan; - } - - /** - * Ensures that family delete markers are present in the columns requested for any get operation. 
- * @param get The original get request - * @return The modified get request with the family delete qualifiers represented - */ - private Get projectFamilyDeletes(Get get) { - for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap().entrySet()) { - NavigableSet<byte[]> columns = entry.getValue(); - // wildcard scans will automatically include the delete marker, so only need to add it when we have - // explicit columns listed - if (columns != null && !columns.isEmpty()) { - get.addColumn(entry.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER); - } - } - return get; - } - - @Override - public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - KeyValueScanner memstoreScanner, InternalScanner scanner) - throws IOException { - return createStoreScanner(c.getEnvironment(), "flush", cache.getLatestState(), store, - Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, - HConstants.OLDEST_TIMESTAMP); - } - - @Override - public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store, - List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s, - CompactionRequest request) - throws IOException { - return createStoreScanner(c.getEnvironment(), "compaction", cache.getLatestState(), store, scanners, scanType, - earliestPutTs); - } - - protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action, - TransactionVisibilityState snapshot, Store store, - List<? extends KeyValueScanner> scanners, ScanType type, - long earliestPutTs) throws IOException { - if (snapshot == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Region " + env.getRegion().getRegionNameAsString() + - ", no current transaction state found, defaulting to normal " + action + " scanner"); - } - return null; - } - - // construct a dummy transaction from the latest snapshot - Transaction dummyTx = TxUtils.createDummyTransaction(snapshot); - Scan scan = new Scan(); - // need to see all versions, since we filter out excludes and applications may rely on multiple versions - scan.setMaxVersions(); - scan.setFilter( - new IncludeInProgressFilter(dummyTx.getVisibilityUpperBound(), - snapshot.getInvalid(), - getTransactionFilter(dummyTx, type, null))); - - return new StoreScanner(store, store.getScanInfo(), scan, scanners, - type, store.getSmallestReadPoint(), earliestPutTs); - } - - private Transaction getFromOperation(OperationWithAttributes op) throws IOException { - byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY); - if (encoded != null) { - return txCodec.decode(encoded); - } - return null; - } - - /** - * Derived classes can override this method to customize the filter used to return data visible for the current - * transaction. - * - * @param tx the current transaction to apply - * @param type the type of scan operation being performed - */ - protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) { - return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter); - } - - /** - * Filter used to include cells visible to in-progress transactions on flush and commit. 
- */ - static class IncludeInProgressFilter extends FilterBase { - private final long visibilityUpperBound; - private final Set<Long> invalidIds; - private final Filter txFilter; - - public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) { - this.visibilityUpperBound = upperBound; - this.invalidIds = Sets.newHashSet(invalids); - this.txFilter = transactionFilter; - } - - @Override - public ReturnCode filterKeyValue(Cell cell) throws IOException { - // include all cells visible to in-progress transactions, except for those already marked as invalid - long ts = cell.getTimestamp(); - if (ts > visibilityUpperBound) { - // include everything that could still be in-progress except invalids - if (invalidIds.contains(ts)) { - return ReturnCode.SKIP; - } - return ReturnCode.INCLUDE; - } - return txFilter.filterKeyValue(cell); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java deleted file mode 100644 index 1266b76..0000000 --- a/tephra-hbase-compat-0.96/src/main/java/co/cask/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java +++ /dev/null @@ -1,313 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96.coprocessor; - -import co.cask.tephra.Transaction; -import co.cask.tephra.TxConstants; -import co.cask.tephra.util.TxUtils; -import com.google.common.collect.Maps; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterBase; -import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * Applies filtering of data based on transactional visibility (HBase 0.96+ specific version). - * Note: this is intended for server-side use only, as additional properties need to be set on - * any {@code Scan} or {@code Get} operation performed. 
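A client-side companion sketch for the TransactionProcessor removed above: the coprocessor only engages when the client serializes a transaction onto the operation, as its javadoc shows for Get, and the same applies to Scan. The transaction client and table are assumed, and transaction lifecycle handling (commit or abort via the client) is omitted:

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionCodec;
import co.cask.tephra.TransactionSystemClient;

public class TxScanSketch {
  static ResultScanner scanVisible(TransactionSystemClient txClient, HTableInterface table)
      throws IOException {
    Transaction tx = txClient.startShort();             // a short read transaction
    Scan scan = new Scan();
    new TransactionCodec().addToOperation(scan, tx);    // sets TX_OPERATION_ATTRIBUTE_KEY
    return table.getScanner(scan);                      // preScannerOpen above attaches the visibility filter
  }
}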
- */ -public class TransactionVisibilityFilter extends FilterBase { - private final Transaction tx; - // oldest visible timestamp by column family, used to apply TTL when reading - private final Map<byte[], Long> oldestTsByFamily; - // if false, empty values will be interpreted as deletes - private final boolean allowEmptyValues; - // whether or not we can remove delete markers - // these can only be safely removed when we are traversing all storefiles - private final boolean clearDeletes; - // optional sub-filter to apply to visible cells - private final Filter cellFilter; - - // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV - private byte[] currentFamily = new byte[0]; - private long currentOldestTs; - - private DeleteTracker deleteTracker = new DeleteTracker(); - - /** - * Creates a new {@link Filter} for returning data only from visible transactions. - * - * @param tx the current transaction to apply. Only data visible to this transaction will be returned. - * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name - * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} - * these will be interpreted as "delete" markers and the column will be filtered out - * @param scanType the type of scan operation being performed - */ - public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, - ScanType scanType) { - this(tx, ttlByFamily, allowEmptyValues, scanType, null); - } - - /** - * Creates a new {@link Filter} for returning data only from visible transactions. - * - * @param tx the current transaction to apply. Only data visible to this transaction will be returned. - * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name - * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} - * these will be interpreted as "delete" markers and the column will be filtered out - * @param scanType the type of scan operation being performed - * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by - * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then - * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. - */ - public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, - ScanType scanType, @Nullable Filter cellFilter) { - this.tx = tx; - this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); - for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) { - long familyTTL = ttlEntry.getValue(); - oldestTsByFamily.put(ttlEntry.getKey(), - familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); - } - this.allowEmptyValues = allowEmptyValues; - this.clearDeletes = - scanType == ScanType.COMPACT_DROP_DELETES || - (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL); - this.cellFilter = cellFilter; - } - - @Override - public ReturnCode filterKeyValue(Cell cell) throws IOException { - if (!CellUtil.matchingFamily(cell, currentFamily)) { - // column family changed - currentFamily = CellUtil.cloneFamily(cell); - Long familyOldestTs = oldestTsByFamily.get(currentFamily); - currentOldestTs = familyOldestTs != null ? 
familyOldestTs : 0; - deleteTracker.reset(); - } - // need to apply TTL for the column family here - long kvTimestamp = cell.getTimestamp(); - if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) { - // passed TTL for this column, seek to next - return ReturnCode.NEXT_COL; - } else if (tx.isVisible(kvTimestamp)) { - // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL - if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) { - // cell is visible - // visibility SNAPSHOT_ALL needs all matches - return runSubFilter(ReturnCode.INCLUDE, cell); - } - if (DeleteTracker.isFamilyDelete(cell)) { - deleteTracker.addFamilyDelete(cell); - if (clearDeletes) { - return ReturnCode.NEXT_COL; - } else { - // cell is visible - // as soon as we find a KV to include we can move to the next column - return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); - } - } - // check if masked by family delete - if (deleteTracker.isDeleted(cell)) { - return ReturnCode.NEXT_COL; - } - // check for column delete - if (isColumnDelete(cell)) { - if (clearDeletes) { - // skip "deleted" cell - return ReturnCode.NEXT_COL; - } else { - // keep the marker but skip any remaining versions - return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); - } - } - // cell is visible - // as soon as we find a KV to include we can move to the next column - return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); - } else { - return ReturnCode.SKIP; - } - } - - private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException { - if (cellFilter != null) { - ReturnCode subFilterCode = cellFilter.filterKeyValue(cell); - return determineReturnCode(txFilterCode, subFilterCode); - } - return txFilterCode; - } - - /** - * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code. - * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's - * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the - * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden. - * - * @param txFilterCode return code from TransactionVisibilityFilter - * @param subFilterCode return code from sub-filter - * @return final return code - */ - protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) { - // Return the more restrictive of the two filter responses - switch (subFilterCode) { - case INCLUDE: - return txFilterCode; - case INCLUDE_AND_NEXT_COL: - return ReturnCode.INCLUDE_AND_NEXT_COL; - case SKIP: - return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL; - default: - return subFilterCode; - } - } - - @Override - public boolean filterRow() throws IOException { - if (cellFilter != null) { - return cellFilter.filterRow(); - } - return super.filterRow(); - } - - @Override - public Cell transformCell(Cell cell) throws IOException { - // Convert Tephra deletes back into HBase deletes - if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) { - if (DeleteTracker.isFamilyDelete(cell)) { - return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(), - KeyValue.Type.DeleteFamily); - } else if (isColumnDelete(cell)) { - // Note: in some cases KeyValue.Type.Delete is used in Delete object, - // and in some other cases KeyValue.Type.DeleteColumn is used. 
- // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn. - // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will - // work in both cases. - return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), - cell.getTimestamp(), KeyValue.Type.DeleteColumn); - } - } - return cell; - } - - @Override - public void reset() throws IOException { - deleteTracker.reset(); - if (cellFilter != null) { - cellFilter.reset(); - } - } - - @Override - public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { - if (cellFilter != null) { - return cellFilter.filterRowKey(buffer, offset, length); - } - return super.filterRowKey(buffer, offset, length); - } - - @Override - public boolean filterAllRemaining() throws IOException { - if (cellFilter != null) { - return cellFilter.filterAllRemaining(); - } - return super.filterAllRemaining(); - } - - @Override - public void filterRowCells(List<Cell> kvs) throws IOException { - if (cellFilter != null) { - cellFilter.filterRowCells(kvs); - } else { - super.filterRowCells(kvs); - } - } - - @Override - public boolean hasFilterRow() { - if (cellFilter != null) { - return cellFilter.hasFilterRow(); - } - return super.hasFilterRow(); - } - - @SuppressWarnings("deprecation") - @Override - public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { - if (cellFilter != null) { - return cellFilter.getNextKeyHint(currentKV); - } - return super.getNextKeyHint(currentKV); - } - - @Override - public Cell getNextCellHint(Cell currentKV) throws IOException { - if (cellFilter != null) { - return cellFilter.getNextCellHint(currentKV); - } - return super.getNextCellHint(currentKV); - } - - @Override - public boolean isFamilyEssential(byte[] name) throws IOException { - if (cellFilter != null) { - return cellFilter.isFamilyEssential(name); - } - return super.isFamilyEssential(name); - } - - @Override - public byte[] toByteArray() throws IOException { - return super.toByteArray(); - } - - private boolean isColumnDelete(Cell cell) { - return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 0 && !allowEmptyValues; - } - - private static final class DeleteTracker { - private long familyDeleteTs; - - public static boolean isFamilyDelete(Cell cell) { - return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && - CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) && - CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); - } - - public void addFamilyDelete(Cell delete) { - this.familyDeleteTs = delete.getTimestamp(); - } - - public boolean isDeleted(Cell cell) { - return cell.getTimestamp() <= familyDeleteTs; - } - - public void reset() { - this.familyDeleteTs = 0; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java new file mode 100644 index 0000000..5bd3559 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java
new file mode 100644
index 0000000..5bd3559
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/HBase96ConfigurationProvider.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.hbase96;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.tephra.util.ConfigurationProvider;
+
+/**
+ * HBase 0.96 version of {@link ConfigurationProvider}.
+ */
+public class HBase96ConfigurationProvider extends ConfigurationProvider {
+  @Override
+  public Configuration get() {
+    return HBaseConfiguration.create();
+  }
+
+  @Override
+  public Configuration get(Configuration baseConf) {
+    return HBaseConfiguration.create(baseConf);
+  }
+}
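[Editor's sketch] The provider above simply layers hbase-default.xml and hbase-site.xml onto a Hadoop Configuration via HBaseConfiguration.create. A minimal usage sketch; the property read at the end is a standard HBase setting shown for illustration only:

import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.hbase96.HBase96ConfigurationProvider;

public class ConfigurationProviderExample {
  public static void main(String[] args) {
    HBase96ConfigurationProvider provider = new HBase96ConfigurationProvider();

    // fresh Configuration with hbase-default.xml / hbase-site.xml applied
    Configuration conf = provider.get();

    // or layer the HBase resources on top of an existing Configuration
    Configuration layered = provider.get(new Configuration());

    System.out.println(conf.get("hbase.zookeeper.quorum"));
  }
}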
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/SecondaryIndexTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/SecondaryIndexTable.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/SecondaryIndexTable.java
new file mode 100644
index 0000000..7a22fcd
--- /dev/null
+++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/SecondaryIndexTable.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.hbase96;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.distributed.TransactionServiceClient;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A transactional secondary-index table: every write to the data table is paired with an
+ * index write in the same transaction, so the two either commit or roll back together.
+ */
+public class SecondaryIndexTable {
+  private byte[] secondaryIndex;
+  private TransactionAwareHTable transactionAwareHTable;
+  private TransactionAwareHTable secondaryIndexTable;
+  private TransactionContext transactionContext;
+  private final TableName secondaryIndexTableName;
+  private static final byte[] secondaryIndexFamily = Bytes.toBytes("secondaryIndexFamily");
+  private static final byte[] secondaryIndexQualifier = Bytes.toBytes('r');
+  private static final byte[] DELIMITER = new byte[] {0};
+
+  public SecondaryIndexTable(TransactionServiceClient transactionServiceClient, HTableInterface hTable,
+                             byte[] secondaryIndex) {
+    secondaryIndexTableName = TableName.valueOf(hTable.getName().getNameAsString() + ".idx");
+    HTable secondaryIndexHTable = null;
+    HBaseAdmin hBaseAdmin = null;
+    try {
+      hBaseAdmin = new HBaseAdmin(hTable.getConfiguration());
+      if (!hBaseAdmin.tableExists(secondaryIndexTableName)) {
+        // the index table must contain the family that index puts write to,
+        // otherwise the first index put would fail with NoSuchColumnFamilyException
+        HTableDescriptor indexDescriptor = new HTableDescriptor(secondaryIndexTableName);
+        indexDescriptor.addFamily(new HColumnDescriptor(secondaryIndexFamily));
+        hBaseAdmin.createTable(indexDescriptor);
+      }
+      secondaryIndexHTable = new HTable(hTable.getConfiguration(), secondaryIndexTableName);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    } finally {
+      if (hBaseAdmin != null) {
+        try {
+          hBaseAdmin.close();
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    }
+
+    this.secondaryIndex = secondaryIndex;
+    this.transactionAwareHTable = new TransactionAwareHTable(hTable);
+    this.secondaryIndexTable = new TransactionAwareHTable(secondaryIndexHTable);
+    this.transactionContext = new TransactionContext(transactionServiceClient, transactionAwareHTable,
+                                                     secondaryIndexTable);
+  }
+
+  public Result get(Get get) throws IOException {
+    return get(Collections.singletonList(get))[0];
+  }
+
+  public Result[] get(List<Get> gets) throws IOException {
+    try {
+      transactionContext.start();
+      Result[] result = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return result;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+      // re-throw instead of returning null, so the caller sees the original failure
+      throw new IOException("Transaction failed", e);
+    }
+  }
+
+  public Result[] getByIndex(byte[] value) throws IOException {
+    try {
+      transactionContext.start();
+      // the stop row must sort strictly after the start row; appending 0xff covers all
+      // index rows that begin with 'value' followed by the 0x00 delimiter
+      Scan scan = new Scan(value, Bytes.add(value, new byte[] { (byte) 0xff }));
+      scan.addColumn(secondaryIndexFamily, secondaryIndexQualifier);
+      ResultScanner indexScanner = secondaryIndexTable.getScanner(scan);
+
+      ArrayList<Get> gets = new ArrayList<Get>();
+      try {
+        for (Result result : indexScanner) {
+          for (Cell cell : result.listCells()) {
+            // the index cell's value is the row key of the data table
+            gets.add(new Get(CellUtil.cloneValue(cell)));
+          }
+        }
+      } finally {
+        indexScanner.close();
+      }
+      Result[] results = transactionAwareHTable.get(gets);
+      transactionContext.finish();
+      return results;
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+      throw new IOException("Transaction failed", e);
+    }
+  }
+
+  public void put(Put put) throws IOException {
+    put(Collections.singletonList(put));
+  }
+
+  public void put(List<Put> puts) throws IOException {
+    try {
+      transactionContext.start();
+      ArrayList<Put> secondaryIndexPuts = new ArrayList<Put>();
+      for (Put put : puts) {
+        Set<Map.Entry<byte[], List<KeyValue>>> familyMap = put.getFamilyMap().entrySet();
+        for (Map.Entry<byte[], List<KeyValue>> family : familyMap) {
+          for (KeyValue value : family.getValue()) {
+            if (Bytes.equals(value.getQualifierArray(), value.getQualifierOffset(), value.getQualifierLength(),
+                             secondaryIndex, 0, secondaryIndex.length)) {
+              // index row key layout: qualifier 0x00 cell-value 0x00 data-row
+              byte[] secondaryRow = Bytes.add(value.getQualifier(), DELIMITER,
+                                              Bytes.add(value.getValue(), DELIMITER,
+                                                        value.getRow()));
+              Put indexPut = new Put(secondaryRow);
+              // the index cell's value points back at the data row
+              indexPut.add(secondaryIndexFamily, secondaryIndexQualifier, put.getRow());
+              secondaryIndexPuts.add(indexPut);
+            }
+          }
+        }
+      }
+      transactionAwareHTable.put(puts);
+      secondaryIndexTable.put(secondaryIndexPuts);
+      transactionContext.finish();
+    } catch (Exception e) {
+      try {
+        transactionContext.abort();
+      } catch (TransactionFailureException e1) {
+        throw new IOException("Could not rollback transaction", e1);
+      }
+      throw new IOException("Transaction failed", e);
+    }
+  }
+}
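[Editor's sketch] Every public operation above follows the same client-side shape: start a transaction, do the work against the transaction-aware tables, finish to commit, and abort on any failure. That shape can be factored into a helper; a sketch assuming the per-transaction work fits a Callable (TxRunner and inTransaction are illustrative names, not Tephra API):

import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.tephra.TransactionContext;
import org.apache.tephra.TransactionFailureException;

final class TxRunner {
  private TxRunner() { }

  // Run 'work' inside a single Tephra transaction on the given context.
  static <T> T inTransaction(TransactionContext context, Callable<T> work) throws IOException {
    try {
      context.start();          // begin: obtain a new transaction from the tx manager
      T result = work.call();   // transactional reads/writes against TransactionAware tables
      context.finish();         // commit: check conflicts, then make the writes visible
      return result;
    } catch (Exception e) {
      try {
        context.abort();        // roll back the changes recorded during 'work'
      } catch (TransactionFailureException e1) {
        throw new IOException("Could not rollback transaction", e1);
      }
      throw new IOException("Transaction failed", e);
    }
  }
}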
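[Editor's sketch] Finally, a usage sketch tying the two new classes together. It assumes a running transaction service and an already-constructed TransactionServiceClient (obtained elsewhere, e.g. through Tephra's Guice wiring); the table name "t", family "f", qualifier "idx", and all row/value literals are illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.tephra.distributed.TransactionServiceClient;
import org.apache.tephra.hbase96.HBase96ConfigurationProvider;
import org.apache.tephra.hbase96.SecondaryIndexTable;

public class SecondaryIndexExample {

  // txClient is assumed to be wired up elsewhere; the data table "t" is assumed
  // to exist with column family "f"
  static void run(TransactionServiceClient txClient) throws Exception {
    Configuration conf = new HBase96ConfigurationProvider().get();
    byte[] family = Bytes.toBytes("f");
    byte[] indexedQualifier = Bytes.toBytes("idx");

    SecondaryIndexTable indexed =
        new SecondaryIndexTable(txClient, new HTable(conf, "t"), indexedQualifier);

    // transactional write: the data put and its index put commit or abort together
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(family, indexedQualifier, Bytes.toBytes("value1"));
    indexed.put(put);

    // transactional point read through the data table
    Result direct = indexed.get(new Get(Bytes.toBytes("row1")));

    // index lookup: the key must match the index row prefix, qualifier 0x00 value
    byte[] indexKey = Bytes.add(indexedQualifier, new byte[] {0}, Bytes.toBytes("value1"));
    Result[] byIndex = indexed.getByIndex(indexKey);
    System.out.println("direct=" + !direct.isEmpty() + ", byIndex=" + byIndex.length);
  }
}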