http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java new file mode 100644 index 0000000..a94a7e4 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseStoreManager.java @@ -0,0 +1,935 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.thinkaurelius.titan.diskstorage.Backend; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigElement; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.CustomizeStoreKCVSManager; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediators; +import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.BiMap; +import 
com.google.common.collect.ImmutableBiMap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.thinkaurelius.titan.core.TitanException; +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.Entry; +import com.thinkaurelius.titan.diskstorage.PermanentBackendException; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.TemporaryBackendException; +import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigNamespace; +import com.thinkaurelius.titan.diskstorage.configuration.ConfigOption; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRange; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StandardStoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.diskstorage.util.BufferUtil; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.graphdb.configuration.PreInitializeConfigOptions; +import com.thinkaurelius.titan.util.system.IOUtils; +import com.thinkaurelius.titan.util.system.NetworkUtil; + +/** + * Storage Manager for HBase + * + * @author Dan LaRocque <[email protected]> + */ +@PreInitializeConfigOptions +public class HBaseStoreManager extends DistributedStoreManager implements 
KeyColumnValueStoreManager, CustomizeStoreKCVSManager { + + private static final Logger logger = LoggerFactory.getLogger(HBaseStoreManager.class); + + public static final ConfigNamespace HBASE_NS = + new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, "hbase", "HBase storage options"); + + public static final ConfigOption<Boolean> SHORT_CF_NAMES = + new ConfigOption<Boolean>(HBASE_NS, "short-cf-names", + "Whether to shorten the names of Titan's column families to one-character mnemonics " + + "to conserve storage space", ConfigOption.Type.FIXED, true); + + public static final String COMPRESSION_DEFAULT = "-DEFAULT-"; + + public static final ConfigOption<String> COMPRESSION = + new ConfigOption<String>(HBASE_NS, "compression-algorithm", + "An HBase Compression.Algorithm enum string which will be applied to newly created column families. " + + "The compression algorithm must be installed and available on the HBase cluster. Titan cannot install " + + "and configure new compression algorithms on the HBase cluster by itself.", + ConfigOption.Type.MASKABLE, "GZ"); + + public static final ConfigOption<Boolean> SKIP_SCHEMA_CHECK = + new ConfigOption<Boolean>(HBASE_NS, "skip-schema-check", + "Assume that Titan's HBase table and column families already exist. " + + "When this is true, Titan will not check for the existence of its table/CFs, " + + "nor will it attempt to create them under any circumstances. This is useful " + + "when running Titan without HBase admin privileges.", + ConfigOption.Type.MASKABLE, false); + + public static final ConfigOption<String> HBASE_TABLE = + new ConfigOption<String>(HBASE_NS, "table", + "The name of the table Titan will use. 
When " + ConfigElement.getPath(SKIP_SCHEMA_CHECK) + + " is false, Titan will automatically create this table if it does not already exist.", + ConfigOption.Type.LOCAL, "titan"); + + /** + * Related bug fixed in 0.98.0, 0.94.7, 0.95.0: + * + * https://issues.apache.org/jira/browse/HBASE-8170 + */ + public static final int MIN_REGION_COUNT = 3; + + /** + * The total number of HBase regions to create with Titan's table. This + * setting only effects table creation; this normally happens just once when + * Titan connects to an HBase backend for the first time. + */ + public static final ConfigOption<Integer> REGION_COUNT = + new ConfigOption<Integer>(HBASE_NS, "region-count", + "The number of initial regions set when creating Titan's HBase table", + ConfigOption.Type.MASKABLE, Integer.class, new Predicate<Integer>() { + @Override + public boolean apply(Integer input) { + return null != input && MIN_REGION_COUNT <= input; + } + } + ); + + /** + * This setting is used only when {@link #REGION_COUNT} is unset. + * <p/> + * If Titan's HBase table does not exist, then it will be created with total + * region count = (number of servers reported by ClusterStatus) * (this + * value). + * <p/> + * The Apache HBase manual suggests an order-of-magnitude range of potential + * values for this setting: + * + * <ul> + * <li> + * <a href="https://hbase.apache.org/book/important_configurations.html#disable.splitting">2.5.2.7. Managed Splitting</a>: + * <blockquote> + * What's the optimal number of pre-split regions to create? Mileage will + * vary depending upon your application. You could start low with 10 + * pre-split regions / server and watch as data grows over time. It's + * better to err on the side of too little regions and rolling split later. 
+ * </blockquote> + * </li> + * <li> + * <a href="https://hbase.apache.org/book/regions.arch.html">9.7 Regions</a>: + * <blockquote> + * In general, HBase is designed to run with a small (20-200) number of + * relatively large (5-20Gb) regions per server... Typically you want to + * keep your region count low on HBase for numerous reasons. Usually + * right around 100 regions per RegionServer has yielded the best results. + * </blockquote> + * </li> + * </ul> + * + * These considerations may differ for other HBase implementations (e.g. MapR). + */ + public static final ConfigOption<Integer> REGIONS_PER_SERVER = + new ConfigOption<Integer>(HBASE_NS, "regions-per-server", + "The number of regions per regionserver to set when creating Titan's HBase table", + ConfigOption.Type.MASKABLE, Integer.class); + + /** + * If this key is present in either the JVM system properties or the process + * environment (checked in the listed order, first hit wins), then its value + * must be the full package and class name of an implementation of + * {@link HBaseCompat} that has a no-arg public constructor. + * <p> + * When this <b>is not</b> set, Titan attempts to automatically detect the + * HBase runtime version by calling {@link VersionInfo#getVersion()}. Titan + * then checks the returned version string against a hard-coded list of + * supported version prefixes and instantiates the associated compat layer + * if a match is found. + * <p> + * When this <b>is</b> set, Titan will not call + * {@code VersionInfo.getVersion()} or read its hard-coded list of supported + * version prefixes. Titan will instead attempt to instantiate the class + * specified (via the no-arg constructor which must exist) and then attempt + * to cast it to HBaseCompat and use it as such. Titan will assume the + * supplied implementation is compatible with the runtime HBase version and + * make no attempt to verify that assumption. 
+ * <p> + * Setting this key incorrectly could cause runtime exceptions at best or + * silent data corruption at worst. This setting is intended for users + * running exotic HBase implementations that don't support VersionInfo or + * implementations which return values from {@code VersionInfo.getVersion()} + * that are inconsistent with Apache's versioning convention. It may also be + * useful to users who want to run against a new release of HBase that Titan + * doesn't yet officially support. + * + */ + public static final ConfigOption<String> COMPAT_CLASS = + new ConfigOption<String>(HBASE_NS, "compat-class", + "The package and class name of the HBaseCompat implementation. HBaseCompat masks version-specific HBase API differences. " + + "When this option is unset, Titan calls HBase's VersionInfo.getVersion() and loads the matching compat class " + + "at runtime. Setting this option forces Titan to instead reflectively load and instantiate the specified class.", + ConfigOption.Type.MASKABLE, String.class); + + public static final int PORT_DEFAULT = 9160; + + public static final Timestamps PREFERRED_TIMESTAMPS = Timestamps.MILLI; + + public static final ConfigNamespace HBASE_CONFIGURATION_NAMESPACE = + new ConfigNamespace(HBASE_NS, "ext", "Overrides for hbase-{site,default}.xml options", true); + + private static final BiMap<String, String> SHORT_CF_NAME_MAP = + ImmutableBiMap.<String, String>builder() + .put(Backend.INDEXSTORE_NAME, "g") + .put(Backend.INDEXSTORE_NAME + Backend.LOCK_STORE_SUFFIX, "h") + .put(Backend.ID_STORE_NAME, "i") + .put(Backend.EDGESTORE_NAME, "e") + .put(Backend.EDGESTORE_NAME + Backend.LOCK_STORE_SUFFIX, "f") + .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME, "s") + .put(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME + Backend.LOCK_STORE_SUFFIX, "t") + .put(Backend.SYSTEM_MGMT_LOG_NAME, "m") + .put(Backend.SYSTEM_TX_LOG_NAME, "l") + .build(); + + private static final StaticBuffer FOUR_ZERO_BYTES = 
BufferUtil.zeroBuffer(4); + + static { + // Verify that shortCfNameMap is injective + // Should be guaranteed by Guava BiMap, but it doesn't hurt to check + Preconditions.checkArgument(null != SHORT_CF_NAME_MAP); + Collection<String> shorts = SHORT_CF_NAME_MAP.values(); + Preconditions.checkArgument(Sets.newHashSet(shorts).size() == shorts.size()); + } + + // Immutable instance fields + private final String tableName; + private final String compression; + private final int regionCount; + private final int regionsPerServer; + private final ConnectionMask cnx; + private final org.apache.hadoop.conf.Configuration hconf; + private final boolean shortCfNames; + private final boolean skipSchemaCheck; + private final String compatClass; + private final HBaseCompat compat; + + private static final ConcurrentHashMap<HBaseStoreManager, Throwable> openManagers = + new ConcurrentHashMap<HBaseStoreManager, Throwable>(); + + // Mutable instance state + private final ConcurrentMap<String, HBaseKeyColumnValueStore> openStores; + + private LocalLockMediator<StoreTransaction> llm; + + public HBaseStoreManager(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) throws BackendException { + super(config, PORT_DEFAULT); + + checkConfigDeprecation(config); + + this.tableName = config.get(HBASE_TABLE); + this.compression = config.get(COMPRESSION); + this.regionCount = config.has(REGION_COUNT) ? config.get(REGION_COUNT) : -1; + this.regionsPerServer = config.has(REGIONS_PER_SERVER) ? config.get(REGIONS_PER_SERVER) : -1; + this.skipSchemaCheck = config.get(SKIP_SCHEMA_CHECK); + this.compatClass = config.has(COMPAT_CLASS) ? config.get(COMPAT_CLASS) : null; + this.compat = HBaseCompatLoader.getCompat(compatClass); + + /* + * Specifying both region count options is permitted but may be + * indicative of a misunderstanding, so issue a warning. 
+ */ + if (config.has(REGIONS_PER_SERVER) && config.has(REGION_COUNT)) { + logger.warn("Both {} and {} are set in Titan's configuration, but " + + "the former takes precedence and the latter will be ignored.", + REGION_COUNT, REGIONS_PER_SERVER); + } + + /* This static factory calls HBaseConfiguration.addHbaseResources(), + * which in turn applies the contents of hbase-default.xml and then + * applies the contents of hbase-site.xml. + */ + this.hconf = HBaseConfiguration.create(); + + // Copy a subset of our commons config into a Hadoop config + int keysLoaded=0; + Map<String,Object> configSub = config.getSubset(HBASE_CONFIGURATION_NAMESPACE); + for (Map.Entry<String,Object> entry : configSub.entrySet()) { + logger.info("HBase configuration: setting {}={}", entry.getKey(), entry.getValue()); + if (entry.getValue()==null) continue; + hconf.set(entry.getKey(), entry.getValue().toString()); + keysLoaded++; + } + + // Special case for STORAGE_HOSTS + if (config.has(GraphDatabaseConfiguration.STORAGE_HOSTS)) { + String zkQuorumKey = "hbase.zookeeper.quorum"; + String csHostList = Joiner.on(",").join(config.get(GraphDatabaseConfiguration.STORAGE_HOSTS)); + hconf.set(zkQuorumKey, csHostList); + logger.info("Copied host list from {} to {}: {}", GraphDatabaseConfiguration.STORAGE_HOSTS, zkQuorumKey, csHostList); + } + + logger.debug("HBase configuration: set a total of {} configuration values", keysLoaded); + + this.shortCfNames = config.get(SHORT_CF_NAMES); + + try { + //this.cnx = HConnectionManager.createConnection(hconf); + this.cnx = compat.createConnection(hconf); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (logger.isTraceEnabled()) { + openManagers.put(this, new Throwable("Manager Opened")); + dumpOpenManagers(); + } + + logger.debug("Dumping HBase config key=value pairs"); + for (Map.Entry<String, String> entry : hconf) { + logger.debug("[HBaseConfig] " + entry.getKey() + "=" + entry.getValue()); + } + logger.debug("End of HBase 
config key=value pairs"); + + openStores = new ConcurrentHashMap<String, HBaseKeyColumnValueStore>(); + } + + @Override + public Deployment getDeployment() { + return Deployment.REMOTE; + + /* If just one of the regions for titan table is in the localhost, + * this method returns Deployment.LOCAL - which does not sound right. + * + List<KeyRange> local; + try { + local = getLocalKeyPartition(); + return null != local && !local.isEmpty() ? Deployment.LOCAL : Deployment.REMOTE; + } catch (BackendException e) { + // propagating StorageException might be a better approach + throw new RuntimeException(e); + } + * + */ + } + + @Override + public String toString() { + return "hbase[" + tableName + "@" + super.toString() + "]"; + } + + public void dumpOpenManagers() { + int estimatedSize = openManagers.size(); + logger.trace("---- Begin open HBase store manager list ({} managers) ----", estimatedSize); + for (HBaseStoreManager m : openManagers.keySet()) { + logger.trace("Manager {} opened at:", m, openManagers.get(m)); + } + logger.trace("---- End open HBase store manager list ({} managers) ----", estimatedSize); + } + + @Override + public void close() { + openStores.clear(); + if (logger.isTraceEnabled()) + openManagers.remove(this); + IOUtils.closeQuietly(cnx); + } + + @Override + public StoreFeatures getFeatures() { + + Configuration c = GraphDatabaseConfiguration.buildConfiguration(); + + StandardStoreFeatures.Builder fb = new StandardStoreFeatures.Builder() + .orderedScan(true).unorderedScan(true).batchMutation(true) + .multiQuery(true).distributed(true).keyOrdered(true).storeTTL(true) + .timestamps(true).preferredTimestamps(PREFERRED_TIMESTAMPS) + .locking(true) + .keyConsistent(c); + + try { + fb.localKeyPartition(getDeployment() == Deployment.LOCAL); + } catch (Exception e) { + logger.warn("Unexpected exception during getDeployment()", e); + } + + return fb.build(); + } + + @Override + public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, 
StoreTransaction txh) throws BackendException { + logger.debug("Enter mutateMany"); + final MaskedTimestamp commitTime = new MaskedTimestamp(txh); + // In case of an addition and deletion with identical timestamps, the + // deletion tombstone wins. + // http://hbase.apache.org/book/versions.html#d244e4250 + Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = + convertToCommands( + mutations, + commitTime.getAdditionTime(times.getUnit()), + commitTime.getDeletionTime(times.getUnit())); + + List<Row> batch = new ArrayList<Row>(commandsPerKey.size()); // actual batch operation + + // convert sorted commands into representation required for 'batch' operation + for (Pair<Put, Delete> commands : commandsPerKey.values()) { + if (commands.getFirst() != null) + batch.add(commands.getFirst()); + + if (commands.getSecond() != null) + batch.add(commands.getSecond()); + } + + try { + TableMask table = null; + + try { + table = cnx.getTable(tableName); + logger.debug("mutateMany : batch mutate started size {} ", batch.size()); + table.batch(batch, new Object[batch.size()]); + logger.debug("mutateMany : batch mutate finished {} ", batch.size()); + } finally { + IOUtils.closeQuietly(table); + } + } catch (IOException e) { + throw new TemporaryBackendException(e); + } catch (InterruptedException e) { + throw new TemporaryBackendException(e); + } + + sleepAfterWrite(txh, commitTime); + } + + @Override + public KeyColumnValueStore openDatabase(String longName) throws BackendException { + + return openDatabase(longName, -1); + } + + @Override + public KeyColumnValueStore openDatabase(final String longName, int ttlInSeconds) throws BackendException { + + HBaseKeyColumnValueStore store = openStores.get(longName); + + if (store == null) { + final String cfName = shortCfNames ? 
shortenCfName(longName) : longName; + + final String llmPrefix = getName(); + llm = LocalLockMediators.INSTANCE.<StoreTransaction>get(llmPrefix, times); + HBaseKeyColumnValueStore newStore = new HBaseKeyColumnValueStore(this, cnx, tableName, cfName, longName, llm); + + store = openStores.putIfAbsent(longName, newStore); // nothing bad happens if we loose to other thread + + if (store == null) { + if (!skipSchemaCheck) + ensureColumnFamilyExists(tableName, cfName, ttlInSeconds); + + store = newStore; + } + logger.info("Loaded 1.x Hbase Client Store Manager"); + } + + return store; + } + + + @Override + public StoreTransaction beginTransaction(final BaseTransactionConfig config) throws BackendException { + return new HBaseTransaction(config, llm); + } + + @Override + public String getName() { + return tableName; + } + + /** + * Deletes the specified table with all its columns. + * ATTENTION: Invoking this method will delete the table if it exists and therefore causes data loss. + */ + @Override + public void clearStorage() throws BackendException { + try (AdminMask adm = getAdminInterface()) { + adm.clearTable(tableName, times.getTime().getNativeTimestamp()); + } catch (IOException e) + { + throw new TemporaryBackendException(e); + } + } + + @Override + public List<KeyRange> getLocalKeyPartition() throws BackendException { + + List<KeyRange> result = new LinkedList<KeyRange>(); + + TableMask table = null; + try { + ensureTableExists(tableName, getCfNameForStoreName(GraphDatabaseConfiguration.SYSTEM_PROPERTIES_STORE_NAME), 0); + + table = cnx.getTable(tableName); + + HTable hTable = (HTable)table.getTableObject(); + + Map<KeyRange, ServerName> normed = + normalizeKeyBounds(hTable.getRegionLocations()); + + for (Map.Entry<KeyRange, ServerName> e : normed.entrySet()) { + if (NetworkUtil.isLocalConnection(e.getValue().getHostname())) { + result.add(e.getKey()); + logger.debug("Found local key/row partition {} on host {}", e.getKey(), e.getValue()); + } else { + 
logger.debug("Discarding remote {}", e.getValue()); + } + } + } catch (MasterNotRunningException e) { + logger.warn("Unexpected MasterNotRunningException", e); + } catch (ZooKeeperConnectionException e) { + logger.warn("Unexpected ZooKeeperConnectionException", e); + } catch (IOException e) { + logger.warn("Unexpected IOException", e); + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + /** + * Given a map produced by {@link HTable#getRegionLocations()}, transform + * each key from an {@link HRegionInfo} to a {@link KeyRange} expressing the + * region's start and end key bounds using Titan-partitioning-friendly + * conventions (start inclusive, end exclusive, zero bytes appended where + * necessary to make all keys at least 4 bytes long). + * <p/> + * This method iterates over the entries in its map parameter and performs + * the following conditional conversions on its keys. "Require" below means + * either a {@link Preconditions} invocation or an assertion. HRegionInfo + * sometimes returns start and end keys of zero length; this method replaces + * zero length keys with null before doing any of the checks described + * below. The parameter map and the values it contains are only read and + * never modified. + * + * <ul> + * <li>If an entry's HRegionInfo has null start and end keys, then first + * require that the parameter map is a singleton, and then return a + * single-entry map whose {@code KeyRange} has start and end buffers that + * are both four bytes of zeros.</li> + * <li>If the entry has a null end key (but non-null start key), put an + * equivalent entry in the result map with a start key identical to the + * input, except that zeros are appended to values less than 4 bytes long, + * and an end key that is four bytes of zeros. 
+ * <li>If the entry has a null start key (but non-null end key), put an + * equivalent entry in the result map where the start key is four bytes of + * zeros, and the end key has zeros appended, if necessary, to make it at + * least 4 bytes long. The end key is not incremented, because HBase region + * end keys are already exclusive.</li> + * <li>Any entry which matches none of the above criteria results in an + * equivalent entry in the returned map, except that zeros are appended to + * both keys to make each at least 4 bytes long; neither key is otherwise + * modified.</li> + * </ul> + * + * After iterating over the parameter map, this method checks that it either + * saw no entries with null keys, one entry with a null start key and a + * different entry with a null end key, or one entry with both start and end + * keys null. If any null keys are observed besides these three cases, the + * method will die with a precondition failure.
+ * + * @param raw + * A map of HRegionInfo and ServerName from HBase + * @return Titan-friendly expression of each region's rowkey boundaries + */ + private Map<KeyRange, ServerName> normalizeKeyBounds(NavigableMap<HRegionInfo, ServerName> raw) { + + Map.Entry<HRegionInfo, ServerName> nullStart = null; + Map.Entry<HRegionInfo, ServerName> nullEnd = null; + + ImmutableMap.Builder<KeyRange, ServerName> b = ImmutableMap.builder(); + + for (Map.Entry<HRegionInfo, ServerName> e : raw.entrySet()) { + HRegionInfo regionInfo = e.getKey(); + byte startKey[] = regionInfo.getStartKey(); + byte endKey[] = regionInfo.getEndKey(); + + if (0 == startKey.length) { + startKey = null; + logger.trace("Converted zero-length HBase startKey byte array to null"); + } + + if (0 == endKey.length) { + endKey = null; + logger.trace("Converted zero-length HBase endKey byte array to null"); + } + + if (null == startKey && null == endKey) { + Preconditions.checkState(1 == raw.size()); + logger.debug("HBase table {} has a single region {}", tableName, regionInfo); + // Choose arbitrary shared value = startKey = endKey + return b.put(new KeyRange(FOUR_ZERO_BYTES, FOUR_ZERO_BYTES), e.getValue()).build(); + } else if (null == startKey) { + logger.debug("Found HRegionInfo with null startKey on server {}: {}", e.getValue(), regionInfo); + Preconditions.checkState(null == nullStart); + nullStart = e; + // I thought endBuf would be inclusive from the HBase javadoc, but in practice it is exclusive + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + // Replace null start key with zeroes + b.put(new KeyRange(FOUR_ZERO_BYTES, endBuf), e.getValue()); + } else if (null == endKey) { + logger.debug("Found HRegionInfo with null endKey on server {}: {}", e.getValue(), regionInfo); + Preconditions.checkState(null == nullEnd); + nullEnd = e; + // Replace null end key with zeroes + b.put(new KeyRange(StaticArrayBuffer.of(zeroExtend(startKey)), FOUR_ZERO_BYTES), e.getValue()); + } else { + 
Preconditions.checkState(null != startKey); + Preconditions.checkState(null != endKey); + + // Convert HBase's inclusive end keys into exclusive Titan end keys + StaticBuffer startBuf = StaticArrayBuffer.of(zeroExtend(startKey)); + StaticBuffer endBuf = StaticArrayBuffer.of(zeroExtend(endKey)); + + KeyRange kr = new KeyRange(startBuf, endBuf); + b.put(kr, e.getValue()); + logger.debug("Found HRegionInfo with non-null end and start keys on server {}: {}", e.getValue(), regionInfo); + } + } + + // Require either no null key bounds or a pair of them + Preconditions.checkState(!(null == nullStart ^ null == nullEnd)); + + // Check that every key in the result is at least 4 bytes long + Map<KeyRange, ServerName> result = b.build(); + for (KeyRange kr : result.keySet()) { + Preconditions.checkState(4 <= kr.getStart().length()); + Preconditions.checkState(4 <= kr.getEnd().length()); + } + + return result; + } + + /** + * If the parameter is shorter than 4 bytes, then create and return a new 4 + * byte array with the input array's bytes followed by zero bytes. Otherwise + * return the parameter. 
+ * + * @param dataToPad non-null but possibly zero-length byte array + * @return either the parameter or a new array + */ + private final byte[] zeroExtend(byte[] dataToPad) { + assert null != dataToPad; + + final int targetLength = 4; + + if (targetLength <= dataToPad.length) + return dataToPad; + + byte padded[] = new byte[targetLength]; + + for (int i = 0; i < dataToPad.length; i++) + padded[i] = dataToPad[i]; + + for (int i = dataToPad.length; i < padded.length; i++) + padded[i] = (byte)0; + + return padded; + } + + public static String shortenCfName(String longName) throws PermanentBackendException { + final String s; + if (SHORT_CF_NAME_MAP.containsKey(longName)) { + s = SHORT_CF_NAME_MAP.get(longName); + Preconditions.checkNotNull(s); + logger.debug("Substituted default CF name \"{}\" with short form \"{}\" to reduce HBase KeyValue size", longName, s); + } else { + if (SHORT_CF_NAME_MAP.containsValue(longName)) { + String fmt = "Must use CF long-form name \"%s\" instead of the short-form name \"%s\" when configured with %s=true"; + String msg = String.format(fmt, SHORT_CF_NAME_MAP.inverse().get(longName), longName, SHORT_CF_NAMES.getName()); + throw new PermanentBackendException(msg); + } + s = longName; + logger.debug("Kept default CF name \"{}\" because it has no associated short form", s); + } + return s; + } + + private HTableDescriptor ensureTableExists(String tableName, String initialCFName, int ttlInSeconds) throws BackendException { + AdminMask adm = null; + + HTableDescriptor desc; + + try { // Create our table, if necessary + adm = getAdminInterface(); + /* + * Some HBase versions/impls respond badly to attempts to create a + * table without at least one CF. See #661. Creating a CF along with + * the table avoids HBase carping. 
+ */ + if (adm.tableExists(tableName)) { + desc = adm.getTableDescriptor(tableName); + } else { + desc = createTable(tableName, initialCFName, ttlInSeconds, adm); + } + } catch (IOException e) { + throw new TemporaryBackendException(e); + } finally { + IOUtils.closeQuietly(adm); + } + + return desc; + } + + private HTableDescriptor createTable(String tableName, String cfName, int ttlInSeconds, AdminMask adm) throws IOException { + HTableDescriptor desc = compat.newTableDescriptor(tableName); + + HColumnDescriptor cdesc = new HColumnDescriptor(cfName); + setCFOptions(cdesc, ttlInSeconds); + + compat.addColumnFamilyToTableDescriptor(desc, cdesc); + + int count; // total regions to create + String src; + + if (MIN_REGION_COUNT <= (count = regionCount)) { + src = "region count configuration"; + } else if (0 < regionsPerServer && + MIN_REGION_COUNT <= (count = regionsPerServer * adm.getEstimatedRegionServerCount())) { + src = "ClusterStatus server count"; + } else { + count = -1; + src = "default"; + } + + if (MIN_REGION_COUNT < count) { + adm.createTable(desc, getStartKey(count), getEndKey(count), count); + logger.debug("Created table {} with region count {} from {}", tableName, count, src); + } else { + adm.createTable(desc); + logger.debug("Created table {} with default start key, end key, and region count", tableName); + } + + return desc; + } + + /** + * This method generates the second argument to + * {@link HBaseAdmin#createTable(HTableDescriptor, byte[], byte[], int)}. + * <p/> + * From the {@code createTable} javadoc: + * "The start key specified will become the end key of the first region of + * the table, and the end key specified will become the start key of the + * last region of the table (the first region has a null start key and + * the last region has a null end key)" + * <p/> + * To summarize, the {@code createTable} argument called "startKey" is + * actually the end key of the first region. 
+ */ + private byte[] getStartKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount)).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + /** + * Companion to {@link #getStartKey(int)}. See its javadoc for details. + */ + private byte[] getEndKey(int regionCount) { + ByteBuffer regionWidth = ByteBuffer.allocate(4); + regionWidth.putInt((int)(((1L << 32) - 1L) / regionCount * (regionCount - 1))).flip(); + return StaticArrayBuffer.of(regionWidth).getBytes(0, 4); + } + + private void ensureColumnFamilyExists(String tableName, String columnFamily, int ttlInSeconds) throws BackendException { + AdminMask adm = null; + try { + adm = getAdminInterface(); + HTableDescriptor desc = ensureTableExists(tableName, columnFamily, ttlInSeconds); + + Preconditions.checkNotNull(desc); + + HColumnDescriptor cf = desc.getFamily(columnFamily.getBytes()); + + // Create our column family, if necessary + if (cf == null) { + try { + if (!adm.isTableDisabled(tableName)) { + adm.disableTable(tableName); + } + } catch (TableNotEnabledException e) { + logger.debug("Table {} already disabled", tableName); + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + + try { + HColumnDescriptor cdesc = new HColumnDescriptor(columnFamily); + + setCFOptions(cdesc, ttlInSeconds); + + adm.addColumn(tableName, cdesc); + + logger.debug("Added HBase ColumnFamily {}, waiting for 1 sec. 
to propogate.", columnFamily); + + adm.enableTable(tableName); + } catch (TableNotFoundException ee) { + logger.error("TableNotFoundException", ee); + throw new PermanentBackendException(ee); + } catch (org.apache.hadoop.hbase.TableExistsException ee) { + logger.debug("Swallowing exception {}", ee); + } catch (IOException ee) { + throw new TemporaryBackendException(ee); + } + } + } finally { + IOUtils.closeQuietly(adm); + } + } + + private void setCFOptions(HColumnDescriptor cdesc, int ttlInSeconds) { + if (null != compression && !compression.equals(COMPRESSION_DEFAULT)) + compat.setCompression(cdesc, compression); + + if (ttlInSeconds > 0) + cdesc.setTimeToLive(ttlInSeconds); + + cdesc.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF); + } + + /** + * Convert Titan internal Mutation representation into HBase native commands. + * + * @param mutations Mutations to convert into HBase commands. + * @param putTimestamp The timestamp to use for Put commands. + * @param delTimestamp The timestamp to use for Delete commands. + * @return Commands sorted by key converted from Titan internal representation. 
+ * @throws com.thinkaurelius.titan.diskstorage.PermanentBackendException + */ + private Map<StaticBuffer, Pair<Put, Delete>> convertToCommands(Map<String, Map<StaticBuffer, KCVMutation>> mutations, + final long putTimestamp, + final long delTimestamp) throws PermanentBackendException { + Map<StaticBuffer, Pair<Put, Delete>> commandsPerKey = new HashMap<StaticBuffer, Pair<Put, Delete>>(); + + for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> entry : mutations.entrySet()) { + + String cfString = getCfNameForStoreName(entry.getKey()); + byte[] cfName = cfString.getBytes(); + + for (Map.Entry<StaticBuffer, KCVMutation> m : entry.getValue().entrySet()) { + byte[] key = m.getKey().as(StaticBuffer.ARRAY_FACTORY); + KCVMutation mutation = m.getValue(); + + Pair<Put, Delete> commands = commandsPerKey.get(m.getKey()); + + if (commands == null) { + commands = new Pair<Put, Delete>(); + commandsPerKey.put(m.getKey(), commands); + } + + if (mutation.hasDeletions()) { + if (commands.getSecond() == null) { + Delete d = new Delete(key); + compat.setTimestamp(d, delTimestamp); + commands.setSecond(d); + } + + for (StaticBuffer b : mutation.getDeletions()) { + commands.getSecond().deleteColumns(cfName, b.as(StaticBuffer.ARRAY_FACTORY), delTimestamp); + } + } + + if (mutation.hasAdditions()) { + if (commands.getFirst() == null) { + Put p = new Put(key, putTimestamp); + commands.setFirst(p); + } + + for (Entry e : mutation.getAdditions()) { + commands.getFirst().add(cfName, + e.getColumnAs(StaticBuffer.ARRAY_FACTORY), + putTimestamp, + e.getValueAs(StaticBuffer.ARRAY_FACTORY)); + } + } + } + } + + return commandsPerKey; + } + + private String getCfNameForStoreName(String storeName) throws PermanentBackendException { + return shortCfNames ? 
shortenCfName(storeName) : storeName; + } + + private void checkConfigDeprecation(com.thinkaurelius.titan.diskstorage.configuration.Configuration config) { + if (config.has(GraphDatabaseConfiguration.STORAGE_PORT)) { + logger.warn("The configuration property {} is ignored for HBase. Set hbase.zookeeper.property.clientPort in hbase-site.xml or {}.hbase.zookeeper.property.clientPort in Titan's configuration file.", + ConfigElement.getPath(GraphDatabaseConfiguration.STORAGE_PORT), ConfigElement.getPath(HBASE_CONFIGURATION_NAMESPACE)); + } + } + + private AdminMask getAdminInterface() { + try { + return cnx.getAdmin(); + } catch (IOException e) { + throw new TitanException(e); + } + } + + /** + * Similar to {@link Function}, except that the {@code apply} method is allowed + * to throw {@link BackendException}. + */ + private static interface BackendFunction<F, T> { + + T apply(F input) throws BackendException; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java new file mode 100644 index 0000000..e13593f --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseTransaction.java @@ -0,0 +1,75 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import com.thinkaurelius.titan.diskstorage.BackendException; +import com.thinkaurelius.titan.diskstorage.BaseTransactionConfig; +import com.thinkaurelius.titan.diskstorage.StaticBuffer; +import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedHashSet; +import java.util.Set; + +/** + * This class overrides and adds nothing compared with + * {@link com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction}; however, it creates a transaction type specific + * to HBase, which lets us check for user errors like passing a Cassandra + * transaction into a HBase method. + * + * @author Dan LaRocque <[email protected]> + */ +public class HBaseTransaction extends AbstractStoreTransaction { + + private static final Logger log = LoggerFactory.getLogger(HBaseTransaction.class); + + LocalLockMediator<StoreTransaction> llm; + + Set<KeyColumn> keyColumnLocks = new LinkedHashSet<>(); + + public HBaseTransaction(final BaseTransactionConfig config, LocalLockMediator<StoreTransaction> llm) { + super(config); + this.llm = llm; + } + + @Override + public synchronized void rollback() throws BackendException { + super.rollback(); + log.debug("Rolled back transaction"); + deleteAllLocks(); + } + + @Override + public synchronized void commit() throws BackendException { + super.commit(); + log.debug("Committed transaction"); + deleteAllLocks(); + } + + public void updateLocks(KeyColumn lockID, StaticBuffer expectedValue) { + keyColumnLocks.add(lockID); + } + + private void deleteAllLocks() { + for(KeyColumn kc : keyColumnLocks) { + log.debug("Removed lock {} ", kc); + llm.unlock(kc, this); + } 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java new file mode 100644 index 0000000..8660644 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection0_98.java @@ -0,0 +1,49 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; + +public class HConnection0_98 implements ConnectionMask +{ + + private final HConnection cnx; + + public HConnection0_98(HConnection cnx) + { + this.cnx = cnx; + } + + @Override + public TableMask getTable(String name) throws IOException + { + return new HTable0_98(cnx.getTable(name)); + } + + @Override + public AdminMask getAdmin() throws IOException + { + return new HBaseAdmin0_98(new HBaseAdmin(cnx)); + } + + @Override + public void close() throws IOException + { + cnx.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java new file mode 100644 index 0000000..91e5026 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HConnection1_0.java @@ -0,0 +1,50 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +public class HConnection1_0 implements ConnectionMask +{ + + private final Connection cnx; + + public HConnection1_0(Connection cnx) + { + this.cnx = cnx; + } + + @Override + public TableMask getTable(String name) throws IOException + { + return new HTable1_0(cnx.getTable(TableName.valueOf(name))); + } + + @Override + public AdminMask getAdmin() throws IOException + { + return new HBaseAdmin1_0(new HBaseAdmin(cnx)); + } + + @Override + public void close() throws IOException + { + cnx.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java new file mode 100644 index 0000000..b11532a --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable0_98.java @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; + +public class HTable0_98 implements TableMask +{ + private final HTableInterface table; + + public HTable0_98(HTableInterface table) + { + this.table = table; + } + + @Override + public ResultScanner getScanner(Scan filter) throws IOException + { + return table.getScanner(filter); + } + + @Override + public Result[] get(List<Get> gets) throws IOException + { + return table.get(gets); + } + + @Override + public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException + { + table.batch(writes, results); + table.flushCommits(); + } + + @Override + public void close() throws IOException + { + table.close(); + } + + @Override + public Object getTableObject() { + return table; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java new file mode 100644 index 0000000..5c90617 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HTable1_0.java @@ -0,0 +1,65 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
* You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.thinkaurelius.titan.diskstorage.hbase;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

/**
 * {@link TableMask} implementation that delegates to an HBase 1.0 {@link Table}.
 */
public class HTable1_0 implements TableMask {

    /** The wrapped table handle. */
    private final Table table;

    public HTable1_0(Table table) {
        this.table = table;
    }

    /** Exposes the wrapped {@link Table} itself. */
    @Override
    public Object getTableObject() {
        return this.table;
    }

    @Override
    public Result[] get(List<Get> gets) throws IOException {
        return this.table.get(gets);
    }

    @Override
    public ResultScanner getScanner(Scan filter) throws IOException {
        return this.table.getScanner(filter);
    }

    /**
     * Submits {@code writes} as one batch, storing per-operation outcomes in
     * {@code results}.
     */
    @Override
    public void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException {
        // Unlike the 0.98 wrapper, no flushCommits() call follows: the original
        // author noted it is no longer needed with the 1.0 Table API.
        this.table.batch(writes, results);
    }

    /** Closes the wrapped table handle. */
    @Override
    public void close() throws IOException {
        this.table.close();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
----------------------------------------------------------------------
diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java
new file mode 100644
index 0000000..54f8743
--- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/TableMask.java @@ -0,0 +1,41 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its Table/HTableInterface over the course + * of development from 0.94 to 1.0 and beyond. 
+ */ +public interface TableMask extends Closeable +{ + + ResultScanner getScanner(Scan filter) throws IOException; + + Result[] get(List<Get> gets) throws IOException; + + void batch(List<Row> writes, Object[] results) throws IOException, InterruptedException; + + Object getTableObject(); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java new file mode 100644 index 0000000..20c59e1 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator.java @@ -0,0 +1,345 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package com.thinkaurelius.titan.diskstorage.locking;

import com.google.common.base.Preconditions;
import com.thinkaurelius.titan.diskstorage.util.time.Timepoint;
import com.thinkaurelius.titan.diskstorage.util.time.TimestampProvider;
import com.thinkaurelius.titan.diskstorage.locking.consistentkey.ExpectedValueCheckingTransaction;
import com.thinkaurelius.titan.diskstorage.util.KeyColumn;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * This class resolves lock contention between two transactions on the same JVM.
 * <p/>
 * This is not just an optimization to reduce network traffic. Locks written by
 * Titan to a distributed key-value store contain an identifier, the "Rid",
 * which is unique only to the process level. The Rid can't tell which
 * transaction in a process holds any given lock. This class prevents two
 * transactions in a single process from concurrently writing the same lock to a
 * distributed key-value store.
 * <p/>
 * A single daemon thread purges lock records whose expiration time has passed,
 * so locks that are never explicitly unlocked do not accumulate.
 *
 * @author Dan LaRocque <[email protected]>
 */

public class LocalLockMediator<T> {

    private static final Logger log = LoggerFactory
            .getLogger(LocalLockMediator.class);

    /**
     * Namespace for which this mediator is responsible
     *
     * @see LocalLockMediatorProvider
     */
    private final String name;

    private final TimestampProvider times;

    /**
     * Expiration schedule consumed by {@link LockCleaner}. An entry is added for every
     * successful {@link #lock} call. Entries may outlive the record they were created
     * for, after an unlock or an expiry renewal. The cleaner re-checks the current
     * record before removing anything, so such stale entries are harmless.
     */
    private final DelayQueue<ExpirableKeyColumn> expiryQueue = new DelayQueue<>();

    private final ExecutorService lockCleanerService = Executors.newFixedThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = Executors.defaultThreadFactory().newThread(runnable);
            thread.setDaemon(true);
            return thread;
        }
    });

    /**
     * Maps a ({@code key}, {@code column}) pair to the local transaction
     * holding a lock on that pair. Values in this map may have already expired
     * according to {@link AuditRecord#expires}, in which case the lock should
     * be considered invalid.
     */
    private final ConcurrentHashMap<KeyColumn, AuditRecord<T>> locks = new ConcurrentHashMap<KeyColumn, AuditRecord<T>>();

    public LocalLockMediator(String name, TimestampProvider times) {
        this.name = name;
        this.times = times;

        Preconditions.checkNotNull(name);
        Preconditions.checkNotNull(times);
        lockCleanerService.submit(new LockCleaner());
    }

    /**
     * Acquire the lock specified by {@code kc}.
     * <p/>
     * For any particular key-column, whatever value of {@code requestor} is
     * passed to this method must also be passed to the associated later call to
     * {@link #unlock(KeyColumn, Object)}.
     * <p/>
     * If some requestor {@code r} calls this method on a KeyColumn {@code k}
     * and this method returns true, then subsequent calls to this method by
     * {@code r} on {@code k} merely attempt to update the {@code expiresAt}
     * timestamp. This differs from typical lock reentrance: multiple successful
     * calls to this method do not require an equal number of calls to
     * {@code #unlock()}. One {@code #unlock()} call is enough, no matter how
     * many times a {@code requestor} called {@code lock} beforehand. Note that
     * updating the timestamp may fail, in which case the lock is considered to
     * have expired and the calling context should assume it no longer holds the
     * lock specified by {@code kc}.
     * <p/>
     * Expiration is reckoned against the {@link TimestampProvider} passed to the
     * constructor.
     * <p/>
     * The current implementation of this method returns true when given an
     * {@code expiresAt} argument in the past. Future implementations may return
     * false instead.
     *
     * @param kc        lock identifier
     * @param requestor the object locking {@code kc}, e.g. a transaction such as an
     *                  {@link ExpectedValueCheckingTransaction}
     * @param expires   instant at which this lock will automatically expire
     * @return true if the lock is acquired, false if it was not acquired
     */
    public boolean lock(KeyColumn kc, T requestor, Timepoint expires) {
        assert null != kc;
        assert null != requestor;

        AuditRecord<T> audit = new AuditRecord<T>(requestor, expires);
        AuditRecord<T> inmap = locks.putIfAbsent(kc, audit);

        boolean success = false;

        if (null == inmap) {
            // Uncontended lock succeeded
            if (log.isTraceEnabled()) {
                log.trace("New local lock created: {} namespace={} txn={}",
                        new Object[]{kc, name, requestor});
            }
            success = true;
        } else if (inmap.equals(audit)) {
            // requestor has already locked kc; update expiresAt
            success = locks.replace(kc, inmap, audit);
            if (log.isTraceEnabled()) {
                if (success) {
                    log.trace(
                            "Updated local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                            new Object[]{kc, name, requestor, inmap.expires,
                                    audit.expires});
                } else {
                    log.trace(
                            "Failed to update local lock expiration: {} namespace={} txn={} oldexp={} newexp={}",
                            new Object[]{kc, name, requestor, inmap.expires,
                                    audit.expires});
                }
            }
        } else if (0 > inmap.expires.compareTo(times.getTime())) {
            // the recorded lock has expired; replace it
            success = locks.replace(kc, inmap, audit);
            if (log.isTraceEnabled()) {
                log.trace(
                        "Discarding expired lock: {} namespace={} txn={} expired={}",
                        new Object[]{kc, name, inmap.holder, inmap.expires});
            }
        } else {
            // we lost to a valid lock
            if (log.isTraceEnabled()) {
                log.trace(
                        "Local lock failed: {} namespace={} txn={} (already owned by {})",
                        new Object[]{kc, name, requestor, inmap});
            }
        }

        if (success) {
            // Schedule cleanup for this record's expiry (also for renewals).
            expiryQueue.add(new ExpirableKeyColumn(kc, expires, times));
        }
        return success;
    }

    /**
     * Release the lock specified by {@code kc} and which was previously
     * locked by {@code requestor}, if it is possible to release it.
     *
     * @param kc        lock identifier
     * @param requestor the object which previously locked {@code kc}
     * @return false if no lock is recorded for {@code kc} or it is held by a
     *         different requestor; true otherwise
     */
    public boolean unlock(KeyColumn kc, T requestor) {

        // Single read: a separate containsKey/get pair could observe the record
        // being purged in between and then dereference null.
        AuditRecord<T> holder = locks.get(kc);

        if (null == holder) {
            log.info("Local unlock failed: no locks found for {}", kc);
            return false;
        }

        AuditRecord<T> unlocker = new AuditRecord<T>(requestor, null);

        if (!holder.equals(unlocker)) {
            log.error("Local unlock of {} by {} failed: it is held by {}",
                    new Object[]{kc, unlocker, holder});
            return false;
        }

        boolean removed = locks.remove(kc, unlocker);

        // Any pending expiryQueue entry for kc is left in place; when it fires, the
        // cleaner finds no matching expired record and does nothing.
        if (removed) {
            if (log.isTraceEnabled()) {
                log.trace("Local unlock succeeded: {} namespace={} txn={}",
                        new Object[]{kc, name, requestor});
            }
        } else {
            log.warn("Local unlock warning: lock record for {} disappeared "
                    + "during removal; this suggests the lock either expired "
                    + "while we were removing it, or that it was erroneously "
                    + "unlocked multiple times.", kc);
        }

        // Even if !removed, we're finished unlocking, so return true
        return true;
    }

    @Override
    public String toString() {
        return "LocalLockMediator [" + name + ",  ~" + locks.size()
                + " current locks]";
    }

    /**
     * A record containing the local transaction that holds a lock and the
     * lock's expiration time.
     */
    private static class AuditRecord<T> {

        /**
         * The local transaction that holds/held the lock.
         */
        private final T holder;
        /**
         * The expiration time of a the lock.
         */
        private final Timepoint expires;
        /**
         * Cached hashCode.
         */
        private int hashCode;

        private AuditRecord(T holder, Timepoint expires) {
            this.holder = holder;
            this.expires = expires;
        }

        /**
         * This implementation depends only on the lock holder and not on the
         * lock expiration time.
         */
        @Override
        public int hashCode() {
            if (0 == hashCode)
                hashCode = holder.hashCode();

            return hashCode;
        }

        /**
         * This implementation depends only on the lock holder and not on the
         * lock expiration time.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            /*
             * This warning suppression is harmless because we are only going to
             * call other.holder.equals(...), and since equals(...) is part of
             * Object, it is guaranteed to be defined no matter the concrete
             * type of parameter T.
             */
            @SuppressWarnings("rawtypes")
            AuditRecord other = (AuditRecord) obj;
            if (holder == null) {
                if (other.holder != null)
                    return false;
            } else if (!holder.equals(other.holder))
                return false;
            return true;
        }

        @Override
        public String toString() {
            return "AuditRecord [txn=" + holder + ", expires=" + expires + "]";
        }

    }

    /**
     * Background task that takes entries from {@link #expiryQueue} as they come
     * due and drops the corresponding lock record if it is still expired.
     */
    private class LockCleaner implements Runnable {

        @Override
        public void run() {
            log.debug("Lock Cleaner service started");
            try {
                while (true) {
                    ExpirableKeyColumn due = expiryQueue.take();
                    purgeIfExpired(due.getKeyColumn());
                }
            } catch (InterruptedException e) {
                log.debug("Received interrupt. Exiting");
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Removes the record for {@code kc} only if the record currently in the map has
         * itself expired. An entry coming due must not remove a lock that was unlocked
         * and re-acquired, or renewed, with a later expiry in the meantime.
         * <p/>
         * NOTE(review): {@code ConcurrentHashMap.remove(key, value)} matches by
         * {@link AuditRecord#equals}, which compares holders only. A renewal by the same
         * holder landing between the read and the remove could still be dropped. The
         * window is narrow but not zero.
         */
        private void purgeIfExpired(KeyColumn kc) {
            AuditRecord<T> current = locks.get(kc);
            if (null == current || current.expires.compareTo(times.getTime()) > 0) {
                return;
            }
            if (locks.remove(kc, current)) {
                log.debug("Expiring key column {}", kc);
            }
        }
    }

    /**
     * A key-column scheduled for expiry-time cleanup. The delay is measured against
     * the mediator's {@link TimestampProvider}, the same clock {@link #lock} uses
     * to judge expiry.
     */
    private static class ExpirableKeyColumn implements Delayed {

        private final Timepoint expiryTime;
        private final KeyColumn kc;
        private final TimestampProvider times;

        public ExpirableKeyColumn(KeyColumn keyColumn, Timepoint expiryTime, TimestampProvider times) {
            this.kc = keyColumn;
            this.expiryTime = expiryTime;
            this.times = times;
        }

        /**
         * Time remaining until expiry, expressed in {@code unit}. Zero or negative
         * once expired. The previous version returned the absolute expiry timestamp,
         * so entries never came due.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long remainingNanos = expiryTime.getTimestamp(TimeUnit.NANOSECONDS)
                    - times.getTime().getTimestamp(TimeUnit.NANOSECONDS);
            return unit.convert(remainingNanos, TimeUnit.NANOSECONDS);
        }

        /** Orders by expiry time, which matches ordering by remaining delay. */
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof ExpirableKeyColumn) {
                return Long.compare(expiryTime.getTimestamp(TimeUnit.NANOSECONDS),
                        ((ExpirableKeyColumn) o).expiryTime.getTimestamp(TimeUnit.NANOSECONDS));
            }
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
        }

        public KeyColumn getKeyColumn() {
            return kc;
        }
    }
}
