[ https://issues.apache.org/jira/browse/OMID-102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565273#comment-16565273 ]
ASF GitHub Bot commented on OMID-102: ------------------------------------- Github user ohadshacham commented on a diff in the pull request: https://github.com/apache/incubator-omid/pull/41#discussion_r206861622 --- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.omid.transaction; + +import com.google.common.base.Optional; +import com.sun.istack.Nullable; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.*; + +public class TransactionVisibilityFilter extends FilterBase { + + // optional sub-filter to apply to visible cells + private final Filter userFilter; + private final SnapshotFilterImpl snapshotFilter; + private final Map<Long ,Long> shadowCellCache; + private final HBaseTransaction hbaseTransaction; + private final Map<String, Long> familyDeletionCache; + + public SnapshotFilter getSnapshotFilter() { + return snapshotFilter; + } + + public TransactionVisibilityFilter(@Nullable Filter cellFilter, + SnapshotFilterImpl snapshotFilter, + HBaseTransaction hbaseTransaction) { + this.userFilter = cellFilter; + this.snapshotFilter = snapshotFilter; + shadowCellCache = new HashMap<>(); + this.hbaseTransaction = hbaseTransaction; + familyDeletionCache = new HashMap<String, Long>(); + } + + @Override + public ReturnCode filterKeyValue(Cell v) throws IOException { + if (CellUtils.isShadowCell(v)) { + Long commitTs = Bytes.toLong(CellUtil.cloneValue(v)); + shadowCellCache.put(v.getTimestamp(), commitTs); + // Continue getting shadow cells until one of them fits this transaction + if (hbaseTransaction.getStartTimestamp() >= commitTs) { + return ReturnCode.NEXT_COL; + } else { + return ReturnCode.SKIP; + } + } else if (CellUtils.isFamilyDeleteCell(v)) { + //Delete is part of this transaction + if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp()); + return 
ReturnCode.NEXT_COL; + } + + if (shadowCellCache.containsKey(v.getTimestamp()) && + hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp())); + return ReturnCode.NEXT_COL; + } + + // Try to get shadow cell from region + final Get get = new Get(CellUtil.cloneRow(v)); + get.setTimeStamp(v.getTimestamp()).setMaxVersions(1); + get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER)); + Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get); + + if (!deleteFamilySC.isEmpty() && + Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){ + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0]))); + return ReturnCode.NEXT_COL; + } + + //At last go to commit table + Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(), + v, shadowCellCache); + if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) { + familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get()); + return ReturnCode.NEXT_COL; + } + + // Continue getting the next version of the delete family, + // until we get one in the snapshot or move to next cell + return ReturnCode.SKIP; + } + + if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v))) + && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) { + return ReturnCode.NEXT_COL; + } + + if (isCellInSnapshot(v)) { + + if (CellUtils.isTombstone(v)) { + return ReturnCode.NEXT_COL; + } + + if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT) { + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); + } else if 
(hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) { + if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) { + return runUserFilter(v, ReturnCode.INCLUDE); + } else { + return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL); --- End diff -- I would add a comment that since v is in snapshot and not in transaction then it is the last result for SNAPSHOT_ALL. > Implement visibility filter as pure HBase Filter > ------------------------------------------------ > > Key: OMID-102 > URL: https://issues.apache.org/jira/browse/OMID-102 > Project: Apache Omid > Issue Type: Sub-task > Reporter: James Taylor > Assignee: Yonatan Gottesman > Priority: Major > > The way Omid currently filters through its own RegionScanner won't work the > way it's implemented (i.e. the way the filtering is done *after* the next > call). The reason is that the state of HBase filters gets messed up since > these filters will start to see cells that they shouldn't (i.e. cells that > would be filtered based on snapshot isolation). It cannot be worked around by > manually running filters afterwards because filters may issue seek calls > which are handled during the running of scans by HBase. > > Instead, the filtering needs to be implemented as a pure HBase filter and > that filter needs to delegate to the other, delegate filter once it's > determined that the cell is visible. See Tephra's TransactionVisibilityFilter > and the way it calls the delegate filter (cellFilters) only after it's > determined that the cell is visible. You may run into TEPHRA-169 without > including the CellSkipFilter too. > Because it'll be easier if you see shadow cells *before* their corresponding > real cells you can prefix instead of suffix the column qualifiers to > guarantee that you'd see the shadow cells prior to the actual cells. Or you > could buffer cells in your filter prior to omitting them. 
Another issue would > be if the shadow cells aren't found and you need to consult the commit table > - I suppose if the shadow cells are first, this logic would be easier to know > when it needs to be called. > > To reproduce, see the Phoenix unit tests > FlappingTransactionIT.testInflightUpdateNotSeen() and > testInflightDeleteNotSeen(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)