Github user ohadshacham commented on a diff in the pull request:

    https://github.com/apache/incubator-omid/pull/41#discussion_r206866085
  
    --- Diff: 
hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
 ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.omid.transaction;
    +
    +import com.google.common.base.Optional;
    +import com.sun.istack.Nullable;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.CellUtil;
    +import org.apache.hadoop.hbase.KeyValue;
    +import org.apache.hadoop.hbase.client.Get;
    +import org.apache.hadoop.hbase.client.Result;
    +import org.apache.hadoop.hbase.filter.Filter;
    +import org.apache.hadoop.hbase.filter.FilterBase;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +import java.io.IOException;
    +import java.util.*;
    +
    +public class TransactionVisibilityFilter extends FilterBase {
    +
    +    // optional sub-filter to apply to visible cells
    +    private final Filter userFilter;
    +    private final SnapshotFilterImpl snapshotFilter;
    +    private final Map<Long ,Long> shadowCellCache;
    +    private final HBaseTransaction hbaseTransaction;
    +    private final Map<String, Long> familyDeletionCache;
    +
    +    public SnapshotFilter getSnapshotFilter() {
    +        return snapshotFilter;
    +    }
    +
    +    public TransactionVisibilityFilter(@Nullable Filter cellFilter,
    +                                       SnapshotFilterImpl snapshotFilter,
    +                                       HBaseTransaction hbaseTransaction) {
    +        this.userFilter = cellFilter;
    +        this.snapshotFilter = snapshotFilter;
    +        shadowCellCache = new HashMap<>();
    +        this.hbaseTransaction = hbaseTransaction;
    +        familyDeletionCache = new HashMap<String, Long>();
    +    }
    +
    +    @Override
    +    public ReturnCode filterKeyValue(Cell v) throws IOException {
    +        if (CellUtils.isShadowCell(v)) {
    +            Long commitTs =  Bytes.toLong(CellUtil.cloneValue(v));
    +            shadowCellCache.put(v.getTimestamp(), commitTs);
    +            // Continue getting shadow cells until one of them fits this 
transaction
    +            if (hbaseTransaction.getStartTimestamp() >= commitTs) {
    +                return ReturnCode.NEXT_COL;
    +            } else {
    +                return ReturnCode.SKIP;
    +            }
    +        } else if (CellUtils.isFamilyDeleteCell(v)) {
    +            //Delete is part of this transaction
    +            if (snapshotFilter.getTSIfInTransaction(v, 
hbaseTransaction).isPresent()) {
    +                
familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), 
v.getTimestamp());
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            if (shadowCellCache.containsKey(v.getTimestamp()) &&
    +                    hbaseTransaction.getStartTimestamp() >= 
shadowCellCache.get(v.getTimestamp())) {
    +                
familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), 
shadowCellCache.get(v.getTimestamp()));
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            // Try to get shadow cell from region
    +            final Get get = new Get(CellUtil.cloneRow(v));
    +            get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
    +            get.addColumn(CellUtil.cloneFamily(v), 
CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
    +            Result deleteFamilySC = 
snapshotFilter.getTableAccessWrapper().get(get);
    +
    +            if (!deleteFamilySC.isEmpty() &&
    +                    
Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < 
hbaseTransaction.getStartTimestamp()){
    +                
familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), 
Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            //At last go to commit table
    +            Optional<Long> commitTimestamp = 
snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
    +                    v, shadowCellCache);
    +            if (commitTimestamp.isPresent() && 
hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
    +                
familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), 
commitTimestamp.get());
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            // Continue getting the next version of the delete family,
    +            // until we get one in the snapshot or move to next cell
    +            return ReturnCode.SKIP;
    +        }
    +
    +        if 
(familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
    +                && 
familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= 
v.getTimestamp()) {
    +            return ReturnCode.NEXT_COL;
    +        }
    +
    +        if (isCellInSnapshot(v)) {
    +
    +            if (CellUtils.isTombstone(v)) {
    +                return ReturnCode.NEXT_COL;
    +            }
    +
    +            if (hbaseTransaction.getVisibilityLevel() == 
AbstractTransaction.VisibilityLevel.SNAPSHOT) {
    +                return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
    +            } else if (hbaseTransaction.getVisibilityLevel() == 
AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
    +                if (snapshotFilter.getTSIfInTransaction(v, 
hbaseTransaction).isPresent()) {
    +                    return runUserFilter(v, ReturnCode.INCLUDE);
    +                } else {
    +                    return runUserFilter(v, 
ReturnCode.INCLUDE_AND_NEXT_COL);
    +                }
    +            }
    +        }
    +        return ReturnCode.SKIP;
    +    }
    +
    +
    +    private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
    +            throws IOException {
    +
    +        if (userFilter == null) {
    +            return snapshotReturn;
    +        }
    +
    +        ReturnCode userRes = userFilter.filterKeyValue(v);
    +        switch (userRes) {
    +            case INCLUDE:
    +                return snapshotReturn;
    +            case SKIP:
    +                return (snapshotReturn == ReturnCode.INCLUDE) ? 
ReturnCode.SKIP: ReturnCode.NEXT_COL;
    +            default:
    +                return userRes;
    +        }
    +
    +    }
    +
    +
    +    private boolean isCellInSnapshot(Cell v) throws IOException {
    +        if (shadowCellCache.containsKey(v.getTimestamp()) &&
    --- End diff --
    
    two collection calls.


---

Reply via email to