ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/919120f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/919120f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/919120f6 Branch: refs/heads/master Commit: 919120f65f551f80a02d2782cb4e01f87202f7a5 Parents: 91ad021 Author: Suma Shivaprasad <[email protected]> Authored: Thu Dec 3 11:07:25 2015 +0530 Committer: Suma Shivaprasad <[email protected]> Committed: Thu Dec 3 11:07:25 2015 +0530 ---------------------------------------------------------------------- distro/pom.xml | 10 + distro/src/bin/atlas_start.py | 1 + .../src/main/assemblies/standalone-package.xml | 2 +- distro/src/test/python/scripts/TestMetadata.py | 10 +- pom.xml | 7 + release-log.txt | 1 + repository/pom.xml | 51 +- .../titan/diskstorage/hbase/AdminMask.java | 62 -- .../titan/diskstorage/hbase/ConnectionMask.java | 30 - .../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 --- .../titan/diskstorage/hbase/HBaseAdmin1_0.java | 135 --- .../titan/diskstorage/hbase/HBaseCompat.java | 61 -- .../diskstorage/hbase/HBaseCompat0_98.java | 58 -- .../titan/diskstorage/hbase/HBaseCompat1_0.java | 59 -- .../titan/diskstorage/hbase/HBaseCompat1_1.java | 58 -- .../diskstorage/hbase/HBaseCompatLoader.java | 80 -- .../hbase/HBaseKeyColumnValueStore.java | 368 ------- .../diskstorage/hbase/HBaseStoreManager.java | 925 ------------------ .../diskstorage/hbase/HBaseTransaction.java | 33 - .../diskstorage/hbase/HConnection0_98.java | 49 - .../titan/diskstorage/hbase/HConnection1_0.java | 50 - .../titan/diskstorage/hbase/HTable0_98.java | 60 -- .../titan/diskstorage/hbase/HTable1_0.java | 61 -- .../titan/diskstorage/hbase/TableMask.java | 40 - .../titan/diskstorage/solr/Solr5Index.java | 972 ------------------ .../apache/atlas/RepositoryMetadataModule.java | 1 + .../graph/GraphRepoMapperScaleTest.java | 60 +- titan/pom.xml | 105 ++ 
.../titan/diskstorage/hbase/AdminMask.java | 62 ++ .../titan/diskstorage/hbase/ConnectionMask.java | 30 + .../titan/diskstorage/hbase/HBaseAdmin0_98.java | 152 +++ .../titan/diskstorage/hbase/HBaseAdmin1_0.java | 135 +++ .../titan/diskstorage/hbase/HBaseCompat.java | 60 ++ .../diskstorage/hbase/HBaseCompat0_98.java | 58 ++ .../titan/diskstorage/hbase/HBaseCompat1_0.java | 58 ++ .../titan/diskstorage/hbase/HBaseCompat1_1.java | 58 ++ .../diskstorage/hbase/HBaseCompatLoader.java | 80 ++ .../hbase/HBaseKeyColumnValueStore.java | 397 ++++++++ .../diskstorage/hbase/HBaseStoreManager.java | 926 ++++++++++++++++++ .../diskstorage/hbase/HBaseTransaction.java | 75 ++ .../diskstorage/hbase/HConnection0_98.java | 49 + .../titan/diskstorage/hbase/HConnection1_0.java | 50 + .../titan/diskstorage/hbase/HTable0_98.java | 60 ++ .../titan/diskstorage/hbase/HTable1_0.java | 60 ++ .../titan/diskstorage/hbase/TableMask.java | 40 + .../diskstorage/locking/LocalLockMediator.java | 345 +++++++ .../titan/diskstorage/solr/Solr5Index.java | 973 +++++++++++++++++++ .../locking/LocalLockMediatorTest.java | 60 ++ 48 files changed, 3885 insertions(+), 3344 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/pom.xml ---------------------------------------------------------------------- diff --git a/distro/pom.xml b/distro/pom.xml index 1496eec..d88805b 100644 --- a/distro/pom.xml +++ b/distro/pom.xml @@ -89,6 +89,16 @@ </profiles> <build> + <outputDirectory>target/bin</outputDirectory> + <resources> + <resource> + <directory>src/bin</directory> + <filtering>true</filtering> + <includes> + <include>**/*.py</include> + </includes> + </resource> + </resources> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/bin/atlas_start.py ---------------------------------------------------------------------- diff --git 
a/distro/src/bin/atlas_start.py b/distro/src/bin/atlas_start.py index 08e9bdc..fc5860e 100755 --- a/distro/src/bin/atlas_start.py +++ b/distro/src/bin/atlas_start.py @@ -58,6 +58,7 @@ def main(): p = os.pathsep metadata_classpath = confdir + p \ + os.path.join(web_app_dir, "atlas", "WEB-INF", "classes" ) + p \ + + os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "atlas-titan-${project.version}.jar" ) + p \ + os.path.join(web_app_dir, "atlas", "WEB-INF", "lib", "*" ) + p \ + os.path.join(metadata_home, "libext", "*") if os.path.exists(hbase_conf_dir): http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/distro/src/main/assemblies/standalone-package.xml b/distro/src/main/assemblies/standalone-package.xml index 4530773..56fe736 100755 --- a/distro/src/main/assemblies/standalone-package.xml +++ b/distro/src/main/assemblies/standalone-package.xml @@ -48,7 +48,7 @@ </fileSet> <fileSet> - <directory>src/bin</directory> + <directory>target/bin</directory> <outputDirectory>bin</outputDirectory> <fileMode>0755</fileMode> <directoryMode>0755</directoryMode> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/distro/src/test/python/scripts/TestMetadata.py ---------------------------------------------------------------------- diff --git a/distro/src/test/python/scripts/TestMetadata.py b/distro/src/test/python/scripts/TestMetadata.py index 74a8b84..349059f 100644 --- a/distro/src/test/python/scripts/TestMetadata.py +++ b/distro/src/test/python/scripts/TestMetadata.py @@ -56,20 +56,16 @@ class TestMetadata(unittest.TestCase): java_mock.assert_called_with( 'org.apache.atlas.Atlas', ['-app', 'metadata_home\\server\\webapp\\atlas'], - 'metadata_home\\conf;metadata_home\\server\\webapp\\atlas\\WEB-INF\\classes;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;metadata_home\\libext\\*;metadata_home\\hbase\\conf', 
+ 'metadata_home\\conf;metadata_home\\server\\webapp\\atlas\\WEB-INF\\classes;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\atlas-titan-${project.version}.jar;metadata_home\\server\\webapp\\atlas\\WEB-INF\\lib\\*;metadata_home\\libext\\*;metadata_home\\hbase\\conf', ['-Datlas.log.dir=metadata_home\\logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home\\conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home\\logs') - - - else: java_mock.assert_called_with( 'org.apache.atlas.Atlas', ['-app', 'metadata_home/server/webapp/atlas'], - 'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*:metadata_home/hbase/conf', + 'metadata_home/conf:metadata_home/server/webapp/atlas/WEB-INF/classes:metadata_home/server/webapp/atlas/WEB-INF/lib/atlas-titan-${project.version}.jar:metadata_home/server/webapp/atlas/WEB-INF/lib/*:metadata_home/libext/*:metadata_home/hbase/conf', ['-Datlas.log.dir=metadata_home/logs', '-Datlas.log.file=application.log', '-Datlas.home=metadata_home', '-Datlas.conf=metadata_home/conf', '-Xmx1024m', '-XX:MaxPermSize=512m', '-Dlog4j.configuration=atlas-log4j.xml'], 'metadata_home/logs') - - + pass def test_jar_java_lookups_fail(self): http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c09e640..ede8c53 100755 --- a/pom.xml +++ b/pom.xml @@ -409,6 +409,7 @@ <module>typesystem</module> <module>notification</module> <module>client</module> + <module>titan</module> <module>repository</module> <module>dashboard</module> <module>webapp</module> @@ -925,6 +926,12 @@ <dependency> <groupId>org.apache.atlas</groupId> + <artifactId>atlas-titan</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + 
<groupId>org.apache.atlas</groupId> <artifactId>atlas-repository</artifactId> <version>${project.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6d4873b..1c71dd1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-54 Rename configs in hive hook (shwethags) ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags) ALL CHANGES: +ATLAS-352 Improve write performance on type and entity creation with Hbase (sumasai) ATLAS-350 Document jaas config details for atlas (tbeerbower via shwethags) ATLAS-344 Document HBase permissions for secure cluster (tbeerbower via shwethags) ATLAS-335 Kerberized cluster: Atlas fails to come up with hbase as backend (sumasai via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/pom.xml ---------------------------------------------------------------------- diff --git a/repository/pom.xml b/repository/pom.xml index 28107e0..6e1baee 100755 --- a/repository/pom.xml +++ b/repository/pom.xml @@ -50,6 +50,11 @@ </dependency> <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-titan</artifactId> + </dependency> + + <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> @@ -85,52 +90,6 @@ </dependency> <dependency> - <groupId>com.thinkaurelius.titan</groupId> - <artifactId>titan-core</artifactId> - </dependency> - - <dependency> - <groupId>com.thinkaurelius.titan</groupId> - <artifactId>titan-es</artifactId> - </dependency> - - <dependency> - <groupId>com.vividsolutions</groupId> - <artifactId>jts</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.solr</groupId> - <artifactId>solr-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.solr</groupId> - <artifactId>solr-solrj</artifactId> - 
</dependency> - - <dependency> - <groupId>com.thinkaurelius.titan</groupId> - <artifactId>titan-berkeleyje</artifactId> - </dependency> - - <!-- Commenting out since titan-hbase classes are shaded for 1.x support --> - <!--<dependency>--> - <!--<groupId>com.thinkaurelius.titan</groupId>--> - <!--<artifactId>titan-hbase</artifactId>--> - <!--</dependency>--> - - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - </dependency> - - <dependency> - <groupId>com.thinkaurelius.titan</groupId> - <artifactId>titan-lucene</artifactId> - </dependency> - - <dependency> <groupId>com.tinkerpop.gremlin</groupId> <artifactId>gremlin-java</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java deleted file mode 100644 index e255f1b..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/AdminMask.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.Closeable; -import java.io.IOException; - -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; - -/** - * This interface hides ABI/API breaking changes that HBase has made to its Admin/HBaseAdmin over the course - * of development from 0.94 to 1.0 and beyond. - */ -public interface AdminMask extends Closeable -{ - - void clearTable(String tableName, long timestamp) throws IOException; - - HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException; - - boolean tableExists(String tableName) throws IOException; - - void createTable(HTableDescriptor desc) throws IOException; - - void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException; - - /** - * Estimate the number of regionservers in the HBase cluster. - * - * This is usually implemented by calling - * {@link HBaseAdmin#getClusterStatus()} and then - * {@link ClusterStatus#getServers()} and finally {@code size()} on the - * returned server list. 
- * - * @return the number of servers in the cluster or -1 if it could not be determined - */ - int getEstimatedRegionServerCount(); - - void disableTable(String tableName) throws IOException; - - void enableTable(String tableName) throws IOException; - - boolean isTableDisabled(String tableName) throws IOException; - - void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java deleted file mode 100644 index feb578b..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/ConnectionMask.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.Closeable; -import java.io.IOException; - -/** - * This interface hides ABI/API breaking changes that HBase has made to its (H)Connection class over the course - * of development from 0.94 to 1.0 and beyond. 
- */ -public interface ConnectionMask extends Closeable -{ - - TableMask getTable(String name) throws IOException; - - AdminMask getAdmin() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java deleted file mode 100644 index 0cd4795..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin0_98.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.thinkaurelius.titan.util.system.IOUtils; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; - -public class HBaseAdmin0_98 implements AdminMask -{ - - private static final Logger log = LoggerFactory.getLogger(HBaseAdmin0_98.class); - - private final HBaseAdmin adm; - - public HBaseAdmin0_98(HBaseAdmin adm) - { - this.adm = adm; - } - - @Override - public void clearTable(String tableName, long timestamp) throws IOException - { - if (!adm.tableExists(tableName)) { - log.debug("clearStorage() called before table {} was created, skipping.", tableName); - return; - } - - // Unfortunately, linear scanning and deleting tables is faster in HBase < 1 when running integration tests than - // disabling and deleting tables. 
- HTable table = null; - - try { - table = new HTable(adm.getConfiguration(), tableName); - - Scan scan = new Scan(); - scan.setBatch(100); - scan.setCacheBlocks(false); - scan.setCaching(2000); - scan.setTimeRange(0, Long.MAX_VALUE); - scan.setMaxVersions(1); - - ResultScanner scanner = null; - - try { - scanner = table.getScanner(scan); - - for (Result res : scanner) { - Delete d = new Delete(res.getRow()); - - d.setTimestamp(timestamp); - table.delete(d); - } - } finally { - IOUtils.closeQuietly(scanner); - } - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - public HTableDescriptor getTableDescriptor(String tableName) throws TableNotFoundException, IOException - { - return adm.getTableDescriptor(tableName.getBytes()); - } - - @Override - public boolean tableExists(String tableName) throws IOException - { - return adm.tableExists(tableName); - } - - @Override - public void createTable(HTableDescriptor desc) throws IOException - { - adm.createTable(desc); - } - - @Override - public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException - { - adm.createTable(desc, startKey, endKey, numRegions); - } - - @Override - public int getEstimatedRegionServerCount() - { - int serverCount = -1; - try { - serverCount = adm.getClusterStatus().getServers().size(); - log.debug("Read {} servers from HBase ClusterStatus", serverCount); - } catch (IOException e) { - log.debug("Unable to retrieve HBase cluster status", e); - } - return serverCount; - } - - @Override - public void disableTable(String tableName) throws IOException - { - adm.disableTable(tableName); - } - - @Override - public void enableTable(String tableName) throws IOException - { - adm.enableTable(tableName); - } - - @Override - public boolean isTableDisabled(String tableName) throws IOException - { - return adm.isTableDisabled(tableName); - } - - @Override - public void addColumn(String tableName, HColumnDescriptor columnDescriptor) throws 
IOException - { - adm.addColumn(tableName, columnDescriptor); - } - - @Override - public void close() throws IOException - { - adm.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java deleted file mode 100644 index 7e8f72d..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseAdmin1_0.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotDisabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.HBaseAdmin; - -public class HBaseAdmin1_0 implements AdminMask -{ - - private static final Logger log = LoggerFactory.getLogger(HBaseAdmin1_0.class); - - private final Admin adm; - - public HBaseAdmin1_0(HBaseAdmin adm) - { - this.adm = adm; - } - @Override - public void clearTable(String tableString, long timestamp) throws IOException - { - TableName tableName = TableName.valueOf(tableString); - - if (!adm.tableExists(tableName)) { - log.debug("Attempted to clear table {} before it exists (noop)", tableString); - return; - } - - if (!adm.isTableDisabled(tableName)) - adm.disableTable(tableName); - - if (!adm.isTableDisabled(tableName)) - throw new RuntimeException("Unable to disable table " + tableName); - - // This API call appears to both truncate and reenable the table. - log.info("Truncating table {}", tableName); - adm.truncateTable(tableName, true /* preserve splits */); - - try { - adm.enableTable(tableName); - } catch (TableNotDisabledException e) { - // This triggers seemingly every time in testing with 1.0.2. 
- log.debug("Table automatically reenabled by truncation: {}", tableName, e); - } - } - - @Override - public HTableDescriptor getTableDescriptor(String tableString) throws TableNotFoundException, IOException - { - return adm.getTableDescriptor(TableName.valueOf(tableString)); - } - - @Override - public boolean tableExists(String tableString) throws IOException - { - return adm.tableExists(TableName.valueOf(tableString)); - } - - @Override - public void createTable(HTableDescriptor desc) throws IOException - { - adm.createTable(desc); - } - - @Override - public void createTable(HTableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException - { - adm.createTable(desc, startKey, endKey, numRegions); - } - - @Override - public int getEstimatedRegionServerCount() - { - int serverCount = -1; - try { - serverCount = adm.getClusterStatus().getServers().size(); - log.debug("Read {} servers from HBase ClusterStatus", serverCount); - } catch (IOException e) { - log.debug("Unable to retrieve HBase cluster status", e); - } - return serverCount; - } - - @Override - public void disableTable(String tableString) throws IOException - { - adm.disableTable(TableName.valueOf(tableString)); - } - - @Override - public void enableTable(String tableString) throws IOException - { - adm.enableTable(TableName.valueOf(tableString)); - } - - @Override - public boolean isTableDisabled(String tableString) throws IOException - { - return adm.isTableDisabled(TableName.valueOf(tableString)); - } - - @Override - public void addColumn(String tableString, HColumnDescriptor columnDescriptor) throws IOException - { - adm.addColumn(TableName.valueOf(tableString), columnDescriptor); - } - - @Override - public void close() throws IOException - { - adm.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java 
---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java deleted file mode 100644 index 3bc6c25..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.Delete; - -public interface HBaseCompat { - - /** - * Configure the compression scheme {@code algo} on a column family - * descriptor {@code cd}. The {@code algo} parameter is a string value - * corresponding to one of the values of HBase's Compression enum. The - * Compression enum has moved between packages as HBase has evolved, which - * is why this method has a String argument in the signature instead of the - * enum itself. 
- * - * @param cd - * column family to configure - * @param algo - * compression type to use - */ - public void setCompression(HColumnDescriptor cd, String algo); - - /** - * Create and return a HTableDescriptor instance with the given name. The - * constructors on this method have remained stable over HBase development - * so far, but the old HTableDescriptor(String) constructor & byte[] friends - * are now marked deprecated and may eventually be removed in favor of the - * HTableDescriptor(TableName) constructor. That constructor (and the - * TableName type) only exists in newer HBase versions. Hence this method. - * - * @param tableName - * HBase table name - * @return a new table descriptor instance - */ - public HTableDescriptor newTableDescriptor(String tableName); - - ConnectionMask createConnection(Configuration conf) throws IOException; - - void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc); - - void setTimestamp(Delete d, long timestamp); -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java deleted file mode 100644 index 2c0f3b4..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat0_98.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.io.compress.Compression; - -public class HBaseCompat0_98 implements HBaseCompat { - - @Override - public void setCompression(HColumnDescriptor cd, String algo) { - cd.setCompressionType(Compression.Algorithm.valueOf(algo)); - } - - @Override - public HTableDescriptor newTableDescriptor(String tableName) { - TableName tn = TableName.valueOf(tableName); - return new HTableDescriptor(tn); - } - - @Override - public ConnectionMask createConnection(Configuration conf) throws IOException - { - return new HConnection0_98(HConnectionManager.createConnection(conf)); - } - - @Override - public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) - { - tdesc.addFamily(cdesc); - } - - @Override - public void setTimestamp(Delete d, long timestamp) - { - d.setTimestamp(timestamp); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java 
b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java deleted file mode 100644 index 633e525..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_0.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.io.compress.Compression; - -public class HBaseCompat1_0 implements HBaseCompat { - - @Override - public void setCompression(HColumnDescriptor cd, String algo) { - cd.setCompressionType(Compression.Algorithm.valueOf(algo)); - } - - @Override - public HTableDescriptor newTableDescriptor(String tableName) { - TableName tn = TableName.valueOf(tableName); - return new HTableDescriptor(tn); - } - - @Override - public ConnectionMask createConnection(Configuration conf) throws IOException - { - return new HConnection1_0(ConnectionFactory.createConnection(conf)); - } - - @Override - public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, 
HColumnDescriptor cdesc) - { - tdesc.addFamily(cdesc); - } - - @Override - public void setTimestamp(Delete d, long timestamp) - { - d.setTimestamp(timestamp); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java deleted file mode 100644 index e5c3d31..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompat1_1.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.io.compress.Compression; - -import java.io.IOException; - -public class HBaseCompat1_1 implements HBaseCompat { - - @Override - public void setCompression(HColumnDescriptor cd, String algo) { - cd.setCompressionType(Compression.Algorithm.valueOf(algo)); - } - - @Override - public HTableDescriptor newTableDescriptor(String tableName) { - TableName tn = TableName.valueOf(tableName); - return new HTableDescriptor(tn); - } - - @Override - public ConnectionMask createConnection(Configuration conf) throws IOException - { - return new HConnection1_0(ConnectionFactory.createConnection(conf)); - } - - @Override - public void addColumnFamilyToTableDescriptor(HTableDescriptor tdesc, HColumnDescriptor cdesc) - { - tdesc.addFamily(cdesc); - } - - @Override - public void setTimestamp(Delete d, long timestamp) - { - d.setTimestamp(timestamp); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java deleted file mode 100644 index 2c0d6fe..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseCompatLoader.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with 
the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import java.util.Arrays; - -import org.apache.hadoop.hbase.util.VersionInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class HBaseCompatLoader { - - private static final Logger log = LoggerFactory.getLogger(HBaseCompatLoader.class); - - private static final String DEFAULT_HBASE_COMPAT_VERSION = "1.1"; - - private static final String DEFAULT_HBASE_CLASS_NAME = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat1_1"; - - private static HBaseCompat cachedCompat; - - public synchronized static HBaseCompat getCompat(String classOverride) { - - if (null != cachedCompat) { - log.debug("Returning cached HBase compatibility layer: {}", cachedCompat); - return cachedCompat; - } - - HBaseCompat compat; - String className = null; - String classNameSource = null; - - if (null != classOverride) { - className = classOverride; - classNameSource = "from explicit configuration"; - } else { - String hbaseVersion = VersionInfo.getVersion(); - for (String supportedVersion : Arrays.asList("0.94", "0.96", "0.98", "1.0", "1.1")) { - if (hbaseVersion.startsWith(supportedVersion + ".")) { - className = "com.thinkaurelius.titan.diskstorage.hbase.HBaseCompat" + supportedVersion.replaceAll("\\.", "_"); - classNameSource = "supporting runtime HBase version " + hbaseVersion; - break; - } - } - if (null == className) { - log.info("The HBase version {} is not explicitly supported by Titan. 
" + - "Loading Titan's compatibility layer for its most recent supported HBase version ({})", - hbaseVersion, DEFAULT_HBASE_COMPAT_VERSION); - className = DEFAULT_HBASE_CLASS_NAME; - classNameSource = " by default"; - } - } - - final String errTemplate = " when instantiating HBase compatibility class " + className; - - try { - compat = (HBaseCompat)Class.forName(className).newInstance(); - log.info("Instantiated HBase compatibility layer {}: {}", classNameSource, compat.getClass().getCanonicalName()); - } catch (IllegalAccessException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } catch (InstantiationException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } catch (ClassNotFoundException e) { - throw new RuntimeException(e.getClass().getSimpleName() + errTemplate, e); - } - - return cachedCompat = compat; - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/919120f6/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java b/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java deleted file mode 100644 index 7783a43..0000000 --- a/repository/src/main/java/com/thinkaurelius/titan/diskstorage/hbase/HBaseKeyColumnValueStore.java +++ /dev/null @@ -1,368 +0,0 @@ -/* - * Copyright 2012-2013 Aurelius LLC - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.thinkaurelius.titan.diskstorage.hbase; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.thinkaurelius.titan.diskstorage.*; -import com.thinkaurelius.titan.diskstorage.keycolumnvalue.*; -import com.thinkaurelius.titan.diskstorage.util.RecordIterator; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayBuffer; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntry; -import com.thinkaurelius.titan.diskstorage.util.StaticArrayEntryList; -import com.thinkaurelius.titan.util.system.IOUtils; - -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.filter.ColumnPaginationFilter; -import org.apache.hadoop.hbase.filter.ColumnRangeFilter; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.Closeable; -import java.io.IOException; -import java.util.*; - -/** - * Here are some areas that might need work: - * <p/> - * - batching? (consider HTable#batch, HTable#setAutoFlush(false) - * - tuning HTable#setWriteBufferSize (?) - * - writing a server-side filter to replace ColumnCountGetFilter, which drops - * all columns on the row where it reaches its limit. This requires getSlice, - * currently, to impose its limit on the client side. That obviously won't - * scale. 
- * - RowMutations for combining Puts+Deletes (need a newer HBase than 0.92 for this) - * - (maybe) fiddle with HTable#setRegionCachePrefetch and/or #prewarmRegionCache - * <p/> - * There may be other problem areas. These are just the ones of which I'm aware. - */ -public class HBaseKeyColumnValueStore implements KeyColumnValueStore { - - private static final Logger logger = LoggerFactory.getLogger(HBaseKeyColumnValueStore.class); - - private final String tableName; - private final HBaseStoreManager storeManager; - - // When using shortened CF names, columnFamily is the shortname and storeName is the longname - // When not using shortened CF names, they are the same - //private final String columnFamily; - private final String storeName; - // This is columnFamily.getBytes() - private final byte[] columnFamilyBytes; - private final HBaseGetter entryGetter; - - private final ConnectionMask cnx; - - HBaseKeyColumnValueStore(HBaseStoreManager storeManager, ConnectionMask cnx, String tableName, String columnFamily, String storeName) { - this.storeManager = storeManager; - this.cnx = cnx; - this.tableName = tableName; - //this.columnFamily = columnFamily; - this.storeName = storeName; - this.columnFamilyBytes = columnFamily.getBytes(); - this.entryGetter = new HBaseGetter(storeManager.getMetaDataSchema(storeName)); - } - - @Override - public void close() throws BackendException { - } - - @Override - public EntryList getSlice(KeySliceQuery query, StoreTransaction txh) throws BackendException { - Map<StaticBuffer, EntryList> result = getHelper(Arrays.asList(query.getKey()), getFilter(query)); - return Iterables.getOnlyElement(result.values(), EntryList.EMPTY_LIST); - } - - @Override - public Map<StaticBuffer,EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh) throws BackendException { - return getHelper(keys, getFilter(query)); - } - - @Override - public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, 
StoreTransaction txh) throws BackendException { - Map<StaticBuffer, KCVMutation> mutations = ImmutableMap.of(key, new KCVMutation(additions, deletions)); - mutateMany(mutations, txh); - } - - @Override - public void acquireLock(StaticBuffer key, - StaticBuffer column, - StaticBuffer expectedValue, - StoreTransaction txh) throws BackendException { - throw new UnsupportedOperationException(); - } - - @Override - public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh) throws BackendException { - return executeKeySliceQuery(query.getKeyStart().as(StaticBuffer.ARRAY_FACTORY), - query.getKeyEnd().as(StaticBuffer.ARRAY_FACTORY), - new FilterList(FilterList.Operator.MUST_PASS_ALL), - query); - } - - @Override - public String getName() { - return storeName; - } - - @Override - public KeyIterator getKeys(SliceQuery query, StoreTransaction txh) throws BackendException { - return executeKeySliceQuery(new FilterList(FilterList.Operator.MUST_PASS_ALL), query); - } - - public static Filter getFilter(SliceQuery query) { - byte[] colStartBytes = query.getSliceEnd().length() > 0 ? query.getSliceStart().as(StaticBuffer.ARRAY_FACTORY) : null; - byte[] colEndBytes = query.getSliceEnd().length() > 0 ? 
query.getSliceEnd().as(StaticBuffer.ARRAY_FACTORY) : null; - - Filter filter = new ColumnRangeFilter(colStartBytes, true, colEndBytes, false); - - if (query.hasLimit()) { - filter = new FilterList(FilterList.Operator.MUST_PASS_ALL, - filter, - new ColumnPaginationFilter(query.getLimit(), 0)); - } - - logger.debug("Generated HBase Filter {}", filter); - - return filter; - } - - private Map<StaticBuffer,EntryList> getHelper(List<StaticBuffer> keys, Filter getFilter) throws BackendException { - List<Get> requests = new ArrayList<Get>(keys.size()); - { - for (StaticBuffer key : keys) { - Get g = new Get(key.as(StaticBuffer.ARRAY_FACTORY)).addFamily(columnFamilyBytes).setFilter(getFilter); - try { - g.setTimeRange(0, Long.MAX_VALUE); - } catch (IOException e) { - throw new PermanentBackendException(e); - } - requests.add(g); - } - } - - Map<StaticBuffer,EntryList> resultMap = new HashMap<StaticBuffer,EntryList>(keys.size()); - - try { - TableMask table = null; - Result[] results = null; - - try { - table = cnx.getTable(tableName); - results = table.get(requests); - } finally { - IOUtils.closeQuietly(table); - } - - if (results == null) - return KCVSUtil.emptyResults(keys); - - assert results.length==keys.size(); - - for (int i = 0; i < results.length; i++) { - Result result = results[i]; - NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> f = result.getMap(); - - if (f == null) { // no result for this key - resultMap.put(keys.get(i), EntryList.EMPTY_LIST); - continue; - } - - // actual key with <timestamp, value> - NavigableMap<byte[], NavigableMap<Long, byte[]>> r = f.get(columnFamilyBytes); - resultMap.put(keys.get(i), (r == null) - ? 
EntryList.EMPTY_LIST - : StaticArrayEntryList.ofBytes(r.entrySet(), entryGetter)); - } - - return resultMap; - } catch (IOException e) { - throw new TemporaryBackendException(e); - } - } - - private void mutateMany(Map<StaticBuffer, KCVMutation> mutations, StoreTransaction txh) throws BackendException { - storeManager.mutateMany(ImmutableMap.of(storeName, mutations), txh); - } - - private KeyIterator executeKeySliceQuery(FilterList filters, @Nullable SliceQuery columnSlice) throws BackendException { - return executeKeySliceQuery(null, null, filters, columnSlice); - } - - private KeyIterator executeKeySliceQuery(@Nullable byte[] startKey, - @Nullable byte[] endKey, - FilterList filters, - @Nullable SliceQuery columnSlice) throws BackendException { - Scan scan = new Scan().addFamily(columnFamilyBytes); - - try { - scan.setTimeRange(0, Long.MAX_VALUE); - } catch (IOException e) { - throw new PermanentBackendException(e); - } - - if (startKey != null) - scan.setStartRow(startKey); - - if (endKey != null) - scan.setStopRow(endKey); - - if (columnSlice != null) { - filters.addFilter(getFilter(columnSlice)); - } - - TableMask table = null; - - try { - table = cnx.getTable(tableName); - return new RowIterator(table, table.getScanner(scan.setFilter(filters)), columnFamilyBytes); - } catch (IOException e) { - IOUtils.closeQuietly(table); - throw new PermanentBackendException(e); - } - } - - private class RowIterator implements KeyIterator { - private final Closeable table; - private final Iterator<Result> rows; - private final byte[] columnFamilyBytes; - - private Result currentRow; - private boolean isClosed; - - public RowIterator(Closeable table, ResultScanner rows, byte[] columnFamilyBytes) { - this.table = table; - this.columnFamilyBytes = Arrays.copyOf(columnFamilyBytes, columnFamilyBytes.length); - this.rows = Iterators.filter(rows.iterator(), new Predicate<Result>() { - @Override - public boolean apply(@Nullable Result result) { - if (result == null) - return false; 
- - try { - StaticBuffer id = StaticArrayBuffer.of(result.getRow()); - id.getLong(0); - } catch (NumberFormatException e) { - return false; - } - - return true; - } - }); - } - - @Override - public RecordIterator<Entry> getEntries() { - ensureOpen(); - - return new RecordIterator<Entry>() { - private final Iterator<Map.Entry<byte[], NavigableMap<Long, byte[]>>> kv = currentRow.getMap().get(columnFamilyBytes).entrySet().iterator(); - - @Override - public boolean hasNext() { - ensureOpen(); - return kv.hasNext(); - } - - @Override - public Entry next() { - ensureOpen(); - return StaticArrayEntry.ofBytes(kv.next(), entryGetter); - } - - @Override - public void close() { - isClosed = true; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public boolean hasNext() { - ensureOpen(); - return rows.hasNext(); - } - - @Override - public StaticBuffer next() { - ensureOpen(); - - currentRow = rows.next(); - return StaticArrayBuffer.of(currentRow.getRow()); - } - - @Override - public void close() { - IOUtils.closeQuietly(table); - isClosed = true; - logger.debug("RowIterator closed table {}", table); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - private void ensureOpen() { - if (isClosed) - throw new IllegalStateException("Iterator has been closed."); - } - } - - private static class HBaseGetter implements StaticArrayEntry.GetColVal<Map.Entry<byte[], NavigableMap<Long, byte[]>>, byte[]> { - - private final EntryMetaData[] schema; - - private HBaseGetter(EntryMetaData[] schema) { - this.schema = schema; - } - - @Override - public byte[] getColumn(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { - return element.getKey(); - } - - @Override - public byte[] getValue(Map.Entry<byte[], NavigableMap<Long, byte[]>> element) { - return element.getValue().lastEntry().getValue(); - } - - @Override - public EntryMetaData[] getMetaSchema(Map.Entry<byte[], 
NavigableMap<Long, byte[]>> element) { - return schema; - } - - @Override - public Object getMetaData(Map.Entry<byte[], NavigableMap<Long, byte[]>> element, EntryMetaData meta) { - switch(meta) { - case TIMESTAMP: - return element.getValue().lastEntry().getKey(); - default: - throw new UnsupportedOperationException("Unsupported meta data: " + meta); - } - } - } -}
