/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tephra.hbase96;

import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.tephra.AbstractTransactionAwareTable;
import org.apache.tephra.Transaction;
import org.apache.tephra.TransactionAware;
import org.apache.tephra.TxConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;

/**
 * A Transaction Aware HTable implementation for HBase 0.96. Operations are committed as usual,
 * but upon a failed or aborted transaction, they are rolled back to the state before the transaction
 * was started.
 *
 * <p>Reads and writes are rewritten before being handed to the wrapped {@link HTableInterface}: reads get the
 * encoded transaction attached as an operation attribute, writes are re-stamped with the transaction's write
 * pointer and recorded in the change set so they can be undone by {@link #doRollback()}.</p>
 */
public class TransactionAwareHTable extends AbstractTransactionAwareTable
    implements HTableInterface, TransactionAware {

  private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
  private final HTableInterface hTable;

  /**
   * Create a transactional aware instance of the passed HTable
   *
   * @param hTable underlying HBase table to use
   */
  public TransactionAwareHTable(HTableInterface hTable) {
    this(hTable, false);
  }

  /**
   * Create a transactional aware instance of the passed HTable
   *
   * @param hTable underlying HBase table to use
   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
   */
  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
    this(hTable, conflictLevel, false);
  }

  /**
   * Create a transactional aware instance of the passed HTable, with the option
   * of allowing non-transactional operations.
   *
   * @param hTable underlying HBase table to use
   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
   *                              will be available, though non-transactional
   */
  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
    this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
  }

  /**
   * Create a transactional aware instance of the passed HTable, with the option
   * of allowing non-transactional operations.
   *
   * @param hTable underlying HBase table to use
   * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
   * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
   *                              will be available, though non-transactional
   */
  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
                                boolean allowNonTransactional) {
    super(conflictLevel, allowNonTransactional);
    this.hTable = hTable;
  }

  /* AbstractTransactionAwareTable implementation */

  @Override
  protected byte[] getTableKey() {
    return getTableName();
  }

  /**
   * Flushes any buffered writes of the underlying table.
   *
   * @return always {@code true}; failures surface as an {@link IOException}
   */
  @Override
  protected boolean doCommit() throws IOException {
    hTable.flushCommits();
    return true;
  }

  /**
   * Undoes every change recorded in the change set by issuing one rollback-flagged {@link Delete} per change,
   * targeted at the write pointer the change was made with. The transaction and the change set are cleared
   * afterwards, whether or not the deletes succeeded.
   *
   * @return {@code true} if the rollback deletes were submitted without error
   */
  @Override
  protected boolean doRollback() throws Exception {
    try {
      // pre-size arraylist of deletes
      int size = 0;
      for (Set<ActionChange> cs : changeSets.values()) {
        size += cs.size();
      }
      List<Delete> rollbackDeletes = new ArrayList<>(size);
      for (Map.Entry<Long, Set<ActionChange>> entry : changeSets.entrySet()) {
        long transactionTimestamp = entry.getKey();
        for (ActionChange change : entry.getValue()) {
          byte[] row = change.getRow();
          byte[] family = change.getFamily();
          byte[] qualifier = change.getQualifier();
          Delete rollbackDelete = new Delete(row);
          rollbackDelete.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
          switch (conflictLevel) {
            case ROW:
            case NONE:
              // issue family delete for the tx write pointer
              rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
              break;
            case COLUMN:
              if (family != null && qualifier == null) {
                rollbackDelete.deleteFamilyVersion(family, transactionTimestamp);
              } else if (family != null && qualifier != null) {
                rollbackDelete.deleteColumn(family, qualifier, transactionTimestamp);
              }
              break;
            default:
              throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
          }
          rollbackDeletes.add(rollbackDelete);
        }
      }
      hTable.delete(rollbackDeletes);
      return true;
    } finally {
      // best effort: a failed flush must not prevent the transaction state from being reset
      try {
        hTable.flushCommits();
      } catch (Exception e) {
        LOG.error("Could not flush HTable commits", e);
      }
      tx = null;
      changeSets.clear();
    }
  }

  /* HTableInterface implementation */

  @Override
  public byte[] getTableName() {
    return hTable.getTableName();
  }

  @Override
  public TableName getName() {
    return hTable.getName();
  }

  @Override
  public Configuration getConfiguration() {
    return hTable.getConfiguration();
  }

  @Override
  public HTableDescriptor getTableDescriptor() throws IOException {
    return hTable.getTableDescriptor();
  }

  @Override
  public boolean exists(Get get) throws IOException {
    ensureTransactionStarted();
    return hTable.exists(transactionalizeAction(get));
  }

  @Override
  public Boolean[] exists(List<Get> gets) throws IOException {
    ensureTransactionStarted();
    List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
    for (Get get : gets) {
      transactionalizedGets.add(transactionalizeAction(get));
    }
    return hTable.exists(transactionalizedGets);
  }

  @Override
  public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
    ensureTransactionStarted();
    hTable.batch(transactionalizeActions(actions), results);
  }

  @Override
  public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException {
    ensureTransactionStarted();
    return hTable.batch(transactionalizeActions(actions));
  }

  @Override
  public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws
    IOException, InterruptedException {
    ensureTransactionStarted();
    hTable.batchCallback(transactionalizeActions(actions), results, callback);
  }

  @Override
  public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
    InterruptedException {
    ensureTransactionStarted();
    return hTable.batchCallback(transactionalizeActions(actions), callback);
  }

  @Override
  public Result get(Get get) throws IOException {
    ensureTransactionStarted();
    return hTable.get(transactionalizeAction(get));
  }

  @Override
  public Result[] get(List<Get> gets) throws IOException {
    ensureTransactionStarted();
    // presized like the other list overloads (exists, put, delete)
    List<Get> transactionalizedGets = new ArrayList<Get>(gets.size());
    for (Get get : gets) {
      transactionalizedGets.add(transactionalizeAction(get));
    }
    return hTable.get(transactionalizedGets);
  }

  @Override
  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.getRowOrBefore(row, family);
  }

  @Override
  public ResultScanner getScanner(Scan scan) throws IOException {
    ensureTransactionStarted();
    return hTable.getScanner(transactionalizeAction(scan));
  }

  @Override
  public ResultScanner getScanner(byte[] family) throws IOException {
    ensureTransactionStarted();
    Scan scan = new Scan();
    scan.addFamily(family);
    return hTable.getScanner(transactionalizeAction(scan));
  }

  @Override
  public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
    ensureTransactionStarted();
    Scan scan = new Scan();
    scan.addColumn(family, qualifier);
    return hTable.getScanner(transactionalizeAction(scan));
  }

  @Override
  public void put(Put put) throws IOException {
    ensureTransactionStarted();
    hTable.put(transactionalizeAction(put));
  }

  @Override
  public void put(List<Put> puts) throws IOException {
    ensureTransactionStarted();
    List<Put> transactionalizedPuts = new ArrayList<Put>(puts.size());
    for (Put put : puts) {
      transactionalizedPuts.add(transactionalizeAction(put));
    }
    hTable.put(transactionalizedPuts);
  }

  @Override
  public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.checkAndPut(row, family, qualifier, value, put);
  }

  @Override
  public void delete(Delete delete) throws IOException {
    ensureTransactionStarted();
    hTable.delete(transactionalizeAction(delete));
  }

  @Override
  public void delete(List<Delete> deletes) throws IOException {
    ensureTransactionStarted();
    List<Delete> transactionalizedDeletes = new ArrayList<Delete>(deletes.size());
    for (Delete delete : deletes) {
      transactionalizedDeletes.add(transactionalizeAction(delete));
    }
    hTable.delete(transactionalizedDeletes);
  }

  @Override
  public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete)
    throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.checkAndDelete(row, family, qualifier, value, delete);
  }

  /**
   * Applies the given row mutations transactionally. Each {@link Put} and {@link Delete} is rewritten for the
   * current transaction; mutations of any other type are not carried over.
   */
  @Override
  public void mutateRow(RowMutations rm) throws IOException {
    ensureTransactionStarted();
    // Bind the copy to the source row. The no-arg RowMutations constructor leaves the row unset, while
    // RowMutations checks every added mutation against its own row.
    // NOTE(review): confirm against the HBase 0.96 RowMutations sources.
    RowMutations transactionalMutations = new RowMutations(rm.getRow());
    for (Mutation mutation : rm.getMutations()) {
      if (mutation instanceof Put) {
        transactionalMutations.add(transactionalizeAction((Put) mutation));
      } else if (mutation instanceof Delete) {
        transactionalMutations.add(transactionalizeAction((Delete) mutation));
      }
    }
    hTable.mutateRow(transactionalMutations);
  }

  @Override
  public Result append(Append append) throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.append(append);
  }

  @Override
  public Result increment(Increment increment) throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.increment(increment);
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.incrementColumnValue(row, family, qualifier, amount);
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
    throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
  }

  @Override
  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
    throws IOException {
    ensureNonTransactionalAllowed();
    return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
  }

  @Override
  public boolean isAutoFlush() {
    return hTable.isAutoFlush();
  }

  @Override
  public void flushCommits() throws IOException {
    hTable.flushCommits();
  }

  @Override
  public void close() throws IOException {
    hTable.close();
  }

  @Override
  public CoprocessorRpcChannel coprocessorService(byte[] row) {
    return hTable.coprocessorService(row);
  }

  @Override
  public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
                                                                  Batch.Call<T, R> callable)
    throws ServiceException, Throwable {
    return hTable.coprocessorService(service, startKey, endKey, callable);
  }

  @Override
  public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey,
                                                        Batch.Call<T, R> callable, Batch.Callback<R> callback)
    throws ServiceException, Throwable {
    hTable.coprocessorService(service, startKey, endKey, callable, callback);
  }

  @Override
  public void setAutoFlush(boolean autoFlush) {
    setAutoFlushTo(autoFlush);
  }

  @Override
  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
  }

  @Override
  public void setAutoFlushTo(boolean autoFlush) {
    hTable.setAutoFlushTo(autoFlush);
  }

  @Override
  public long getWriteBufferSize() {
    return hTable.getWriteBufferSize();
  }

  @Override
  public void setWriteBufferSize(long writeBufferSize) throws IOException {
    hTable.setWriteBufferSize(writeBufferSize);
  }

  // Guards shared by the HTableInterface methods above.

  /**
   * Verifies that a transaction is active.
   *
   * @throws IOException if no transaction has been started on this table
   */
  private void ensureTransactionStarted() throws IOException {
    if (tx == null) {
      throw new IOException("Transaction not started");
    }
  }

  /**
   * Verifies that this table was created with non-transactional operations enabled.
   *
   * @throws UnsupportedOperationException if non-transactional operations are not allowed
   */
  private void ensureNonTransactionalAllowed() {
    if (!allowNonTransactional) {
      throw new UnsupportedOperationException("Operation is not supported transactionally");
    }
  }

  // Helpers that prepare operations for the current transaction. Gets and Scans are tagged in place;
  // Puts and Deletes are copied with the timestamp set to the transaction's write pointer.

  private Get transactionalizeAction(Get get) throws IOException {
    addToOperation(get, tx);
    return get;
  }

  private Scan transactionalizeAction(Scan scan) throws IOException {
    addToOperation(scan, tx);
    return scan;
  }

  private Put transactionalizeAction(Put put) throws IOException {
    long writePointer = tx.getWritePointer();
    Put txPut = new Put(put.getRow(), writePointer);
    for (Map.Entry<byte[], List<Cell>> family : put.getFamilyCellMap().entrySet()) {
      for (Cell value : family.getValue()) {
        txPut.add(value.getFamily(), value.getQualifier(), writePointer, value.getValue());
        addToChangeSet(txPut.getRow(), value.getFamily(), value.getQualifier());
      }
    }
    for (Map.Entry<String, byte[]> entry : put.getAttributesMap().entrySet()) {
      txPut.setAttribute(entry.getKey(), entry.getValue());
    }
    txPut.setDurability(put.getDurability());
    addToOperation(txPut, tx);
    return txPut;
  }

  private Delete transactionalizeAction(Delete delete) throws IOException {
    long transactionTimestamp = tx.getWritePointer();

    byte[] deleteRow = delete.getRow();
    Delete txDelete = new Delete(deleteRow, transactionTimestamp);

    Map<byte[], List<Cell>> familyToDelete = delete.getFamilyCellMap();
    if (familyToDelete.isEmpty()) {
      // perform a row delete if we are using row-level conflict detection
      if (conflictLevel == TxConstants.ConflictDetection.ROW ||
        conflictLevel == TxConstants.ConflictDetection.NONE) {
        // Row delete leaves delete markers in all column families of the table
        // Therefore get all the column families of the hTable from the HTableDescriptor and add them to the changeSet
        for (HColumnDescriptor columnDescriptor : hTable.getTableDescriptor().getColumnFamilies()) {
          // no need to identify individual columns deleted
          addToChangeSet(deleteRow, columnDescriptor.getName(), null);
        }
      } else {
        // column-level conflict detection: read the row and delete every column found individually
        Result result = get(new Get(delete.getRow()));
        // NOTE(review): Result#getNoVersionMap() / #getFamilyMap() may return null for an empty Result
        // (row not present) - confirm against the HBase version in use and guard if so.
        NavigableMap<byte[], NavigableMap<byte[], byte[]>> resultMap = result.getNoVersionMap();
        for (Map.Entry<byte[], NavigableMap<byte[], byte[]>> familyEntry : resultMap.entrySet()) {
          NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(familyEntry.getKey());
          for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
            txDelete.deleteColumns(familyEntry.getKey(), column.getKey(), transactionTimestamp);
            addToChangeSet(deleteRow, familyEntry.getKey(), column.getKey());
          }
        }
      }
    } else {
      for (Map.Entry<byte [], List<Cell>> familyEntry : familyToDelete.entrySet()) {
        byte[] family = familyEntry.getKey();
        List<Cell> entries = familyEntry.getValue();
        boolean isFamilyDelete = false;
        if (entries.size() == 1) {
          Cell cell = entries.get(0);
          isFamilyDelete = CellUtil.isDeleteFamily(cell);
        }
        if (isFamilyDelete) {
          if (conflictLevel == TxConstants.ConflictDetection.ROW ||
            conflictLevel == TxConstants.ConflictDetection.NONE) {
            // no need to identify individual columns deleted
            // Older versions of HBase 0.96 lack HBASE-10964, so family deletes do not correctly
            // inherit the common Delete timestamp, so must explicitly set the timestamp here.
            txDelete.deleteFamily(family, transactionTimestamp);
            addToChangeSet(deleteRow, family, null);
          } else {
            Result result = get(new Get(delete.getRow()).addFamily(family));
            // Delete entire family
            NavigableMap<byte[], byte[]> familyColumns = result.getFamilyMap(family);
            for (Map.Entry<byte[], byte[]> column : familyColumns.entrySet()) {
              txDelete.deleteColumns(family, column.getKey(), transactionTimestamp);
              addToChangeSet(deleteRow, family, column.getKey());
            }
          }
        } else {
          for (Cell value : entries) {
            txDelete.deleteColumns(value.getFamily(), value.getQualifier(), transactionTimestamp);
            addToChangeSet(deleteRow, value.getFamily(), value.getQualifier());
          }
        }
      }
    }
    for (Map.Entry<String, byte[]> entry : delete.getAttributesMap().entrySet()) {
      txDelete.setAttribute(entry.getKey(), entry.getValue());
    }
    txDelete.setDurability(delete.getDurability());
    return txDelete;
  }

  private List<? extends Row> transactionalizeActions(List<? extends Row> actions) throws IOException {
    List<Row> transactionalizedActions = new ArrayList<>(actions.size());
    for (Row action : actions) {
      if (action instanceof Get) {
        transactionalizedActions.add(transactionalizeAction((Get) action));
      } else if (action instanceof Put) {
        transactionalizedActions.add(transactionalizeAction((Put) action));
      } else if (action instanceof Delete) {
        transactionalizedActions.add(transactionalizeAction((Delete) action));
      } else {
        // other action types are passed through untouched
        transactionalizedActions.add(action);
      }
    }
    return transactionalizedActions;
  }

  /**
   * Attaches the encoded transaction to the operation under {@link TxConstants#TX_OPERATION_ATTRIBUTE_KEY}.
   *
   * @param op the operation to tag
   * @param tx the transaction to encode onto the operation
   * @throws IOException if the transaction cannot be encoded
   */
  public void addToOperation(OperationWithAttributes op, Transaction tx) throws IOException {
    op.setAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY, txCodec.encode(tx));
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/CellSkipFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/CellSkipFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/CellSkipFilter.java new file mode 100644 index 0000000..526498d --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/CellSkipFilter.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase96.coprocessor; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; + +import java.io.IOException; +import java.util.List; + +/** + * {@link Filter} that encapsulates another {@link Filter}. 
It remembers the last {@link KeyValue} + * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL}, + * so that when {@link #filterKeyValue} is called again for the same row and column with a different + * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}. + * Please see TEPHRA-169 for more details. + */ +public class CellSkipFilter extends FilterBase { + private final Filter filter; + // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL + private KeyValue skipColumn = null; + + public CellSkipFilter(Filter filter) { + this.filter = filter; + } + + /** + * Determines whether the current cell should be skipped. The cell will be skipped + * if the remembered keyvalue has the same row, family and qualifier as the current cell. This means filter already + * responded for that column with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL. 
+ * @param cell the {@link Cell} to be tested for skipping + * @return true if the current cell should be skipped, false otherwise + */ + private boolean skipCellVersion(Cell cell) { + return skipColumn != null + && skipColumn.matchingRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + && skipColumn.matchingColumn(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + } + + @Override + public ReturnCode filterKeyValue(Cell cell) throws IOException { + if (skipCellVersion(cell)) { + return ReturnCode.NEXT_COL; + } + + ReturnCode code = filter.filterKeyValue(cell); + if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) { + // remember this cell's row, family and qualifier only if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL + skipColumn = KeyValue.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + } else { + skipColumn = null; + } + return code; + } + + @Override + public boolean filterRow() throws IOException { + return filter.filterRow(); + } + + @Override + public Cell transformCell(Cell cell) throws IOException { + return filter.transformCell(cell); + } + + @Override + public void reset() throws IOException { + filter.reset(); + } + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + return filter.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + return filter.filterAllRemaining(); + } + + @Override + public void filterRowCells(List<Cell> kvs) throws IOException { + filter.filterRowCells(kvs); + } + + @Override + public boolean hasFilterRow() { + return filter.hasFilterRow(); + } + + @SuppressWarnings("deprecation") + @Override + 
public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { + return filter.getNextKeyHint(currentKV); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + return filter.getNextCellHint(currentKV); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + return filter.isFamilyEssential(name); + } + + @Override + public byte[] toByteArray() throws IOException { + return filter.toByteArray(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionFilters.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionFilters.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionFilters.java new file mode 100644 index 0000000..e6334ed --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionFilters.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.hbase96.coprocessor; + +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.tephra.Transaction; + +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Factory class for providing transaction visibility {@link Filter} instances, each wrapped in a {@link CellSkipFilter}. + */ +public class TransactionFilters { + /** + * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions; no additional cell filter is applied. + * + * @param tx the current transaction to apply. Only data visible to this transaction will be returned. + * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + */ + public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, null)); + } + + /** + * Creates a new {@link org.apache.hadoop.hbase.filter.Filter} for returning data only from visible transactions. + * + * @param tx the current transaction to apply. Only data visible to this transaction will be returned. + * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by + * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. 
If null, then + * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. + */ + public static Filter getVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { + return new CellSkipFilter(new TransactionVisibilityFilter(tx, ttlByFamily, allowEmptyValues, scanType, cellFilter)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java new file mode 100644 index 0000000..136dedc --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionProcessor.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.hbase96.coprocessor; + +import com.google.common.base.Supplier; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.OperationWithAttributes; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TransactionCodec; +import org.apache.tephra.TxConstants; +import org.apache.tephra.coprocessor.TransactionStateCache; +import org.apache.tephra.coprocessor.TransactionStateCacheSupplier; +import org.apache.tephra.persist.TransactionVisibilityState; +import 
org.apache.tephra.util.TxUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;

/**
 * {@code org.apache.hadoop.hbase.coprocessor.RegionObserver} coprocessor that performs the server-side part of
 * transaction processing:
 * <ul>
 *   <li>{@code Get} and {@code Scan} operations carrying a transaction are filtered so that data from invalid and
 *       in-progress transactions is excluded;</li>
 *   <li>the scanners used for flush and compaction are replaced so that data written by invalidated transactions,
 *       or expired due to TTL, is dropped;</li>
 *   <li>client-issued deletes are rewritten as tombstone puts so that they can be rolled back.</li>
 * </ul>
 *
 * <p>To use this coprocessor, configure the class on every table involved in transactions, or on all user tables
 * by adding the following to hbase-site.xml:
 * <pre>{@code
 * <property>
 *   <name>hbase.coprocessor.region.classes</name>
 *   <value>org.apache.tephra.hbase96.coprocessor.TransactionProcessor</value>
 * </property>
 * }</pre>
 *
 * <p>HBase {@code Get} and {@code Scan} operations should have the current transaction serialized on to the
 * operation as an attribute:
 * <pre>{@code
 * Transaction t = ...;
 * Get get = new Get(...);
 * TransactionCodec codec = new TransactionCodec();
 * codec.addToOperation(get, t);
 * }</pre>
 */
public class TransactionProcessor extends BaseRegionObserver {
  private static final Log LOG = LogFactory.getLog(TransactionProcessor.class);

  // source of the latest transaction snapshot; only assigned when started in a region environment
  private TransactionStateCache cache;
  private final TransactionCodec txCodec;
  // configured TTL per column family (0 when unset or unparseable), ordered by family name bytes
  protected Map<byte[], Long> ttlByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
  protected boolean allowEmptyValues = TxConstants.ALLOW_EMPTY_VALUES_DEFAULT;
  protected boolean readNonTxnData = TxConstants.DEFAULT_READ_NON_TX_DATA;

  public TransactionProcessor() {
    this.txCodec = new TransactionCodec();
  }

  /* RegionObserver implementation */

  /**
   * Initializes the coprocessor from the region environment: obtains the transaction state cache, records the TTL
   * configured on every column family and reads the empty-value and non-transactional-data settings.
   * Does nothing when {@code e} is not a {@link RegionCoprocessorEnvironment}.
   */
  @Override
  public void start(CoprocessorEnvironment e) throws IOException {
    if (!(e instanceof RegionCoprocessorEnvironment)) {
      return;
    }
    RegionCoprocessorEnvironment regionEnv = (RegionCoprocessorEnvironment) e;
    this.cache = getTransactionStateCacheSupplier(regionEnv).get();

    HTableDescriptor tableDescriptor = regionEnv.getRegion().getTableDesc();
    for (HColumnDescriptor familyDescriptor : tableDescriptor.getFamilies()) {
      ttlByFamily.put(familyDescriptor.getName(), configuredTtl(familyDescriptor));
    }

    this.allowEmptyValues = regionEnv.getConfiguration().getBoolean(TxConstants.ALLOW_EMPTY_VALUES_KEY,
                                                                    TxConstants.ALLOW_EMPTY_VALUES_DEFAULT);
    this.readNonTxnData = Boolean.parseBoolean(tableDescriptor.getValue(TxConstants.READ_NON_TX_DATA));
    if (readNonTxnData) {
      LOG.info("Reading pre-existing data enabled for table " + tableDescriptor.getNameAsString());
    }
  }

  /**
   * Reads the TTL property of a column family.
   *
   * @param familyDescriptor the column family to inspect
   * @return the parsed TTL, or {@code 0} when the property is absent or is not a valid {@code long}
   */
  private static long configuredTtl(HColumnDescriptor familyDescriptor) {
    String rawTtl = familyDescriptor.getValue(TxConstants.PROPERTY_TTL);
    if (rawTtl == null) {
      return 0L;
    }
    try {
      long parsedTtl = Long.parseLong(rawTtl);
      LOG.info("Family " + familyDescriptor.getNameAsString() + " has TTL of " + rawTtl);
      return parsedTtl;
    } catch (NumberFormatException nfe) {
      // a malformed TTL is reported and then treated as "no TTL"
      LOG.warn("Invalid TTL value configured for column family " + familyDescriptor.getNameAsString() +
                 ", value = " + rawTtl);
      return 0L;
    }
  }

  /**
   * Creates the supplier of the {@link TransactionStateCache} used by this coprocessor. Protected so that derived
   * classes can substitute their own supplier.
   */
  protected Supplier<TransactionStateCache> getTransactionStateCacheSupplier(RegionCoprocessorEnvironment env) {
    return new TransactionStateCacheSupplier(env.getConfiguration());
  }

  @Override
  public void stop(CoprocessorEnvironment e) throws IOException {
    // nothing to do
  }

  /**
   * When the {@code Get} carries a transaction, widens it to all versions, restricts its time range to what the
   * transaction may see and wraps its filter in the transaction filter. Gets without a transaction are untouched.
   */
  @Override
  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
    throws IOException {
    Transaction tx = getFromOperation(get);
    if (tx == null) {
      return;
    }
    projectFamilyDeletes(get);
    get.setMaxVersions();
    long oldestVisible = TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData);
    long newestVisible = TxUtils.getMaxVisibleTimestamp(tx);
    get.setTimeRange(oldestVisible, newestVisible);
    get.setFilter(getTransactionFilter(tx, ScanType.USER_SCAN, get.getFilter()));
  }

  /**
   * Translates client deletes into delete tombstones. Since HBase deletes cannot be undone, each delete is turned
   * into a special put with empty values, which allows the change to be rolled back (by a real delete) if the
   * transaction fails. The original delete is then bypassed.
   */
  @Override
  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit,
                        Durability durability) throws IOException {
    // Deletes that are part of a transaction rollback do not need special handling.
    // They will never be rolled back, so are performed as normal HBase deletes.
    if (delete.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null) {
      return;
    }

    // Other deletes are client-initiated and need to be translated into our own tombstones
    // TODO: this should delegate to the DeleteStrategy implementation.
    Put tombstones = new Put(delete.getRow(), delete.getTimeStamp());
    for (Map.Entry<byte[], List<Cell>> familyEntry : delete.getFamilyCellMap().entrySet()) {
      byte[] family = familyEntry.getKey();
      List<Cell> cells = familyEntry.getValue();
      if (isFamilyDelete(cells)) {
        // a whole-family delete becomes a single marker under the reserved family-delete qualifier
        tombstones.add(family, TxConstants.FAMILY_DELETE_QUALIFIER, cells.get(0).getTimestamp(),
                       HConstants.EMPTY_BYTE_ARRAY);
        continue;
      }
      for (Cell cell : cells) {
        tombstones.add(family, CellUtil.cloneQualifier(cell), cell.getTimestamp(),
                       HConstants.EMPTY_BYTE_ARRAY);
      }
    }
    // carry every attribute of the delete over to the replacement put
    for (Map.Entry<String, byte[]> attribute : delete.getAttributesMap().entrySet()) {
      tombstones.setAttribute(attribute.getKey(), attribute.getValue());
    }
    e.getEnvironment().getRegion().put(tombstones);
    // skip normal delete handling
    e.bypass();
  }

  /** Returns whether the cells of one family consist of exactly one cell, and that cell is a family delete. */
  private boolean isFamilyDelete(List<Cell> familyCells) {
    return familyCells.size() == 1 && CellUtil.isDeleteFamily(familyCells.get(0));
  }

  /**
   * When the {@code Scan} carries a transaction, widens it to all versions, restricts its time range to what the
   * transaction may see and wraps its filter in the transaction filter. The passed scanner is returned unchanged.
   */
  @Override
  public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s)
    throws IOException {
    Transaction tx = getFromOperation(scan);
    if (tx == null) {
      return s;
    }
    projectFamilyDeletes(scan);
    scan.setMaxVersions();
    long oldestVisible = TxUtils.getOldestVisibleTimestamp(ttlByFamily, tx, readNonTxnData);
    long newestVisible = TxUtils.getMaxVisibleTimestamp(tx);
    scan.setTimeRange(oldestVisible, newestVisible);
    scan.setFilter(getTransactionFilter(tx, ScanType.USER_SCAN, scan.getFilter()));
    return s;
  }

  /**
   * Ensures that family delete markers are present in the columns requested for any scan operation.
   * @param scan The original scan request
   * @return The modified scan request with the family delete qualifiers represented
   */
  private Scan projectFamilyDeletes(Scan scan) {
    for (Map.Entry<byte[], NavigableSet<byte[]>> familyColumns : scan.getFamilyMap().entrySet()) {
      NavigableSet<byte[]> qualifiers = familyColumns.getValue();
      // wildcard scans will automatically include the delete marker, so only need to add it when we have
      // explicit columns listed
      if (qualifiers == null || qualifiers.isEmpty()) {
        continue;
      }
      scan.addColumn(familyColumns.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
    }
    return scan;
  }

  /**
   * Ensures that family delete markers are present in the columns requested for any get operation.
   * @param get The original get request
   * @return The modified get request with the family delete qualifiers represented
   */
  private Get projectFamilyDeletes(Get get) {
    for (Map.Entry<byte[], NavigableSet<byte[]>> familyColumns : get.getFamilyMap().entrySet()) {
      NavigableSet<byte[]> qualifiers = familyColumns.getValue();
      // wildcard gets will automatically include the delete marker, so only need to add it when we have
      // explicit columns listed
      if (qualifiers == null || qualifiers.isEmpty()) {
        continue;
      }
      get.addColumn(familyColumns.getKey(), TxConstants.FAMILY_DELETE_QUALIFIER);
    }
    return get;
  }

  /** Replaces the flush scanner with one built by {@link #createStoreScanner} over the memstore scanner. */
  @Override
  public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
                                             KeyValueScanner memstoreScanner, InternalScanner scanner)
    throws IOException {
    RegionCoprocessorEnvironment env = c.getEnvironment();
    TransactionVisibilityState latestState = cache.getLatestState();
    List<KeyValueScanner> memstoreOnly = Collections.singletonList(memstoreScanner);
    return createStoreScanner(env, "flush", latestState, store, memstoreOnly,
                              ScanType.COMPACT_RETAIN_DELETES, HConstants.OLDEST_TIMESTAMP);
  }

  /** Replaces the compaction scanner with one built by {@link #createStoreScanner} over the given scanners. */
  @Override
  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, InternalScanner s,
      CompactionRequest request)
    throws IOException {
    RegionCoprocessorEnvironment env = c.getEnvironment();
    TransactionVisibilityState latestState = cache.getLatestState();
    return createStoreScanner(env, "compaction", latestState, store, scanners, scanType, earliestPutTs);
  }

  /**
   * Builds the store scanner used for a flush or compaction.
   *
   * @param env the region environment, used only for the debug message
   * @param action short name of the operation ("flush" or "compaction"), used only for the debug message
   * @param snapshot the latest transaction state, or {@code null} if none is available
   * @param store the store being flushed or compacted
   * @param scanners the scanners to read from
   * @param type the type of scan operation being performed
   * @param earliestPutTs passed through to the {@link StoreScanner}
   * @return the transaction-aware scanner, or {@code null} when {@code snapshot} is {@code null}
   */
  protected InternalScanner createStoreScanner(RegionCoprocessorEnvironment env, String action,
                                               TransactionVisibilityState snapshot, Store store,
                                               List<? extends KeyValueScanner> scanners, ScanType type,
                                               long earliestPutTs) throws IOException {
    if (snapshot == null) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Region " + env.getRegion().getRegionNameAsString() +
                    ", no current transaction state found, defaulting to normal " + action + " scanner");
      }
      return null;
    }

    // construct a dummy transaction from the latest snapshot
    Transaction dummyTx = TxUtils.createDummyTransaction(snapshot);
    long upperBound = dummyTx.getVisibilityUpperBound();
    Collection<Long> invalid = snapshot.getInvalid();
    Filter visibilityFilter = getTransactionFilter(dummyTx, type, null);

    Scan scan = new Scan();
    // need to see all versions, since we filter out excludes and applications may rely on multiple versions
    scan.setMaxVersions();
    scan.setFilter(new IncludeInProgressFilter(upperBound, invalid, visibilityFilter));

    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
                            type, store.getSmallestReadPoint(), earliestPutTs);
  }

  /**
   * Decodes the transaction attached to an operation.
   *
   * @return the decoded transaction, or {@code null} when the operation carries no transaction attribute
   */
  private Transaction getFromOperation(OperationWithAttributes op) throws IOException {
    byte[] encoded = op.getAttribute(TxConstants.TX_OPERATION_ATTRIBUTE_KEY);
    return encoded == null ? null : txCodec.decode(encoded);
  }

  /**
   * Derived classes can override this method to customize the filter used to return data visible for the current
   * transaction.
   *
   * @param tx the current transaction to apply
   * @param type the type of scan operation being performed
   * @param filter an optional filter supplied with the operation, handed on to the visibility filter;
   *               may be {@code null}
   */
  protected Filter getTransactionFilter(Transaction tx, ScanType type, Filter filter) {
    return TransactionFilters.getVisibilityFilter(tx, ttlByFamily, allowEmptyValues, type, filter);
  }

  /**
   * Filter used to include cells visible to in-progress transactions on flush and compaction.
   */
  static class IncludeInProgressFilter extends FilterBase {
    private final long visibilityUpperBound;
    private final Set<Long> invalidIds;
    private final Filter txFilter;

    public IncludeInProgressFilter(long upperBound, Collection<Long> invalids, Filter transactionFilter) {
      this.visibilityUpperBound = upperBound;
      this.invalidIds = Sets.newHashSet(invalids);
      this.txFilter = transactionFilter;
    }

    @Override
    public ReturnCode filterKeyValue(Cell cell) throws IOException {
      long ts = cell.getTimestamp();
      if (ts <= visibilityUpperBound) {
        // at or below the upper bound the regular transaction filter decides
        return txFilter.filterKeyValue(cell);
      }
      // include everything that could still be in-progress, except for those already marked as invalid
      return invalidIds.contains(ts) ? ReturnCode.SKIP : ReturnCode.INCLUDE;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java new file mode 100644 index 0000000..d675db7 --- /dev/null +++ b/tephra-hbase-compat-0.96/src/main/java/org/apache/tephra/hbase96/coprocessor/TransactionVisibilityFilter.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.hbase96.coprocessor; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.tephra.Transaction; +import org.apache.tephra.TxConstants; +import org.apache.tephra.util.TxUtils; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +/** + * Applies filtering of data based on transactional visibility (HBase 0.96+ specific version). + * Note: this is intended for server-side use only, as additional properties need to be set on + * any {@code Scan} or {@code Get} operation performed. 
+ */ +public class TransactionVisibilityFilter extends FilterBase { + private final Transaction tx; + // oldest visible timestamp by column family, used to apply TTL when reading + private final Map<byte[], Long> oldestTsByFamily; + // if false, empty values will be interpreted as deletes + private final boolean allowEmptyValues; + // whether or not we can remove delete markers + // these can only be safely removed when we are traversing all storefiles + private final boolean clearDeletes; + // optional sub-filter to apply to visible cells + private final Filter cellFilter; + + // since we traverse KVs in order, cache the current oldest TS to avoid map lookups per KV + private byte[] currentFamily = new byte[0]; + private long currentOldestTs; + + private DeleteTracker deleteTracker = new DeleteTracker(); + + /** + * Creates a new {@link Filter} for returning data only from visible transactions. + * + * @param tx the current transaction to apply. Only data visible to this transaction will be returned. + * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + */ + public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType) { + this(tx, ttlByFamily, allowEmptyValues, scanType, null); + } + + /** + * Creates a new {@link Filter} for returning data only from visible transactions. + * + * @param tx the current transaction to apply. Only data visible to this transaction will be returned. 
+ * @param ttlByFamily map of time-to-live (TTL) (in milliseconds) by column family name + * @param allowEmptyValues if {@code true} cells with empty {@code byte[]} values will be returned, if {@code false} + * these will be interpreted as "delete" markers and the column will be filtered out + * @param scanType the type of scan operation being performed + * @param cellFilter if non-null, this filter will be applied to all cells visible to the current transaction, by + * calling {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)}. If null, then + * {@link Filter.ReturnCode#INCLUDE_AND_NEXT_COL} will be returned instead. + */ + public TransactionVisibilityFilter(Transaction tx, Map<byte[], Long> ttlByFamily, boolean allowEmptyValues, + ScanType scanType, @Nullable Filter cellFilter) { + this.tx = tx; + this.oldestTsByFamily = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], Long> ttlEntry : ttlByFamily.entrySet()) { + long familyTTL = ttlEntry.getValue(); + oldestTsByFamily.put(ttlEntry.getKey(), + familyTTL <= 0 ? 0 : tx.getVisibilityUpperBound() - familyTTL * TxConstants.MAX_TX_PER_MS); + } + this.allowEmptyValues = allowEmptyValues; + this.clearDeletes = + scanType == ScanType.COMPACT_DROP_DELETES || + (scanType == ScanType.USER_SCAN && tx.getVisibilityLevel() != Transaction.VisibilityLevel.SNAPSHOT_ALL); + this.cellFilter = cellFilter; + } + + @Override + public ReturnCode filterKeyValue(Cell cell) throws IOException { + if (!CellUtil.matchingFamily(cell, currentFamily)) { + // column family changed + currentFamily = CellUtil.cloneFamily(cell); + Long familyOldestTs = oldestTsByFamily.get(currentFamily); + currentOldestTs = familyOldestTs != null ? 
familyOldestTs : 0; + deleteTracker.reset(); + } + // need to apply TTL for the column family here + long kvTimestamp = cell.getTimestamp(); + if (TxUtils.getTimestampForTTL(kvTimestamp) < currentOldestTs) { + // passed TTL for this column, seek to next + return ReturnCode.NEXT_COL; + } else if (tx.isVisible(kvTimestamp)) { + // Return all writes done by current transaction (including deletes) for VisibilityLevel.SNAPSHOT_ALL + if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL && tx.isCurrentWrite(kvTimestamp)) { + // cell is visible + // visibility SNAPSHOT_ALL needs all matches + return runSubFilter(ReturnCode.INCLUDE, cell); + } + if (DeleteTracker.isFamilyDelete(cell)) { + deleteTracker.addFamilyDelete(cell); + if (clearDeletes) { + return ReturnCode.NEXT_COL; + } else { + // cell is visible + // as soon as we find a KV to include we can move to the next column + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } + } + // check if masked by family delete + if (deleteTracker.isDeleted(cell)) { + return ReturnCode.NEXT_COL; + } + // check for column delete + if (isColumnDelete(cell)) { + if (clearDeletes) { + // skip "deleted" cell + return ReturnCode.NEXT_COL; + } else { + // keep the marker but skip any remaining versions + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } + } + // cell is visible + // as soon as we find a KV to include we can move to the next column + return runSubFilter(ReturnCode.INCLUDE_AND_NEXT_COL, cell); + } else { + return ReturnCode.SKIP; + } + } + + private ReturnCode runSubFilter(ReturnCode txFilterCode, Cell cell) throws IOException { + if (cellFilter != null) { + ReturnCode subFilterCode = cellFilter.filterKeyValue(cell); + return determineReturnCode(txFilterCode, subFilterCode); + } + return txFilterCode; + } + + /** + * Determines the return code of TransactionVisibilityFilter based on sub-filter's return code. 
+ * Sub-filter can only exclude cells included by TransactionVisibilityFilter, i.e., sub-filter's + * INCLUDE will be ignored. This behavior makes sure that sub-filter only sees cell versions valid for the + * given transaction. If sub-filter needs to see older versions of cell, then this method can be overridden. + * + * @param txFilterCode return code from TransactionVisibilityFilter + * @param subFilterCode return code from sub-filter + * @return final return code + */ + protected ReturnCode determineReturnCode(ReturnCode txFilterCode, ReturnCode subFilterCode) { + // Return the more restrictive of the two filter responses + switch (subFilterCode) { + case INCLUDE: + return txFilterCode; + case INCLUDE_AND_NEXT_COL: + return ReturnCode.INCLUDE_AND_NEXT_COL; + case SKIP: + return txFilterCode == ReturnCode.INCLUDE ? ReturnCode.SKIP : ReturnCode.NEXT_COL; + default: + return subFilterCode; + } + } + + @Override + public boolean filterRow() throws IOException { + if (cellFilter != null) { + return cellFilter.filterRow(); + } + return super.filterRow(); + } + + @Override + public Cell transformCell(Cell cell) throws IOException { + // Convert Tephra deletes back into HBase deletes + if (tx.getVisibilityLevel() == Transaction.VisibilityLevel.SNAPSHOT_ALL) { + if (DeleteTracker.isFamilyDelete(cell)) { + return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), null, cell.getTimestamp(), + KeyValue.Type.DeleteFamily); + } else if (isColumnDelete(cell)) { + // Note: in some cases KeyValue.Type.Delete is used in Delete object, + // and in some other cases KeyValue.Type.DeleteColumn is used. + // Since Tephra cannot distinguish between the two, we return KeyValue.Type.DeleteColumn. + // KeyValue.Type.DeleteColumn makes both CellUtil.isDelete and CellUtil.isDeleteColumns return true, and will + // work in both cases. 
+ return new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + cell.getTimestamp(), KeyValue.Type.DeleteColumn); + } + } + return cell; + } + + @Override + public void reset() throws IOException { + deleteTracker.reset(); + if (cellFilter != null) { + cellFilter.reset(); + } + } + + @Override + public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { + if (cellFilter != null) { + return cellFilter.filterRowKey(buffer, offset, length); + } + return super.filterRowKey(buffer, offset, length); + } + + @Override + public boolean filterAllRemaining() throws IOException { + if (cellFilter != null) { + return cellFilter.filterAllRemaining(); + } + return super.filterAllRemaining(); + } + + @Override + public void filterRowCells(List<Cell> kvs) throws IOException { + if (cellFilter != null) { + cellFilter.filterRowCells(kvs); + } else { + super.filterRowCells(kvs); + } + } + + @Override + public boolean hasFilterRow() { + if (cellFilter != null) { + return cellFilter.hasFilterRow(); + } + return super.hasFilterRow(); + } + + @SuppressWarnings("deprecation") + @Override + public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException { + if (cellFilter != null) { + return cellFilter.getNextKeyHint(currentKV); + } + return super.getNextKeyHint(currentKV); + } + + @Override + public Cell getNextCellHint(Cell currentKV) throws IOException { + if (cellFilter != null) { + return cellFilter.getNextCellHint(currentKV); + } + return super.getNextCellHint(currentKV); + } + + @Override + public boolean isFamilyEssential(byte[] name) throws IOException { + if (cellFilter != null) { + return cellFilter.isFamilyEssential(name); + } + return super.isFamilyEssential(name); + } + + @Override + public byte[] toByteArray() throws IOException { + return super.toByteArray(); + } + + private boolean isColumnDelete(Cell cell) { + return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && cell.getValueLength() == 
0 && !allowEmptyValues; + } + + private static final class DeleteTracker { + private long familyDeleteTs; + + public static boolean isFamilyDelete(Cell cell) { + return !TxUtils.isPreExistingVersion(cell.getTimestamp()) && + CellUtil.matchingQualifier(cell, TxConstants.FAMILY_DELETE_QUALIFIER) && + CellUtil.matchingValue(cell, HConstants.EMPTY_BYTE_ARRAY); + } + + public void addFamilyDelete(Cell delete) { + this.familyDeleteTs = delete.getTimestamp(); + } + + public boolean isDeleted(Cell cell) { + return cell.getTimestamp() <= familyDeleteTs; + } + + public void reset() { + this.familyDeleteTs = 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/HBase96ConfigurationProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/HBase96ConfigurationProviderTest.java b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/HBase96ConfigurationProviderTest.java deleted file mode 100644 index a1a2e97..0000000 --- a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/HBase96ConfigurationProviderTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.hbase96; - -import co.cask.tephra.util.AbstractConfigurationProviderTest; -import co.cask.tephra.util.HBaseVersion; - -/** - * Test for HBase 0.96 version specific behavior. - */ -public class HBase96ConfigurationProviderTest extends AbstractConfigurationProviderTest { - @Override - protected HBaseVersion.Version getExpectedVersion() { - return HBaseVersion.Version.HBASE_96; - } -}
