Github user ohadshacham commented on a diff in the pull request: https://github.com/apache/incubator-omid/pull/41#discussion_r206862264 --- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.omid.transaction; + +import com.google.common.base.Optional; +import com.sun.istack.Nullable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.*; + +public class TransactionVisibilityFilter extends FilterBase { + + // optional sub-filter to apply to visible cells + private final Filter userFilter; + private final SnapshotFilterImpl snapshotFilter; + private final Map<Long ,Long> shadowCellCache; + private final HBaseTransaction hbaseTransaction; + private final Map<String, Long> familyDeletionCache; + + public SnapshotFilter getSnapshotFilter() { + return snapshotFilter; + } + + public TransactionVisibilityFilter(@Nullable Filter cellFilter, + SnapshotFilterImpl snapshotFilter, + HBaseTransaction hbaseTransaction) { + this.userFilter = cellFilter; + this.snapshotFilter = snapshotFilter; + shadowCellCache = new HashMap<>(); + this.hbaseTransaction = hbaseTransaction; + familyDeletionCache = new HashMap<String, Long>(); + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + if (CellUtils.isShadowCell(v)) { + Long commitTs = Bytes.toLong(CellUtil.cloneValue(v)); + shadowCellCache.put(v.getTimestamp(), commitTs); + // Continue getting shadow cells until one of them fits this transaction + if (hbaseTransaction.getStartTimestamp() >= commitTs) { + return ReturnCode.NEXT_COL; + } else { + return ReturnCode.SKIP; + } + } else if (CellUtils.isFamilyDeleteCell(v)) { + //Delete is part of this transaction + if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp()); + return ReturnCode.NEXT_COL; + } + + if (shadowCellCache.containsKey(v.getTimestamp()) && + hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp())); + return ReturnCode.NEXT_COL; + } + + // Try to get shadow cell from region + final Get get = new Get(CellUtil.cloneRow(v)); + get.setTimeStamp(v.getTimestamp()).setMaxVersions(1); + get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER)); + Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get); + + if (!deleteFamilySC.isEmpty() && + Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){ + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0]))); + return ReturnCode.NEXT_COL; + } + + //At last go to commit table + Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(), + v, shadowCellCache); + if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get()); + return ReturnCode.NEXT_COL; + } + + // Continue getting the next version of the delete family, + // until we get one in the snapshot or move to next cell + return ReturnCode.SKIP; + } + + if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v))) + && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) { + return ReturnCode.NEXT_COL; + } + + if (isCellInSnapshot(v)) { + + if (CellUtils.isTombstone(v)) { + return ReturnCode.NEXT_COL; --- End diff -- What about tombstones in snapshot_all? I assume it should be SKIP.
---