http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/pom.xml ---------------------------------------------------------------------- diff --git a/graphdb/titan0/pom.xml b/graphdb/titan0/pom.xml new file mode 100644 index 0000000..f2dc9a8 --- /dev/null +++ b/graphdb/titan0/pom.xml @@ -0,0 +1,257 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +~ Licensed to the Apache Software Foundation (ASF) under one +~ or more contributor license agreements. See the NOTICE file +~ distributed with this work for additional information +~ regarding copyright ownership. The ASF licenses this file +~ to you under the Apache License, Version 2.0 (the +~ "License"); you may not use this file except in compliance +~ with the License. You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. 
+--> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>atlas-graphdb</artifactId> + <groupId>org.apache.atlas</groupId> + <version>0.8-incubating-SNAPSHOT</version> + </parent> + <artifactId>atlas-graphdb-titan0</artifactId> + <description>Apache Atlas Titan 0.5.4 Graph DB Impl</description> + <name>Apache Atlas Titan 0.5.4 Graph DB Impl</name> + <packaging>jar</packaging> + + <properties> <!-- versions shared by the TinkerPop (blueprints, gremlin) and Titan artifacts declared below --> + <tinkerpop.version>2.6.0</tinkerpop.version> + <titan.version>0.5.4</titan.version> + </properties> + + <dependencies> + + <!-- for graphdb interface definitions --> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-graphdb-api</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-graphdb-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.google.inject</groupId> + <artifactId>guice</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>commons-configuration</groupId> + <artifactId>commons-configuration</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-core</artifactId> + <version>${titan.version}</version> + </dependency> + + <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-core</artifactId> + <version>${tinkerpop.version}</version> + </dependency> + + <dependency> + <groupId>com.tinkerpop.gremlin</groupId> + <artifactId>gremlin-java</artifactId> + <version>${tinkerpop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + </dependency> + + 
<dependency> + <groupId>com.vividsolutions</groupId> + <artifactId>jts</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.solr</groupId> + <artifactId>solr-solrj</artifactId> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-es</artifactId> + <version>${titan.version}</version> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-berkeleyje</artifactId> + <version>${titan.version}</version> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-lucene</artifactId> + <version>${titan.version}</version> + </dependency> + + <!-- NOTE(review): testng and mockito-all declare no scope element here; presumably a test scope is inherited from the parent's dependencyManagement. Confirm this, because the shade configuration below bundles dependencies whose scope is not provided. --> <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + </dependency> + + <dependency> + <groupId>commons-collections</groupId> + <artifactId>commons-collections</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <!-- + Create an 'uber' jar that contains all of the dependencies (except those whose scope is provided). + Only Titan 0.5.4 and its dependencies are included. The other dependencies are bundled in the war file. 
+ --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>2.4.3</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <shadedArtifactAttached>false</shadedArtifactAttached> + <artifactSet> + <excludes> + <!-- these are bundled with Atlas --> + <exclude>org.slf4j:*</exclude> + </excludes> + </artifactSet> + <filters> + <filter> + <artifact>com.thinkaurelius.titan:titan-core</artifact> + <!-- force use of our custom LocalLockMediator implementation --> + <excludes> + <exclude>com/thinkaurelius/titan/diskstorage/locking/LocalLockMediator*</exclude> + </excludes> + </filter> + + </filters> + <createSourcesJar>true</createSourcesJar> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + <excludes> + <exclude>**/log4j.xml</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencyManagement> + <dependencies> + <!-- Graph DB --> + <dependency> + <groupId>com.tinkerpop.blueprints</groupId> + <artifactId>blueprints-core</artifactId> + <version>${tinkerpop.version}</version> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-core</artifactId> + <version>${titan.version}</version> + <exclusions> + <!-- rexster does not work with servlet-api --> + <exclusion> + <groupId>com.tinkerpop.rexster</groupId> + <artifactId>rexster-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.tinkerpop.rexster</groupId> + <artifactId>rexster-server</artifactId> + </exclusion> + 
<!-- asm 4.0 does not work with jersey asm 3.1 --> + <exclusion> + <groupId>com.tinkerpop</groupId> + <artifactId>frames</artifactId> + </exclusion> + <exclusion> + <groupId>com.esotericsoftware.reflectasm</groupId> + <artifactId>reflectasm</artifactId> + </exclusion> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> <!-- GPL license imported from ganglia --> + <groupId>org.acplt</groupId> + <artifactId>oncrpc</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-berkeleyje</artifactId> + <version>${titan.version}</version> + </dependency> + + <dependency> + <groupId>com.thinkaurelius.titan</groupId> + <artifactId>titan-hbase</artifactId> + <version>${titan.version}</version> + </dependency> + + </dependencies> + </dependencyManagement> +</project>
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java new file mode 100644 index 0000000..e255f1b --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java @@ -0,0 +1,62 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course + * of development from 0.94 to 1.0 and beyond. 
+ * + * Existing tables are addressed by their String name; table creation takes an {@link HTableDescriptor}. + * The interface extends {@link Closeable}, so an instance is released through {@code close()}. + */ +public interface AdminMask extends Closeable +{ + + /** Removes the data stored in {@code tableName}. NOTE(review): the two implementations in this package differ; HBaseAdmin0_98 deletes row by row using {@code timestamp}, while HBaseAdmin1_0 truncates the table and never reads {@code timestamp}. Both return quietly when the table does not exist. */ void clearTable(String tableName, long timestamp) throws IOException; + + /** Looks up the descriptor of the table called {@code tableName}. */ HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException; + + /** Reports whether a table called {@code tableName} exists. */ boolean tableExists(String tableName) throws IOException; + + /** Creates the table described by {@code desc}. */ void createTable(HTableDescriptor desc) throws IOException; + + /** Creates the table described by {@code desc}, forwarding the key range and region count to the HBase admin client. Presumably this pre-splits the table into {@code numRegions} regions between {@code startKey} and {@code endKey}; confirm against the HBase createTable documentation. */ void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException; + + /** + * Estimate the number of regionservers in the HBase cluster. + * + * This is usually implemented by calling + * {@link HBaseAdmin#getClusterStatus()} and then + * {@link ClusterStatus#getServers()} and finally {@code size()} on the + * returned server list. 
+ * + * @return the number of servers in the cluster or -1 if it could not be determined + */ + int getEstimatedRegionServerCount(); + + /** Disables the table called {@code tableName}. */ void disableTable(String tableName) throws IOException; + + /** Enables the table called {@code tableName}. */ void enableTable(String tableName) throws IOException; + + /** Reports whether the table called {@code tableName} is currently disabled. */ boolean isTableDisabled(String tableName) throws IOException; + + /** Adds the column family described by {@code columnDescriptor} to the table called {@code tableName}. */ void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java new file mode 100644 index 0000000..feb578b --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java @@ -0,0 +1,30 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.Closeable; +import java.io.IOException; + +/** + * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course + * of development from 0.94 to 1.0 and beyond. + * + * It is the entry point for obtaining the version-neutral {@link TableMask} and {@link AdminMask} wrappers, and + * it extends {@link Closeable}, so an instance is released through {@code close()}. + */ +public interface ConnectionMask extends Closeable +{ + + /** Obtains a {@link TableMask} for the table called {@code name}. */ TableMask getTable(String name) throws IOException; + + /** Obtains an {@link AdminMask} for administrative operations. NOTE(review): whether the caller must close the returned admin separately is not visible from this interface; confirm against the implementations. */ AdminMask getAdmin() throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java new file mode 100644 index 0000000..0cd4795 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java @@ -0,0 +1,152 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.thinkaurelius.titan.util.system.IOUtils; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; + +/** + * {@link AdminMask} implementation that forwards every call to a wrapped {@link HBaseAdmin}, + * addressing tables by their String (or encoded byte[]) name. + */ +public class HBaseAdmin0_98 implements AdminMask +{ + + private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class); + + private final HBaseAdmin adm; + + /** Wraps {@code adm}; {@link #close()} closes that same instance. */ + public HBaseAdmin0_98(HBaseAdmin adm) + { + this.adm = adm; + } + + /** + * Scans every row of {@code tableName} and issues one Delete per scanned row, stamped with {@code timestamp}. + * Returns without doing anything (apart from a debug log) when the table does not exist. + */ + @Override + public void clearTable(String tableName, long timestamp) throws IOException + { + if (!adm.tableExists(tableName)) { + log.debug("clearStorage() called before table {} was created, skipping.", tableName); + return; + } + + // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than + // disabling and deleting tables. 
+ HTable table = null; + + try { + table = new HTable(adm.getConfiguration(), tableName); + + Scan scan = new Scan(); + scan.setBatch(100); + scan.setCacheBlocks(false); + scan.setCaching(2000); + scan.setTimeRange(0, Long.MAX_VALUE); + scan.setMaxVersions(1); + + ResultScanner scanner = null; + + try { + scanner = table.getScanner(scan); + + for (Result res : scanner) { + Delete d = new Delete(res.getRow()); + + d.setTimestamp(timestamp); + table.delete(d); + } + } finally { + IOUtils.closeQuietly(scanner); + } + } finally { + IOUtils.closeQuietly(table); + } + } + + /** Returns the descriptor of the table called {@code tableName}. */ + @Override + public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException + { + /* Bytes.toBytes always encodes as UTF-8; String.getBytes() would depend on the JVM default charset. */ + return adm.getTableDescriptor(org.apache.hadoop.hbase.util.Bytes.toBytes(tableName)); + } + + @Override + public boolean tableExists(String tableName) throws IOException + { + return adm.tableExists(tableName); + } + + @Override + public void createTable(HTableDescriptor desc) throws IOException + { + adm.createTable(desc); + } + + @Override + public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException + { + adm.createTable(desc, startKey, endKey, numRegions); + } + + /** + * Counts the servers listed in the cluster status. + * + * @return the server count, or -1 when fetching the cluster status throws an IOException (logged at debug level) + */ + @Override + public int getEstimatedRegionServerCount() + { + int serverCount = -1; + try { + serverCount = adm.getClusterStatus().getServers().size(); + log.debug("Read {} servers from HBase ClusterStatus", serverCount); + } catch (IOException e) { + log.debug("Unable to retrieve HBase cluster status", e); + } + return serverCount; + } + + @Override + public void disableTable(String tableName) throws IOException + { + adm.disableTable(tableName); + } + + @Override + public void enableTable(String tableName) throws IOException + { + adm.enableTable(tableName); + } + + @Override + public boolean isTableDisabled(String tableName) throws IOException + { + return adm.isTableDisabled(tableName); + } + + @Override + public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws 
IOException + { + adm.addColumn(tableName, columnDescriptor); + } + + /** Closes the wrapped {@link HBaseAdmin}. */ + @Override + public void close() throws IOException + { + adm.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java new file mode 100644 index 0000000..7e8f72d --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java @@ -0,0 +1,135 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotDisabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.HBaseAdmin; + +/** {@link AdminMask} implementation that holds the admin client through the {@link Admin} interface and converts each String table name with {@link TableName#valueOf(String)} before delegating. */ public class HBaseAdmin1_0 implements AdminMask +{ + + private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class); + + private final Admin adm; + + public HBaseAdmin1_0(HBaseAdmin adm) + { + this.adm = adm; + } + /** Empties the table: disables it if it is not already disabled, truncates it while preserving splits, then attempts to re-enable it. Returns quietly when the table does not exist and throws RuntimeException when the table still reports as not disabled after the disable call. The {@code timestamp} argument is not read by this implementation. */ @Override + public void clearTable(String tableString, long timestamp) throws IOException + { + TableName tableName = TableName.valueOf(tableString); + + if (!adm.tableExists(tableName)) { + log.debug("Attempted to clear table {} before it exists (noop)", tableString); + return; + } + + if (!adm.isTableDisabled(tableName)) + adm.disableTable(tableName); + + if (!adm.isTableDisabled(tableName)) + throw new RuntimeException("Unable to disable table " + tableName); + + // This API call appears to both truncate and reenable the table. + log.info("Truncating table {}", tableName); + adm.truncateTable(tableName, true /* preserve splits */); + + try { + adm.enableTable(tableName); + } catch (TableNotDisabledException e) { + // This triggers seemingly every time in testing with 1.0.2. 
+ log.debug("Table automatically reenabled by truncation: {}", tableName, e); + } + } + + @Override + public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException + { + return adm.getTableDescriptor(TableName.valueOf(tableString)); + } + + @Override + public boolean tableExists(String tableString) throws IOException + { + return adm.tableExists(TableName.valueOf(tableString)); + } + + @Override + public void createTable(HTableDescriptor desc) throws IOException + { + adm.createTable(desc); + } + + @Override + public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException + { + adm.createTable(desc, startKey, endKey, numRegions); + } + + /** Counts the servers listed in the cluster status; yields -1 when fetching the status throws an IOException, which is only logged at debug level. */ @Override + public int getEstimatedRegionServerCount() + { + int serverCount = -1; + try { + serverCount = adm.getClusterStatus().getServers().size(); + log.debug("Read {} servers from HBase ClusterStatus", serverCount); + } catch (IOException e) { + log.debug("Unable to retrieve HBase cluster status", e); + } + return serverCount; + } + + @Override + public void disableTable(String tableString) throws IOException + { + adm.disableTable(TableName.valueOf(tableString)); + } + + @Override + public void enableTable(String tableString) throws IOException + { + adm.enableTable(TableName.valueOf(tableString)); + } + + @Override + public boolean isTableDisabled(String tableString) throws IOException + { + return adm.isTableDisabled(TableName.valueOf(tableString)); + } + + @Override + public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException + { + adm.addColumn(TableName.valueOf(tableString), columnDescriptor); + } + + /** Closes the wrapped admin client. */ @Override + public void close() throws IOException + { + adm.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java 
---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java new file mode 100644 index 0000000..c9b03aa --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java @@ -0,0 +1,60 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; + +/** + * The set of HBase operations that Titan routes through a version-specific implementation + * instead of calling the HBase client API directly. + */ +public interface HBaseCompat { + + /** + * Applies the compression algorithm named {@code algo} to the column + * family descriptor {@code cd}. The algorithm is passed by name (one of + * the constants of HBase's Compression enum) rather than as the enum + * itself, because that enum has lived in different packages across + * HBase releases. + * + * @param cd + * column family to configure + * @param algo + * compression type to use + */ + void setCompression(HColumnDescriptor cd, String algo); + + /** + * Builds a new HTableDescriptor for the given table name. 
The + * constructors of that class have stayed stable so far, but the + * HTableDescriptor(String) constructor and its byte[] counterparts are + * deprecated and may eventually disappear in favor of + * HTableDescriptor(TableName). That constructor, like the TableName + * type itself, exists only in newer HBase versions, which is why + * creation goes through this method. + * + * @param tableName + * HBase table name + * @return a new table descriptor instance + */ + HTableDescriptor newTableDescriptor(String tableName); + + /** Opens an HBase connection for {@code conf} and returns it wrapped as a {@link ConnectionMask}. */ + ConnectionMask createConnection(Configuration conf) throws IOException; + + /** Registers the column family {@code cdesc} on the table descriptor {@code tdesc}. */ + void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc); + + /** Sets {@code timestamp} on the delete operation {@code d}. */ + void setTimestamp(Delete d, long timestamp); +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java new file mode 100644 index 0000000..2c0f3b4 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.io.compress.Compression; + +public class HBaseCompat0_98 implements HBaseCompat { + + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + cd.setCompressionType(Compression.Algorithm.valueOf(algo)); + } + + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + TableName tn = TableName.valueOf(tableName); + return new HTableDescriptor(tn); + } + + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException + { + return new HConnection0_98(HConnectionManager.createConnection(conf)); + } + + @Override + public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) + { + tdesc.addFamily(cdesc); + } + + @Override + public void setTimestamp(Delete d, long timestamp) + { + d.setTimestamp(timestamp); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java new file mode 100644 index 0000000..bb3fb3b --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance 
with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.io.compress.Compression; + +/** + * {@link HBaseCompat} binding whose connections are created through {@link ConnectionFactory}. + */ +public class HBaseCompat1_0 implements HBaseCompat { + + /** Resolves the Compression.Algorithm constant named {@code algo} and applies it to {@code cd}. */ + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + Compression.Algorithm codec = Compression.Algorithm.valueOf(algo); + cd.setCompressionType(codec); + } + + /** Creates a descriptor from the TableName parsed out of {@code tableName}. */ + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + return new HTableDescriptor(TableName.valueOf(tableName)); + } + + /** Creates a connection for {@code conf} and wraps it in an HConnection1_0. */ + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException { + return new HConnection1_0(ConnectionFactory.createConnection(conf)); + } + + /** Adds {@code cdesc} as a family of {@code tdesc}. */ + @Override + public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) { + tdesc.addFamily(cdesc); + } + + /** Passes {@code timestamp} to {@code d.setTimestamp}. */ + @Override + public void setTimestamp(Delete d, long timestamp) { + d.setTimestamp(timestamp); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java ---------------------------------------------------------------------- diff --git 
a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java new file mode 100644 index 0000000..e5c3d31 --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java @@ -0,0 +1,58 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.io.compress.Compression; + +import java.io.IOException; + +/** + * {@link HBaseCompat} binding whose connections are created through {@link ConnectionFactory}. + */ +public class HBaseCompat1_1 implements HBaseCompat { + + /** Resolves the Compression.Algorithm constant named {@code algo} and applies it to {@code cd}. */ + @Override + public void setCompression(HColumnDescriptor cd, String algo) { + Compression.Algorithm codec = Compression.Algorithm.valueOf(algo); + cd.setCompressionType(codec); + } + + /** Creates a descriptor from the TableName parsed out of {@code tableName}. */ + @Override + public HTableDescriptor newTableDescriptor(String tableName) { + return new HTableDescriptor(TableName.valueOf(tableName)); + } + + /** Creates a connection for {@code conf} and wraps it in an HConnection1_0. */ + @Override + public ConnectionMask createConnection(Configuration conf) throws IOException { + return new HConnection1_0(ConnectionFactory.createConnection(conf)); + } + + /** Adds {@code cdesc} as a family of {@code tdesc}. */ + @Override + public void 
addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) { + tdesc.addFamily(cdesc); + } + + /** Passes {@code timestamp} to {@code d.setTimestamp}. */ + @Override + public void setTimestamp(Delete d, long timestamp) { + d.setTimestamp(timestamp); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java new file mode 100644 index 0000000..2c0d6fe --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java @@ -0,0 +1,80 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import java.util.Arrays; + +import org.apache.hadoop.hbase.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HBaseCompatLoader { + + private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class); + + private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1"; + + private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1"; + + private static HBaseCompat cachedCompat; + + public synchronized static HBaseCompat getCompat(String classOverride) { + + if (null != cachedCompat) { + log.debug("Returning cached HBase compatibility layer: {}", cachedCompat); + return cachedCompat; + } + + HBaseCompat compat; + String className = null; + String classNameSource = null; + + if (null != classOverride) { + className = classOverride; + classNameSource = "from explicit configuration"; + } else { + String hbaseVersion = VersionInfo.getVersion(); + for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) { + if (hbaseVersion.startsWith(supportedVersion + ".")) { + className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_"); + classNameSource = "supporting runtime HBase version " + hbaseVersion; + break; + } + } + if (null == className) { + log.info("The HBase version {} is not explicitly supported by Titan. 
" + + "Loading Titan's compatibility layer for its most recent supported HBase version ({})", + hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION); + className = DEFAULT_HBASE_CLASS_NAME; + classNameSource = " by default"; + } + } + + final String errTemplate = " when instantiating HBase compatibility class " + className; + + try { + compat = (HBaseCompat)Class.forName(className).newInstance(); + log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName()); + } catch (IllegalAccessException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } catch (InstantiationException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); + } + + return cachedCompat = compat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/4fa10b6a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java new file mode 100644 index 0000000..c5f6e0d --- /dev/null +++ b/graphdb/titan0/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java @@ -0,0 +1,425 @@ +/* + * Copyright 2012-2013 Aurelius LLC + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.thinkaurelius.titan.diskstorage.hbase; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.thinkaurelius.titan.core.attribute.Duration; +import com.thinkaurelius.titan.diskstorage.*; +import com.thinkaurelius.titan.diskstorage.configuration.Configuration; +import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; +import com.thinkaurelius.titan.diskstorage.locking.LocalLockMediator; +import com.thinkaurelius.titan.diskstorage.locking.PermanentLockingException; +import com.thinkaurelius.titan.diskstorage.util.KeyColumn; +import com.thinkaurelius.titan.diskstorage.util.RecordIterator; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; +import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList; +import com.thinkaurelius.titan.diskstorage.util.time.Timepoint; +import com.thinkaurelius.titan.diskstorage.util.time.Timestamps; +import com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration; +import com.thinkaurelius.titan.util.system.IOUtils; + +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; +import org.apache.hadoop.hbase.filter.ColumnRangeFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.FilterList; +import org.apache.hadoop.hbase.util.Bytes; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Here are some areas that might need work: + * <p/> + * - batching? (consider HTable#batch, HTable#setAutoFlush(false) + * - tuning HTable#setWriteBufferSize (?) + * - writing a server-side filter to replace ColumnCountGetFilter, which drops + * all columns on the row where it reaches its limit. This requires getSlice, + * currently, to impose its limit on the client side. That obviously won't + * scale. + * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this) + * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache + * <p/> + * There may be other problem areas. These are just the ones of which I'm aware. + */ +public class HBaseKeyColumnValueStore implements KeyColumnValueStore { + + private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class); + + private final String tableName; + private final HBaseStoreManager storeManager; + + // When using shortened CF names, columnFamily is the shortname and storeName is the longname + // When not using shortened CF names, they are the same + //private final String columnFamily; + private final String storeName; + // This is columnFamily.getBytes() + private final byte[] columnFamilyBytes; + private final HBaseGetter entryGetter; + + private final ConnectionMask cnx; + + private LocalLockMediator<StoreTransaction> localLockMediator; + + private final Duration lockExpiryTimeMs; + private final Duration lockMaxWaitTimeMs; + private final Integer lockMaxRetries; + + HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName, LocalLockMediator<StoreTransaction> llm) { + this.storeManager = storeManager; + this.cnx = cnx; + this.tableName = tableName; + 
//this.columnFamily = columnFamily; + this.storeName = storeName; + this.columnFamilyBytes = columnFamily.getBytes(); + this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); + this.localLockMediator = llm; + Configuration storageConfig = storeManager.getStorageConfig(); + this.lockExpiryTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_EXPIRE); + this.lockMaxWaitTimeMs = storageConfig.get(GraphDatabaseConfiguration.LOCK_WAIT); + this.lockMaxRetries = storageConfig.get(GraphDatabaseConfiguration.LOCK_RETRY); + } + + @Override + public void close() throws BackendException { + } + + @Override + public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query)); + return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); + } + + @Override + public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { + return getHelper(keys, getFilter(query)); + } + + @Override + public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException { + Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); + mutateMany(mutations, txh); + } + + @Override + public void acquireLock(StaticBuffer key, + StaticBuffer column, + StaticBuffer expectedValue, + StoreTransaction txh) throws BackendException { + + KeyColumn lockID = new KeyColumn(key, column); + logger.debug("Attempting to acquireLock on {} ", lockID); + int trialCount = 0; + boolean locked; + while (trialCount < lockMaxRetries) { + final Timepoint lockStartTime = Timestamps.MILLI.getTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + locked = localLockMediator.lock(lockID, txh, lockStartTime.add(lockExpiryTimeMs)); + trialCount++; + if (!locked) { + 
handleLockFailure(txh, lockID, trialCount); + } else { + logger.debug("Acquired lock on {}, {}", lockID, txh); + break; + } + } + ((HBaseTransaction) txh).updateLocks(lockID, expectedValue); + } + + void handleLockFailure(StoreTransaction txh, KeyColumn lockID, int trialCount) throws PermanentLockingException { + if (trialCount < lockMaxRetries) { + try { + Thread.sleep(lockMaxWaitTimeMs.getLength(TimeUnit.DAYS.MILLISECONDS)); + } catch (InterruptedException e) { + throw new PermanentLockingException( + "Interrupted while waiting for acquiring lock for transaction " + + txh + " lockID " + lockID + " on retry " + trialCount, e); + } + } else { + throw new PermanentLockingException("Could not lock the keyColumn " + + lockID + " on CF {} " + Bytes.toString(columnFamilyBytes)); + } + } + + @Override + public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), + query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY), + new FilterList(FilterList.Operator.MUST_PASS_ALL), + query); + } + + @Override + public String getName() { + return storeName; + } + + @Override + public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { + return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query); + } + + public static Filter getFilter(SliceQuery query) { + byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null; + byte[] colEndBytes = query.getSliceEnd().length() > 0 ? 
query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null; + + Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false); + + if (query.hasLimit()) { + filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, + filter, + new ColumnPaginationFilter(query.getLimit(), 0)); + } + + logger.debug("Generated HBase Filter {}", filter); + + return filter; + } + + private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException { + List<Get> requests = new ArrayList<Get>(keys.size()); + { + for (StaticBuffer key : keys) { + Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter); + try { + g.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + requests.add(g); + } + } + + Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size()); + + try { + TableMask table = null; + Result[] results = null; + + try { + table = cnx.getTable(tableName); + logger.debug("Get requests {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); + results = table.get(requests); + logger.debug("Get requests finished {} {} ", Bytes.toString(columnFamilyBytes), requests.size()); + } finally { + IOUtils.closeQuietly(table); + } + + if (results == null) + return KCVSUtil.emptyResults(keys); + + assert results.length==keys.size(); + + for (int i = 0; i < results.length; i++) { + Result result = results[i]; + NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap(); + + if (f == null) { // no result for this key + resultMap.put(keys.get(i), EntryList.EMPTY_LIST); + continue; + } + + // actual key with <timestamp, value> + NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes); + resultMap.put(keys.get(i), (r == null) + ? 
EntryList.EMPTY_LIST + : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter)); + } + + return resultMap; + } catch (IOException e) { + throw new TemporaryBackendException(e); + } + } + + private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { + storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh); + } + + private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException { + return executeKeySliceQuery(null, null, filters, columnSlice); + } + + private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey, + @Nullable byte[] endKey, + FilterList filters, + @Nullable SliceQuery columnSlice) throws BackendException { + Scan scan = new Scan().addFamily(columnFamilyBytes); + + try { + scan.setTimeRange(0, Long.MAX_VALUE); + } catch (IOException e) { + throw new PermanentBackendException(e); + } + + if (startKey != null) + scan.setStartRow(startKey); + + if (endKey != null) + scan.setStopRow(endKey); + + if (columnSlice != null) { + filters.addFilter(getFilter(columnSlice)); + } + + TableMask table = null; + + logger.debug("Scan for row keys {} {} ", Bytes.toString(startKey), Bytes.toString(endKey)); + try { + table = cnx.getTable(tableName); + return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes); + } catch (IOException e) { + IOUtils.closeQuietly(table); + throw new PermanentBackendException(e); + } + } + + private class RowIterator implements KeyIterator { + private final Closeable table; + private final Iterator<Result> rows; + private final byte[] columnFamilyBytes; + + private Result currentRow; + private boolean isClosed; + + public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) { + this.table = table; + this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length); + this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() { + 
@Override + public boolean apply(@Nullable Result result) { + if (result == null) + return false; + + try { + StaticBuffer id = StaticArrayBuffer.of(result.getRow()); + id.getLong(0); + } catch (NumberFormatException e) { + return false; + } + + return true; + } + }); + } + + @Override + public RecordIterator<Entry> getEntries() { + ensureOpen(); + + return new RecordIterator<Entry>() { + private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> currentMap = currentRow.getMap(); + private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentMap == null ? null : currentMap.get(columnFamilyBytes).entrySet().iterator(); + + @Override + public boolean hasNext() { + ensureOpen(); + return kv == null ? false : kv.hasNext(); + } + + @Override + public Entry next() { + ensureOpen(); + return kv == null ? null : StaticArrayEntry.ofBytes(kv.next(), entryGetter); + } + + @Override + public void close() { + isClosed = true; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public boolean hasNext() { + ensureOpen(); + return rows.hasNext(); + } + + @Override + public StaticBuffer next() { + ensureOpen(); + + currentRow = rows.next(); + return StaticArrayBuffer.of(currentRow.getRow()); + } + + @Override + public void close() { + IOUtils.closeQuietly(table); + isClosed = true; + logger.debug("RowIterator closed table {}", table); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + private void ensureOpen() { + if (isClosed) + throw new IllegalStateException("Iterator has been closed."); + } + } + + private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> { + + private final EntryMetaData[] schema; + + private HBaseGetter(EntryMetaData[] schema) { + this.schema = schema; + } + + @Override + public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, 
byte[]>> element) { + return element.getKey(); + } + + @Override + public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { + return element.getValue().lastEntry().getValue(); + } + + @Override + public EntryMetaData[] getMetaSchema(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { + return schema; + } + + @Override + public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) { + switch(meta) { + case TIMESTAMP: + return element.getValue().lastEntry().getKey(); + default: + throw new UnsupportedOperationException("Unsupported meta data: " + meta); + } + } + } +}
