Repository: phoenix Updated Branches: refs/heads/5.x-HBase-2.0 62027bff1 -> d85e9165a
PHOENIX-4303 Remove HTable and Use Table APIs(Rajeshbabu) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d85e9165 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d85e9165 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d85e9165 Branch: refs/heads/5.x-HBase-2.0 Commit: d85e9165a7113449efb30cc9ab645e51da89629d Parents: 62027bf Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Thu Nov 9 14:01:08 2017 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Thu Nov 9 14:01:08 2017 +0530 ---------------------------------------------------------------------- .../wal/WALRecoveryRegionPostOpenIT.java | 13 ++++++---- ...ReplayWithIndexWritesAndCompressedWALIT.java | 9 ++++--- .../phoenix/end2end/AggregateQueryIT.java | 20 ++++++++++----- .../phoenix/end2end/FlappingLocalIndexIT.java | 7 +++--- .../end2end/NamespaceSchemaMappingIT.java | 8 +++--- .../apache/phoenix/end2end/RowTimestampIT.java | 14 +++++++---- .../apache/phoenix/end2end/StoreNullsIT.java | 9 ++++--- .../org/apache/phoenix/end2end/UseSchemaIT.java | 6 +++-- .../phoenix/end2end/index/DropColumnIT.java | 18 +++++++------- .../index/IndexWithTableSchemaChangeIT.java | 6 ++--- .../phoenix/end2end/index/LocalIndexIT.java | 14 ++++++----- .../index/MutableIndexReplicationIT.java | 8 +++--- ...erRegionServerIndexRpcControllerFactory.java | 3 +-- ...egionServerMetadataRpcControllerFactory.java | 3 +-- .../IndexHalfStoreFileReaderGenerator.java | 8 +++--- .../UngroupedAggregateRegionObserver.java | 1 - .../apache/phoenix/execute/DelegateHTable.java | 26 ++++++++++++++++++++ .../phoenix/mapreduce/AbstractBulkLoadTool.java | 25 ++++++++++++------- .../mapreduce/MultiHfileOutputFormat.java | 20 +++++++++------ .../phoenix/mapreduce/index/IndexTool.java | 25 ++++++++++++------- .../transaction/PhoenixTransactionalTable.java | 10 ++++---- .../phoenix/hbase/index/IndexTestingUtils.java | 10 ++++---- .../index/write/TestWALRecoveryCaching.java | 11 +++++---- 23 files changed, 174 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java index d74ddb2..20d59a7 100644 --- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java +++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALRecoveryRegionPostOpenIT.java @@ -44,12 +44,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; @@ -192,7 +194,8 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest { this.assertRegionServerDifferent(miniHBaseCluster); Scan scan = new Scan(); - HTable primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration()); + Table primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME)); ResultScanner resultScanner = primaryTable.getScanner(scan); int count = 0; for (Result result : resultScanner) { @@ -244,7 +247,7 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest { // the index table is one row - HTable indexTable = new HTable(getUtility().getConfiguration(), INDEX_TABLE_NAME); + Table indexTable = hbaseConn.getTable(TableName.valueOf(INDEX_TABLE_NAME)); resultScanner = indexTable.getScanner(scan); count = 0; for (Result result : resultScanner) { @@ -256,8 +259,8 @@ public class WALRecoveryRegionPostOpenIT extends BaseTest { scan = new Scan(); primaryTable.close(); - primaryTable = new HTable(getUtility().getConfiguration(), DATA_TABLE_NAME); - primaryTable.getConnection().clearRegionCache(); + primaryTable = hbaseConn.getTable(TableName.valueOf(DATA_TABLE_NAME)); + ((ClusterConnection)hbaseConn).clearRegionCache(); resultScanner = primaryTable.getScanner(scan); count = 0; for (Result result : resultScanner) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java index dfff8fe..b504acd 100644 --- a/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java +++ b/phoenix-core/src/it/java/org/apache/hadoop/hbase/regionserver/wal/WALReplayWithIndexWritesAndCompressedWALIT.java @@ -38,13 +38,14 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; @@ -223,9 +224,11 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { // initialize the region - this should replay the WALEdits from the WAL region1.initialize(); + org.apache.hadoop.hbase.client.Connection hbaseConn = + ConnectionFactory.createConnection(UTIL.getConfiguration()); // now check to ensure that we wrote to the index table - HTable index = new HTable(UTIL.getConfiguration(), INDEX_TABLE_NAME); + Table index = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(INDEX_TABLE_NAME)); int indexSize = getKeyValueCount(index); assertEquals("Index wasn't propertly updated from WAL replay!", 1, indexSize); Get g = new Get(rowkey); @@ -290,7 +293,7 @@ public class WALReplayWithIndexWritesAndCompressedWALIT { } @SuppressWarnings("deprecation") -private int getKeyValueCount(HTable table) throws IOException { +private int getKeyValueCount(Table table) throws IOException { Scan scan = new Scan(); scan.setMaxVersions(Integer.MAX_VALUE - 1); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java index 6c85774..437ee4f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AggregateQueryIT.java @@ -35,8 +35,13 @@ import java.sql.ResultSet; import java.util.Collection; import java.util.Properties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ClusterConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.util.ByteUtil; @@ -100,17 +105,20 @@ public class AggregateQueryIT extends BaseQueryIT { byte[] tableNameBytes = Bytes.toBytes(tableName); admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin(); - HTable htable = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); - htable.clearRegionCache(); - int nRegions = htable.getRegionLocations().size(); + Table htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(tableNameBytes); + Configuration configuration = conn.unwrap(PhoenixConnection.class).getQueryServices().getConfiguration(); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(configuration); + ((ClusterConnection)hbaseConn).clearRegionCache(TableName.valueOf(tableName)); + RegionLocator regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName)); + int nRegions = regionLocator.getAllRegionLocations().size(); admin.split(tableNameBytes, ByteUtil.concat(Bytes.toBytes(tenantId), Bytes.toBytes("00A3"))); int retryCount = 0; do { Thread.sleep(2000); retryCount++; //htable.clearRegionCache(); - } while (retryCount < 10 && htable.getRegionLocations().size() == nRegions); - assertNotEquals(nRegions, htable.getRegionLocations().size()); + } while (retryCount < 10 && regionLocator.getAllRegionLocations().size() == nRegions); + assertNotEquals(nRegions, regionLocator.getAllRegionLocations().size()); statement.setString(1, tenantId); rs = statement.executeQuery(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java index e2f3970..0d64be0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FlappingLocalIndexIT.java @@ -32,10 +32,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -286,8 +286,9 @@ public class FlappingLocalIndexIT extends BaseLocalIndexIT { assertTrue(rs.next()); assertEquals(4, rs.getInt(1)); HBaseAdmin admin = driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES).getAdmin(); - HTable indexTable = new HTable(admin.getConfiguration(),Bytes.toBytes(indexPhysicalTableName)); - Pair<byte[][], byte[][]> startEndKeys = indexTable.getStartEndKeys(); + org.apache.hadoop.hbase.client.Connection hbaseConn = admin.getConnection(); + Table indexTable = hbaseConn.getTable(TableName.valueOf(indexPhysicalTableName)); + Pair<byte[][], byte[][]> startEndKeys = hbaseConn.getRegionLocator(TableName.valueOf(indexPhysicalTableName)).getStartEndKeys(); byte[][] startKeys = startEndKeys.getFirst(); byte[][] endKeys = startEndKeys.getSecond(); for (int i = 0; i < startKeys.length; i++) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java index 0dfd550..d9a27f5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/NamespaceSchemaMappingIT.java @@ -30,8 +30,8 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; @@ -71,14 +71,14 @@ public class NamespaceSchemaMappingIT extends ParallelStatsDisabledIT { Put put = new Put(PVarchar.INSTANCE.toBytes(phoenixFullTableName)); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - HTable phoenixSchematable = new HTable(admin.getConfiguration(), phoenixFullTableName); + Table phoenixSchematable = admin.getConnection().getTable(TableName.valueOf(phoenixFullTableName)); phoenixSchematable.put(put); phoenixSchematable.close(); put = new Put(PVarchar.INSTANCE.toBytes(hbaseFullTableName)); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); phoenixSchematable.close(); - HTable namespaceMappedtable = new HTable(admin.getConfiguration(), hbaseFullTableName); + Table namespaceMappedtable = admin.getConnection().getTable(TableName.valueOf(hbaseFullTableName)); namespaceMappedtable.put(put); namespaceMappedtable.close(); Properties props = new Properties(); @@ -92,7 +92,7 @@ public class NamespaceSchemaMappingIT extends ParallelStatsDisabledIT { assertTrue(rs.next()); assertEquals(phoenixFullTableName, rs.getString(1)); - HTable metatable = new HTable(admin.getConfiguration(), + Table metatable = admin.getConnection().getTable( SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, (conn.unwrap(PhoenixConnection.class).getQueryServices().getProps()))); Put p = new Put(SchemaUtil.getTableKey(null, schemaName, tableName)); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java index 458cc38..930092d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RowTimestampIT.java @@ -32,10 +32,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixStatement; @@ -143,14 +145,15 @@ public class RowTimestampIT extends ParallelStatsDisabledIT { // verify that the timestamp of the keyvalues matches the ROW_TIMESTAMP column value Scan scan = new Scan(); byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst(); - HTable hTable = new HTable(getUtility().getConfiguration(), tableName); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration()); + Table hTable = hbaseConn.getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = hTable.getScanner(scan); for (Result result : resultScanner) { long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp(); assertEquals(rowTimestampDate.getTime(), timeStamp); } if (!mutable) { - hTable = new HTable(getUtility().getConfiguration(), indexName); + hTable = hbaseConn.getTable(TableName.valueOf(indexName)); resultScanner = hTable.getScanner(scan); for (Result result : resultScanner) { long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp(); @@ -253,14 +256,15 @@ public class RowTimestampIT extends ParallelStatsDisabledIT { // verify that the timestamp of the keyvalues matches the ROW_TIMESTAMP column value Scan scan = new Scan(); byte[] emptyKVQualifier = EncodedColumnsUtil.getEmptyKeyValueInfo(true).getFirst(); - HTable hTable = new HTable(getUtility().getConfiguration(), tableName); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(getUtility().getConfiguration()); + Table hTable = hbaseConn.getTable(TableName.valueOf(tableName)); ResultScanner resultScanner = hTable.getScanner(scan); for (Result result : resultScanner) { long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp(); assertEquals(rowTimestampDate.getTime(), timeStamp); } if (!mutable) { - hTable = new HTable(getUtility().getConfiguration(), indexName); + hTable = hbaseConn.getTable(TableName.valueOf(indexName)); resultScanner = hTable.getScanner(scan); for (Result result : resultScanner) { long timeStamp = result.getColumnLatest(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, emptyKVQualifier).getTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java index 378a9ed..63f127c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StoreNullsIT.java @@ -33,10 +33,12 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.SingleCellColumnExpression; @@ -133,8 +135,9 @@ public class StoreNullsIT extends ParallelStatsDisabledIT { rs1.next(); assertNull(rs1.getString(1)); rs1.next(); - - HTable htable = new HTable(getUtility().getConfiguration(), dataTableName); + Table htable = + ConnectionFactory.createConnection(getUtility().getConfiguration()).getTable( + TableName.valueOf(dataTableName)); Scan s = new Scan(); s.setRaw(true); ResultScanner scanner = htable.getScanner(s); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java index 07ae77e..a578bd3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UseSchemaIT.java @@ -30,9 +30,11 @@ import java.util.Properties; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; @@ -172,7 +174,7 @@ public class UseSchemaIT extends ParallelStatsDisabledIT { Put put = new Put(PVarchar.INSTANCE.toBytes(fullTablename)); put.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.EMPTY_COLUMN_BYTES, QueryConstants.EMPTY_COLUMN_VALUE_BYTES); - HTable phoenixSchematable = new HTable(admin.getConfiguration(), fullTablename); + Table phoenixSchematable = admin.getConnection().getTable(TableName.valueOf(fullTablename)); phoenixSchematable.put(put); phoenixSchematable.close(); conn.createStatement().execute("CREATE VIEW " + tableName + " (tablename VARCHAR PRIMARY KEY)"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java index badb2a6..766e924 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropColumnIT.java @@ -35,10 +35,10 @@ import java.util.Collection; import java.util.Properties; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -198,7 +198,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { scan.setRaw(true); scan.setStartRow(key); scan.setStopRow(key); - HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); ResultScanner results = table.getScanner(scan); Result result = results.next(); assertNotNull(result); @@ -209,7 +209,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { // key value for v2 should have been deleted from the global index table scan = new Scan(); scan.setRaw(true); - table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes()); + table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes()); results = table.getScanner(scan); result = results.next(); assertNotNull(result); @@ -220,7 +220,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { scan = new Scan(); scan.setRaw(true); scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES); - table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); + table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); results = table.getScanner(scan); result = results.next(); assertNotNull(result); @@ -248,7 +248,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { byte[] key = Bytes.toBytes("a"); scan.setStartRow(key); scan.setStopRow(key); - HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); ResultScanner results = table.getScanner(scan); Result result = results.next(); assertNotNull(result); @@ -268,7 +268,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { // key value for v2 should exist in the global index table scan = new Scan(); scan.setRaw(true); - table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes()); + table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(indexTableName.getBytes()); results = table.getScanner(scan); result = results.next(); assertNotNull(result); @@ -288,7 +288,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { scan = new Scan(); scan.setRaw(true); scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES); - table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); + table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(dataTableName.getBytes()); results = table.getScanner(scan); result = results.next(); assertNotNull(result); @@ -379,7 +379,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { // there should be a single row belonging to localIndexTableName2 Scan scan = new Scan(); scan.addFamily(QueryConstants.DEFAULT_LOCAL_INDEX_COLUMN_FAMILY_BYTES); - HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(localIndexTablePhysicalName.getBytes()); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(localIndexTablePhysicalName.getBytes()); ResultScanner results = table.getScanner(scan); Result result = results.next(); assertNotNull(result); @@ -502,7 +502,7 @@ public class DropColumnIT extends ParallelStatsDisabledIT { // scan the physical table and verify there is a single row for the second local index Scan scan = new Scan(); - HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(viewIndexPhysicalTable); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(viewIndexPhysicalTable); ResultScanner results = table.getScanner(scan); Result result = results.next(); assertNotNull(result); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java index aad7f73..7fc1a3a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexWithTableSchemaChangeIT.java @@ -31,10 +31,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Properties; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -526,7 +526,7 @@ public class IndexWithTableSchemaChangeIT extends ParallelStatsDisabledIT { // verify data table rows Scan scan = new Scan(); - HTable table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); + Table table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName)); ResultScanner results = table.getScanner(scan); for (Result res : results) { assertNull("Column value was not deleted",res.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("V2"))); @@ -541,7 +541,7 @@ public class IndexWithTableSchemaChangeIT extends ParallelStatsDisabledIT { // verify index table rows scan = new Scan(); - table = (HTable) conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexTableFullName)); + table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(indexTableFullName)); results = table.getScanner(scan); for (Result res : results) { assertNull("Column value was not deleted",res.getValue(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, Bytes.toBytes("0:V2"))); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 6ea96a9..615d2aa 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -44,8 +44,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -174,11 +175,12 @@ public class LocalIndexIT extends BaseLocalIndexIT { HTableDescriptor htd = admin .getTableDescriptor(Bytes.toBytes(indexPhysicalTableName)); assertEquals(IndexRegionSplitPolicy.class.getName(), htd.getValue(HTableDescriptor.SPLIT_POLICY)); - try (HTable userTable = new HTable(admin.getConfiguration(), - SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped))) { - try (HTable indexTable = new HTable(admin.getConfiguration(), Bytes.toBytes(indexPhysicalTableName))) { - assertArrayEquals("Both user table and index table should have same split keys.", - userTable.getStartKeys(), indexTable.getStartKeys()); + try(org.apache.hadoop.hbase.client.Connection c = ConnectionFactory.createConnection(admin.getConfiguration())) { + try (RegionLocator userTable= c.getRegionLocator(SchemaUtil.getPhysicalTableName(tableName.getBytes(), isNamespaceMapped))) { + try (RegionLocator indxTable = c.getRegionLocator(TableName.valueOf(indexPhysicalTableName))) { + assertArrayEquals("Both user table and index table should have same split keys.", + userTable.getStartKeys(), indxTable.getStartKeys()); + } } } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java index 48265ed..9c6923c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java @@ -42,11 +42,12 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; @@ -245,7 +246,8 @@ public class MutableIndexReplicationIT extends BaseTest { // lookup tables. For right now, we just go through an HTable LOG.info("Looking up tables in replication target"); TableName[] tables = admin2.listTableNames(); - HTable remoteTable = new HTable(utility2.getConfiguration(), tables[0]); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(utility2.getConfiguration()); + Table remoteTable = hbaseConn.getTable(tables[0]); for (int i = 0; i < REPLICATION_RETRIES; i++) { if (i >= REPLICATION_RETRIES - 1) { fail("Waited too much time for put replication on table " + remoteTable @@ -261,7 +263,7 @@ public class MutableIndexReplicationIT extends BaseTest { remoteTable.close(); } - private boolean ensureAnyRows(HTable remoteTable) throws IOException { + private boolean ensureAnyRows(Table remoteTable) throws IOException { Scan scan = new Scan(); scan.setRaw(true); ResultScanner scanner = remoteTable.getScanner(scan); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java index 5a7f75f..47b6c40 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerIndexRpcControllerFactory.java @@ -22,12 +22,11 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** - * {@link RpcControllerFactory} that should only be used when creating {@link HTable} for + * {@link RpcControllerFactory} that should only be used when creating {@link Table} for * making remote RPCs to the region servers hosting global mutable index table regions. * This controller factory shouldn't be globally configured anywhere and is meant to be used * only internally by Phoenix indexing code. http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java index 37f3927..3f63ac3 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InterRegionServerMetadataRpcControllerFactory.java @@ -22,12 +22,11 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; /** - * {@link RpcControllerFactory} that should only be used when creating {@link HTable} for + * {@link RpcControllerFactory} that should only be used when creating {@link Table} for * making remote RPCs to the region servers hosting Phoenix SYSTEM tables. */ public class InterRegionServerMetadataRpcControllerFactory extends RpcControllerFactory { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java index 88154a7..992e65f 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java @@ -36,10 +36,11 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -98,10 +99,11 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver { } if(scvf != null) scan.setFilter(scvf); byte[] regionStartKeyInHFile = null; - HTable metaTable = null; + Connection connection = ctx.getEnvironment().getConnection(); + Table metaTable = null; PhoenixConnection conn = null; try { - metaTable = new HTable(ctx.getEnvironment().getConfiguration(), TableName.META_TABLE_NAME); + metaTable = connection.getTable(TableName.META_TABLE_NAME)); ResultScanner scanner = null; Result result = null; try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index e68f95e..ab6309c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java index 444bb5d..15d5cf6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/DelegateHTable.java @@ -20,6 +20,7 @@ package org.apache.phoenix.execute; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; @@ -294,4 +295,29 @@ public class DelegateHTable implements Table { public void setWriteRpcTimeout(int writeRpcTimeout) { delegate.setWriteRpcTimeout(writeRpcTimeout); } + + @Override + public boolean[] exists(List<Get> gets) throws IOException { + return delegate.existsAll(gets); + } + + @Override + public long getRpcTimeout(TimeUnit unit) { + return delegate.getRpcTimeout(); + } + + @Override + public long getReadRpcTimeout(TimeUnit unit) { + return delegate.getReadRpcTimeout(unit); + } + + @Override + public long getWriteRpcTimeout(TimeUnit unit) { + return delegate.getWriteRpcTimeout(unit); + } + + @Override + public long getOperationTimeout(TimeUnit unit) { + return delegate.getOperationTimeout(unit); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java index f717647..22e5c3c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java @@ -41,7 +41,11 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; @@ -289,13 +293,15 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { job.setOutputValueClass(KeyValue.class); job.setReducerClass(FormatToKeyValueReducer.class); byte[][] splitKeysBeforeJob = null; - HTable table = null; + org.apache.hadoop.hbase.client.Connection hbaseConn = + ConnectionFactory.createConnection(job.getConfiguration()); + RegionLocator regionLocator = null; if(hasLocalIndexes) { try{ - table = new HTable(job.getConfiguration(), qualifiedTableName); - splitKeysBeforeJob = table.getRegionLocator().getStartKeys(); + regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName)); + splitKeysBeforeJob = regionLocator.getStartKeys(); } finally { - if(table != null )table.close(); + if(regionLocator != null )regionLocator.close(); } } MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded); @@ -315,8 +321,8 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { if (success) { if (hasLocalIndexes) { try { - table = new HTable(job.getConfiguration(), qualifiedTableName); - if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, table.getRegionLocator().getStartKeys())) { + regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(qualifiedTableName)); + if(!IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) { LOG.error("The table " + qualifiedTableName + " has local indexes and there is split key mismatch before and" @@ -325,7 +331,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { return -1; } } finally { - if (table != null) table.close(); + if (regionLocator != null) regionLocator.close(); } } LOG.info("Loading HFiles from {}", outputPath); @@ -350,9 +356,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool { LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf); String tableName = table.getPhysicalName(); Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName); - HTable htable = new HTable(conf,tableName); + org.apache.hadoop.hbase.client.Connection hbaseConn = ConnectionFactory.createConnection(conf); + Table htable = hbaseConn.getTable(TableName.valueOf(tableName)); LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath); - loader.doBulkLoad(tableOutputPath, htable); + loader.doBulkLoad(tableOutputPath, hbaseConn.getAdmin(), htable, hbaseConn.getRegionLocator(TableName.valueOf(tableName))); LOG.info("Incremental load complete for table=" + tableName); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java index 30f21ce..c888b7d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/MultiHfileOutputFormat.java @@ -40,7 +40,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.io.compress.Compression; @@ -650,6 +652,7 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce * @param tablesToBeLoaded * @throws IOException */ + @SuppressWarnings("deprecation") public static void configureIncrementalLoad(Job job, List<TargetTableRef> tablesToBeLoaded) throws IOException { Configuration conf = job.getConfiguration(); @@ -662,13 +665,16 @@ public class MultiHfileOutputFormat extends FileOutputFormat<TableRowkeyPair, Ce Set<TableRowkeyPair> tablesStartKeys = Sets.newTreeSet(); for(TargetTableRef table : tablesToBeLoaded) { final String tableName = table.getPhysicalName(); - try(HTable htable = new HTable(conf,tableName);){ - Set<TableRowkeyPair> startKeys = getRegionStartKeys(tableName , htable.getRegionLocator()); + try(Connection hbaseConn = ConnectionFactory.createConnection(conf);){ + Set<TableRowkeyPair> startKeys = + getRegionStartKeys(tableName, + hbaseConn.getRegionLocator(TableName.valueOf(tableName))); tablesStartKeys.addAll(startKeys); - String compressionConfig = configureCompression(htable.getTableDescriptor()); - String bloomTypeConfig = configureBloomType(htable.getTableDescriptor()); - String blockSizeConfig = configureBlockSize(htable.getTableDescriptor()); - String blockEncodingConfig = configureDataBlockEncoding(htable.getTableDescriptor()); + HTableDescriptor tableDescriptor = hbaseConn.getTable(TableName.valueOf(tableName)).getTableDescriptor(); + String compressionConfig = configureCompression(tableDescriptor); + String bloomTypeConfig = configureBloomType(tableDescriptor); + String blockSizeConfig = configureBlockSize(tableDescriptor); + String blockEncodingConfig = configureDataBlockEncoding(tableDescriptor); Map<String,String> tableConfigs = Maps.newHashMap(); if(StringUtils.isNotBlank(compressionConfig)) { tableConfigs.put(COMPRESSION_FAMILIES_CONF_KEY, compressionConfig); http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index af080b4..cf13075 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -48,11 +48,10 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; @@ -479,7 +478,8 @@ public class IndexTool extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Connection connection = null; - HTable htable = null; + Table htable = null; + RegionLocator regionLocator = null; try { CommandLine cmdLine = null; try { @@ -508,11 +508,14 @@ public class IndexTool extends Configured implements Tool { } pindexTable = PhoenixRuntime.getTable(connection, schemaName != null && !schemaName.isEmpty() ? SchemaUtil.getQualifiedTableName(schemaName, indexTable) : indexTable); - htable = (HTable)connection.unwrap(PhoenixConnection.class).getQueryServices() + htable = connection.unwrap(PhoenixConnection.class).getQueryServices() .getTable(pindexTable.getPhysicalName().getBytes()); + regionLocator = + ConnectionFactory.createConnection(configuration).getRegionLocator( + TableName.valueOf(pindexTable.getPhysicalName().getBytes())); if (IndexType.LOCAL.equals(pindexTable.getIndexType())) { isLocalIndexBuild = true; - splitKeysBeforeJob = htable.getRegionLocator().getStartKeys(); + splitKeysBeforeJob = regionLocator.getStartKeys(); } } @@ -539,11 +542,12 @@ public class IndexTool extends Configured implements Tool { if (result) { if (!useDirectApi && indexTable != null) { if (isLocalIndexBuild) { - validateSplitForLocalIndex(splitKeysBeforeJob, htable); + validateSplitForLocalIndex(splitKeysBeforeJob, regionLocator); } LOG.info("Loading HFiles from {}", outputPath); LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration); - loader.doBulkLoad(outputPath, htable); + loader.doBulkLoad(outputPath, connection.unwrap(PhoenixConnection.class) + .getQueryServices().getAdmin(), htable, regionLocator); htable.close(); // Without direct API, we need to update the index state to ACTIVE from client. IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE); @@ -566,6 +570,9 @@ public class IndexTool extends Configured implements Tool { if (htable != null) { htable.close(); } + if(regionLocator != null) { + regionLocator.close(); + } } catch (SQLException sqle) { LOG.error("Failed to close connection ", sqle.getMessage()); throw new RuntimeException("Failed to close connection"); @@ -575,9 +582,9 @@ public class IndexTool extends Configured implements Tool { - private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, HTable htable) throws Exception { + private boolean validateSplitForLocalIndex(byte[][] splitKeysBeforeJob, RegionLocator regionLocator) throws Exception { if (splitKeysBeforeJob != null - && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, htable.getRegionLocator().getStartKeys())) { + && !IndexUtil.matchingSplitKeys(splitKeysBeforeJob, regionLocator.getStartKeys())) { String errMsg = "The index to build is local index and the split keys are not matching" + " before and after running the job. Please rerun the job otherwise" + " there may be inconsistencies between actual data and index data"; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java index 1293a21..ef3a8fc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/transaction/PhoenixTransactionalTable.java @@ -117,27 +117,27 @@ public interface PhoenixTransactionalTable extends Table { public void delete(List<Delete> deletes) throws IOException; /** - * Delegates to {@link HTable#setAutoFlush(boolean autoFlush)} + * Delegates to {@link Table#setAutoFlush(boolean autoFlush)} */ public void setAutoFlush(boolean autoFlush); /** - * Delegates to {@link HTable#isAutoFlush()} + * Delegates to {@link Table#isAutoFlush()} */ public boolean isAutoFlush(); /** - * Delegates to see HTable.getWriteBufferSize() + * Delegates to see Table.getWriteBufferSize() */ public long getWriteBufferSize(); /** - * Delegates to see HTable.setWriteBufferSize() + * Delegates to see Table.setWriteBufferSize() */ public void setWriteBufferSize(long writeBufferSize) throws IOException; /** - * Delegates to see HTable.flushCommits() + * Delegates to see Table.flushCommits() */ public void flushCommits() throws IOException; http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java index 7fa9c8e..5868103 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/IndexTestingUtils.java @@ -28,10 +28,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Bytes; @@ -63,9 +63,9 @@ public class IndexTestingUtils { * @throws IOException */ @SuppressWarnings("javadoc") - public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected, + public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected, long start, long end, byte[] startKey, byte[] endKey) throws IOException { - LOG.debug("Scanning " + Bytes.toString(index1.getTableName()) + " between times (" + start + LOG.debug("Scanning " + index1.getName().getNameAsString() + " between times (" + start + ", " + end + "] and keys: [" + Bytes.toString(startKey) + ", " + Bytes.toString(endKey) + "]."); Scan s = new Scan(startKey, endKey); @@ -82,12 +82,12 @@ public class IndexTestingUtils { assertEquals("Didn't get the expected kvs from the index table!", expected, received); } - public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected, long ts, + public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected, long ts, byte[] startKey) throws IOException { IndexTestingUtils.verifyIndexTableAtTimestamp(index1, expected, ts, startKey, HConstants.EMPTY_END_ROW); } - public static void verifyIndexTableAtTimestamp(HTable index1, List<KeyValue> expected, long start, + public static void verifyIndexTableAtTimestamp(Table index1, List<KeyValue> expected, long start, byte[] startKey, byte[] endKey) throws IOException { verifyIndexTableAtTimestamp(index1, expected, start, start + 1, startKey, endKey); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d85e9165/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java index faee74a..62cb24e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java @@ -39,13 +39,15 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -192,9 +194,9 @@ public class TestWALRecoveryCaching { // load some data into the table Put p = new Put(Bytes.toBytes("row")); p.addColumn(family, qual, Bytes.toBytes("value")); - HTable primary = new HTable(conf, testTable.getTableName()); + Connection hbaseConn = ConnectionFactory.createConnection(conf); + Table primary = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(testTable.getTableName())); primary.put(p); - primary.flushCommits(); // turn on the recovery latch allowIndexTableToRecover = new CountDownLatch(1); @@ -236,7 +238,6 @@ public class TestWALRecoveryCaching { Put p2 = new Put(p.getRow()); p2.addColumn(nonIndexedFamily, Bytes.toBytes("Not indexed"), Bytes.toBytes("non-indexed value")); primary.put(p2); - primary.flushCommits(); // make sure that we actually failed the write once (within a 5 minute window) assertTrue("Didn't find an error writing to index table within timeout!", @@ -245,7 +246,7 @@ public class TestWALRecoveryCaching { // scan the index to make sure it has the one entry, (that had to be replayed from the WAL, // since we hard killed the server) Scan s = new Scan(); - HTable index = new HTable(conf, getIndexTableName()); + Table index = hbaseConn.getTable(org.apache.hadoop.hbase.TableName.valueOf(getIndexTableName())); ResultScanner scanner = index.getScanner(s); int count = 0; for (Result r : scanner) {