Github user anew commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/20#discussion_r90760015
  
    --- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
    @@ -0,0 +1,289 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +package org.apache.tephra.hbase.coprocessor.janitor;
    +
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterables;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.HTableDescriptor;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.client.Admin;
    +import org.apache.hadoop.hbase.client.Connection;
    +import org.apache.hadoop.hbase.client.ConnectionFactory;
    +import org.apache.hadoop.hbase.client.Table;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.tephra.TxConstants;
    +import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
    +import org.apache.tephra.janitor.TransactionPruningPlugin;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.SortedSet;
    +import java.util.TreeSet;
    +
    +/**
    + * Default implementation of the {@link TransactionPruningPlugin} for 
HBase.
    + *
    + * This plugin determines the prune upper bound for transactional HBase 
tables that use
    + * coprocessor {@link TransactionProcessor}.
    + *
    + * <h3>State storage:</h3>
    + *
    + * This plugin expects the TransactionProcessor to save the prune upper 
bound for invalid transactions
    + * after every major compaction of a region. Let's call this <i>(region, 
prune upper bound)</i>.
    + * In addition, the plugin also persists the following information on a 
run at time <i>t</i>
    + * <ul>
    + *   <li>
    + *     <i>(t, set of regions)</i>: Set of transactional regions at time 
<i>t</i>.
    + *     Transactional regions are regions of the tables that have the 
coprocessor TransactionProcessor
    + *     attached to them.
    + *   </li>
    + *   <li>
    + *     <i>(t, prune upper bound)</i>: This is the smallest not in-progress 
transaction that
    + *     will not have writes in any HBase regions that are created after 
time <i>t</i>.
    + *     This value is determined by the Transaction Service based on the 
transaction state at time <i>t</i>
    + *     and passed on to the plugin.
    + *   </li>
    + * </ul>
    + *
    + * <h3>Computing prune upper bound:</h3>
    + *
    + * In a typical HBase instance, there can be a constant change in the 
number of regions due to region creations,
    + * splits and merges. At any given time there can always be a region on 
which a major compaction has not been run.
    + * Since the prune upper bound will get recorded for a region only after a 
major compaction,
    + * using only the latest set of regions we may not be able to find the
    + * prune upper bounds for all the current regions. Hence we persist the 
set of regions that exist at that time
    + * of each run of the plugin, and use historical region set for time 
<i>t</i>, <i>t - 1</i>, etc.
    + * to determine the prune upper bound.
    + *
    + * From the regions saved at time <i>t</i>, <i>t - 1</i>, etc.,
    + * the plugin tries to find the latest <i>(t, set of regions)</i> where 
all regions have been major compacted,
    + * i.e, all regions have prune upper bound recorded in <i>(region, prune 
upper bound)</i>.
    + * <br/>
    + * If such a set is found for time <i>t1</i>, the prune upper bound 
returned by the plugin is the minimum of
    + * <ul>
    + *   <li>Prune upper bounds of regions in set <i>(t1, set of 
regions)</i></li>
    + *   <li>Prune upper bound from <i>(t1, prune upper bound)</i></li>
    + * </ul>
    + *
    + * <p/>
    + * Above, when we find <i>(t1, set of regions)</i>, there may a region 
that was created after time <i>t1</i>,
    + * but has a data write from an invalid transaction that is smaller than 
the prune upper bounds of all
    + * regions in <i>(t1, set of regions)</i>. This is possible because 
<i>(region, prune upper bound)</i> persisted by
    + * TransactionProcessor is always the latest prune upper bound for a 
region.
    + * <br/>
    + * However a region created after time <i>t1</i> cannot have writes from 
an invalid transaction that is smaller than
    + * <i>min(max(invalid list), min(in-progress list) - 1)</i> at the time 
the region was created.
    + * Since we limit the plugin prune upper bound using <i>(t1, prune upper 
bound)</i>, there should be no invalid
    + * transactions smaller than the plugin prune upper bound with writes in 
any transactional region of
    + * this HBase instance.
    + *
    + * <p/>
    + * Note: If your tables uses a transactional coprocessor other than 
TransactionProcessor,
    + * then you may need to write a new plugin to compute prune upper bound 
for those tables.
    + */
    +@SuppressWarnings("WeakerAccess")
    +public class HBaseTransactionPruningPlugin implements 
TransactionPruningPlugin {
    +  public static final Logger LOG = 
LoggerFactory.getLogger(HBaseTransactionPruningPlugin.class);
    +
    +  protected Configuration conf;
    +  protected Connection connection;
    +  protected DataJanitorState dataJanitorState;
    +
    +  @Override
    +  public void initialize(Configuration conf) throws IOException {
    +    this.conf = conf;
    +    this.connection = ConnectionFactory.createConnection(conf);
    +
    +    final TableName stateTable = 
TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
    +                                                            
TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
    +    LOG.info("Initializing plugin with state table {}", 
stateTable.getNameWithNamespaceInclAsString());
    +    this.dataJanitorState = new DataJanitorState(new 
DataJanitorState.TableSupplier() {
    +      @Override
    +      public Table get() throws IOException {
    +        return connection.getTable(stateTable);
    +      }
    +    });
    +  }
    +
    +  /**
    +   * Determines prune upper bound for the data store as mentioned above.
    +   */
    +  @Override
    +  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) 
throws IOException {
    +    LOG.debug("Fetching prune upper bound for time {} and max prune upper 
bound {}", time, pruneUpperBoundForTime);
    +    if (time < 0 || pruneUpperBoundForTime < 0) {
    +      return -1;
    +    }
    +
    +    // Get all the current transactional regions
    +    SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
    +    if (!transactionalRegions.isEmpty()) {
    +      LOG.debug("Saving {} transactional regions for time {}", 
transactionalRegions.size(), time);
    +      dataJanitorState.saveRegionsForTime(time, transactionalRegions);
    +      // Save prune upper bound for time as the final step.
    +      // We can then use its existence to make sure that the data for a 
given time is complete or not
    +      LOG.debug("Saving max prune upper bound {} for time {}", 
pruneUpperBoundForTime, time);
    +      dataJanitorState.savePruneUpperBoundForTime(time, 
pruneUpperBoundForTime);
    +    }
    +
    +    return computePruneUpperBound(new TimeRegions(time, 
transactionalRegions));
    +  }
    +
    +  /**
    +   * After invalid list has been pruned, this cleans up state information 
that is no longer required.
    +   * This includes -
    +   * <ul>
    +   *   <li>
    +   *     <i>(region, prune upper bound)</i> - prune upper bound for 
regions that are older
    +   *     than maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     <i>(t, set of regions) - Regions set that were recorded on or 
before the start time
    +   *     of maxPrunedInvalid
    +   *   </li>
    +   *   <li>
    +   *     (t, prune upper bound) - Smallest not in-progress transaction 
without any writes in new regions
    +   *     information recorded on or before the start time of 
maxPrunedInvalid
    +   *   </li>
    +   * </ul>
    +   */
    +  @Override
    +  public void pruneComplete(long time, long maxPrunedInvalid) throws 
IOException {
    +    LOG.debug("Prune complete for time {} and prune upper bound {}", time, 
maxPrunedInvalid);
    +    if (time < 0 || maxPrunedInvalid < 0) {
    +      return;
    +    }
    +
    +    // Get regions for given time, so as to not delete them
    +    TimeRegions regionsToExclude = 
dataJanitorState.getRegionsOnOrBeforeTime(time);
    +    if (regionsToExclude != null) {
    +      LOG.debug("Deleting stale region - prune upper bound record before 
{}", maxPrunedInvalid);
    +      
dataJanitorState.deleteRegionsWithPruneUpperBoundBefore(maxPrunedInvalid, 
regionsToExclude.getRegions());
    +    } else {
    +      LOG.warn("Cannot find saved regions on or before time {}", time);
    +    }
    +    long pruneTime = maxPrunedInvalid / TxConstants.MAX_TX_PER_MS;
    --- End diff --
    
    It seems that there should be a helper method to extract the time stamp 
from a transaction id?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to