KYLIN-920 & KYLIN-782 Upgrade to HBase 1.1 (with help from murkrishn <murkris...@ebay.com>)
Signed-off-by: Li, Yang <yang...@ebay.com> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a871c60 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a871c60 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a871c60 Branch: refs/heads/1.x-HBase1.1.3 Commit: 5a871c607f86d623ae3dc7f42a0c489c6e5d558a Parents: 0603a19 Author: Yang Li <liy...@apache.org> Authored: Sun Aug 16 20:22:13 2015 +0800 Committer: Li, Yang <yang...@ebay.com> Committed: Wed Dec 23 10:19:13 2015 +0800 ---------------------------------------------------------------------- .../common/persistence/HBaseConnection.java | 254 ++++---- .../common/persistence/HBaseResourceStore.java | 31 +- .../common/util/HBaseRegionSizeCalculator.java | 41 +- .../kylin/common/util/BasicHadoopTest.java | 11 +- .../kylin/job/cube/GarbageCollectionStep.java | 22 +- .../kylin/job/hadoop/cube/CubeHFileJob.java | 18 +- .../job/hadoop/cube/StorageCleanupJob.java | 26 +- .../kylin/job/hadoop/hbase/CreateHTableJob.java | 8 +- .../hadoop/invertedindex/IICreateHFileJob.java | 22 +- .../hadoop/invertedindex/IICreateHTableJob.java | 11 +- .../apache/kylin/job/tools/CleanHtableCLI.java | 8 +- .../kylin/job/tools/CubeMigrationCLI.java | 64 +- .../kylin/job/tools/DeployCoprocessorCLI.java | 625 ++++++++++--------- .../job/tools/GridTableHBaseBenchmark.java | 37 +- .../kylin/job/tools/HtableAlterMetadataCLI.java | 8 +- .../apache/kylin/job/tools/RowCounterCLI.java | 11 +- .../org/apache/kylin/job/ExportHBaseData.java | 18 +- .../kylin/job/hadoop/hbase/TestHbaseClient.java | 13 +- .../kylin/job/tools/HBaseRowDigestTest.java | 11 +- monitor/pom.xml | 6 + .../kylin/monitor/MonitorMetaManager.java | 49 +- pom.xml | 14 +- .../apache/kylin/rest/service/AclService.java | 38 +- .../apache/kylin/rest/service/CubeService.java | 35 +- .../apache/kylin/rest/service/QueryService.java | 21 +- .../apache/kylin/rest/service/UserService.java | 27 +- .../storage/filter/BitMapFilterEvaluator.java | 1 - .../storage/hbase/CubeSegmentTupleIterator.java | 19 +- .../kylin/storage/hbase/CubeStorageEngine.java | 4 +- .../storage/hbase/HBaseClientKVIterator.java | 187 +++--- .../hbase/InvertedIndexStorageEngine.java | 114 ++-- .../kylin/storage/hbase/PingHBaseCLI.java | 179 +++--- .../storage/hbase/RegionScannerAdapter.java | 10 +- .../hbase/SerializedHBaseTupleIterator.java | 4 +- .../endpoint/EndpointTupleIterator.java | 15 +- .../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +- .../observer/AggregateRegionObserver.java | 2 +- .../observer/AggregationScanner.java | 14 +- .../observer/ObserverAggregationCache.java | 10 +- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../storage/hbase/InvertedIndexHBaseTest.java | 227 ++++--- .../observer/AggregateRegionObserverTest.java | 72 +-- .../minicluster/HiveMiniClusterTest.java | 3 +- 43 files changed, 1151 insertions(+), 1145 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java index 9c86376..3c07654 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java @@ -1,122 +1,132 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.kylin.common.util.HadoopUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - * - */ -public class HBaseConnection { - - private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); - - private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>(); - - static { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - for (HConnection conn : ConnPool.values()) { - try { - conn.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - }); - } - - public static void clearCache() { - ConnPool.clear(); - } - - public static HConnection get(String url) { - // find configuration - Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); - - HConnection connection = ConnPool.get(url); - try { - // I don't use DCL since recreate a connection is not a big issue. - if (connection == null) { - connection = HConnectionManager.createConnection(conf); - ConnPool.put(url, connection); - } - } catch (Throwable t) { - throw new StorageException("Error when open connection " + url, t); - } - - return connection; - } - - public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { - createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); - } - - public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - - try { - boolean tableExist = false; - try { - hbase.getTableDescriptor(TableName.valueOf(tableName)); - tableExist = true; - } catch (TableNotFoundException e) { - } - - if (tableExist) { - logger.debug("HTable '" + tableName + "' already exists"); - return; - } - - logger.debug("Creating HTable '" + tableName + "'"); - - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - - if (null != families && families.length > 0) { - for (String family : families) { - HColumnDescriptor fd = new HColumnDescriptor(family); - fd.setInMemory(true); // metadata tables are best in memory - desc.addFamily(fd); - } - } - hbase.createTable(desc); - - logger.debug("HTable '" + tableName + "' created"); - } finally { - hbase.close(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.persistence; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + * + */ +public class HBaseConnection { + + private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); + + private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>(); + private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + for (Connection conn : ConnPool.values()) { + try { + conn.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + }); + } + + public static void clearCache() { + ConnPool.clear(); + } + + public static Connection get() { + return get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + } + + public static Connection get(String url) { + // find configuration + Configuration conf = ConfigCache.get(url); + if (conf == null) { + conf = HadoopUtil.newHBaseConfiguration(url); + ConfigCache.put(url, conf); + } + + Connection connection = ConnPool.get(url); + try { + // I don't use DCL since recreate a connection is not a big issue. + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + ConnPool.put(url, connection); + } + } catch (Throwable t) { + throw new StorageException("Error when open connection " + url, t); + } + + return connection; + } + + public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { + createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); + } + + public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException { + Admin admin = conn.getAdmin(); + + try { + boolean tableExist = false; + try { + admin.getTableDescriptor(TableName.valueOf(tableName)); + tableExist = true; + } catch (TableNotFoundException e) { + } + + if (tableExist) { + logger.debug("HTable '" + tableName + "' already exists"); + return; + } + + logger.debug("Creating HTable '" + tableName + "'"); + + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + + if (null != families && families.length > 0) { + for (String family : families) { + HColumnDescriptor fd = new HColumnDescriptor(family); + fd.setInMemory(true); // metadata tables are best in memory + desc.addFamily(fd); + } + } + admin.createTable(desc); + + logger.debug("HTable '" + tableName + "' created"); + } finally { + admin.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java index ac14e7b..a3bb6e3 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java @@ -34,13 +34,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; @@ -77,7 +78,7 @@ public class HBaseResourceStore extends ResourceStore { // final Map<String, String> tableNameMap; // path prefix ==> HBase table name - private HConnection getConnection() throws IOException { + private Connection getConnection() throws IOException { return HBaseConnection.get(hbaseUrl); } @@ -114,7 +115,7 @@ public class HBaseResourceStore extends ResourceStore { ArrayList<String> result = new ArrayList<String>(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); Scan scan = new Scan(startRow, endRow); scan.setFilter(new KeyOnlyFilter()); try { @@ -149,7 +150,7 @@ public class HBaseResourceStore extends ResourceStore { scan.addColumn(B_FAMILY, B_COLUMN_TS); scan.addColumn(B_FAMILY, B_COLUMN); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); List<RawResource> result = Lists.newArrayList(); try { ResultScanner scanner = table.getScanner(scan); @@ -211,13 +212,12 @@ public class HBaseResourceStore extends ResourceStore { IOUtils.copy(content, bout); bout.close(); - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); table.put(put); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -225,7 +225,7 @@ public class HBaseResourceStore extends ResourceStore { @Override protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { byte[] row = Bytes.toBytes(resPath); byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); @@ -237,8 +237,6 @@ public class HBaseResourceStore extends ResourceStore { throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS); } - table.flushCommits(); - return newTS; } finally { IOUtils.closeQuietly(table); @@ -247,11 +245,10 @@ public class HBaseResourceStore extends ResourceStore { @Override protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { Delete del = new Delete(Bytes.toBytes(resPath)); table.delete(del); - table.flushCommits(); } finally { IOUtils.closeQuietly(table); } @@ -276,7 +273,7 @@ public class HBaseResourceStore extends ResourceStore { scan.addColumn(B_FAMILY, B_COLUMN_TS); } - HTableInterface table = getConnection().getTable(getAllInOneTableName()); + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); try { ResultScanner scanner = table.getScanner(scan); Result result = null; @@ -295,7 +292,7 @@ public class HBaseResourceStore extends ResourceStore { return endRow; } - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); FileSystem fileSystem = FileSystem.get(hconf); @@ -321,7 +318,7 @@ public class HBaseResourceStore extends ResourceStore { return redirectPath; } - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); if (content.length > kvSizeLimit) { writeLargeCellToHdfs(resPath, content, table); @@ -329,8 +326,8 @@ public class HBaseResourceStore extends ResourceStore { } Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + put.addColumn(B_FAMILY, B_COLUMN, content); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); return put; } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java index 093ac9e..ccbb6f0 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java +++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java @@ -23,19 +23,24 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,30 +58,31 @@ public class HBaseRegionSizeCalculator { /** * Computes size of each region for table and given column families. * */ - public HBaseRegionSizeCalculator(HTable table) throws IOException { - this(table, new HBaseAdmin(table.getConfiguration())); - } - - /** Constructor for unit testing */ - HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException { - + public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException { + Table table = null; + Admin admin = null; + try { + table = hbaseConnection.getTable(TableName.valueOf(tableName)); + admin = hbaseConnection.getAdmin(); + if (!enabled(table.getConfiguration())) { logger.info("Region size calculation disabled."); return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table \"" + table.getName() + "\"."); // Get regions for table. - Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet(); + RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName()); + List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations(); Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation hRegionLocation : regionLocationList) { + tableRegions.add(hRegionLocation.getRegionInfo().getRegionName()); } - ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus(); + ClusterStatus clusterStatus = admin.getClusterStatus(); Collection<ServerName> servers = clusterStatus.getServers(); final long megaByte = 1024L * 1024L; @@ -99,7 +105,8 @@ public class HBaseRegionSizeCalculator { } } } finally { - hBaseAdmin.close(); + IOUtils.closeQuietly(table); + IOUtils.closeQuietly(admin); } } @@ -124,4 +131,4 @@ public class HBaseRegionSizeCalculator { public Map<byte[], Long> getRegionSizeMap() { return Collections.unmodifiableMap(sizeMap); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java index 6d2762c..481fc6c 100644 --- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java +++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java @@ -21,12 +21,11 @@ package org.apache.kylin.common.util; import java.io.File; import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.kylin.common.persistence.HBaseConnection; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -56,16 +55,14 @@ public class BasicHadoopTest { cf.setBlocksize(4 * 1024 * 1024); // set to 4MB tableDesc.addFamily(cf); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = new HBaseAdmin(conf); + Admin admin = HBaseConnection.get().getAdmin(); admin.createTable(tableDesc); admin.close(); } @Test public void testRetriveHtableHost() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(); for (HTableDescriptor table : tableDescriptors) { String value = table.getValue("KYLIN_HOST"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index f2f1fc0..8c61a3a 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -24,14 +24,13 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.cmd.ShellCmdOutput; import org.apache.kylin.job.exception.ExecuteException; @@ -99,19 +98,18 @@ public class GarbageCollectionStep extends AbstractExecutable { List<String> oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conf); + admin = HBaseConnection.get().getAdmin(); for (String table : oldTables) { - if (admin.tableExists(table)) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); + if (admin.tableExists(TableName.valueOf(table))) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(table)) { - admin.disableTable(table); + if (admin.isTableEnabled(TableName.valueOf(table))) { + admin.disableTable(TableName.valueOf(table)); } - admin.deleteTable(table); + admin.deleteTable(TableName.valueOf(table)); logger.debug("Dropped HBase table " + table); output.append("Dropped HBase table " + table + " \n"); } else { http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 3c1e4a5..6f36eff 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -19,11 +19,15 @@ package org.apache.kylin.job.hadoop.cube; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.mapreduce.Job; @@ -31,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.BatchConstants; @@ -47,6 +52,8 @@ public class CubeHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; + Table table = null; try { options.addOption(OPTION_JOB_NAME); @@ -80,10 +87,12 @@ public class CubeHFileJob extends AbstractHadoopJob { attachKylinPropsAndMetadata(cube, job.getConfiguration()); String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - HTable htable = new HTable(conf, tableName); + connection = HBaseConnection.get(); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); //Automatic config ! - HFileOutputFormat.configureIncrementalLoad(job, htable); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); // set block replication to 3 for hfiles conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -96,6 +105,7 @@ public class CubeHFileJob extends AbstractHadoopJob { printUsage(options); throw e; } finally { + IOUtils.closeQuietly(table); if (job != null) cleanupTempConfFile(job.getConfiguration()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java index 3b25ee1..184b6cd 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java @@ -18,6 +18,13 @@ package org.apache.kylin.job.hadoop.cube; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -50,13 +59,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - /** * @author ysong1 */ @@ -107,7 +109,7 @@ public class StorageCleanupJob extends AbstractHadoopJob { IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); // get all kylin hbase tables - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List<String> allTablesNeedToBeDropped = new ArrayList<String>(); @@ -141,9 +143,9 @@ public class StorageCleanupJob extends AbstractHadoopJob { // drop tables for (String htableName : allTablesNeedToBeDropped) { log.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(htableName)) { - hbaseAdmin.disableTable(htableName); - hbaseAdmin.deleteTable(htableName); + if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { + hbaseAdmin.disableTable(TableName.valueOf(htableName)); + hbaseAdmin.deleteTable(TableName.valueOf(htableName)); log.info("Deleted HBase table " + htableName); } else { log.info("HBase table" + htableName + " does not exist"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index 027c0ca..9f5e062 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -25,11 +25,10 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; @@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); - HBaseAdmin admin = new HBaseAdmin(conf); + Admin admin = HBaseConnection.get().getAdmin(); try { if (User.isHBaseSecurityEnabled(conf)) { @@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob { byte[][] splitKeys = getSplits(conf, partitionFilePath); - if (admin.tableExists(tableName)) { + if (admin.tableExists(TableName.valueOf(tableName))) { // admin.disableTable(tableName); // admin.deleteTable(tableName); throw new RuntimeException("HBase table " + tableName + " exists!"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java index c032bbc..fa42148 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java @@ -19,17 +19,20 @@ package org.apache.kylin.job.hadoop.invertedindex; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; @@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; + Table table = null; try { options.addOption(OPTION_JOB_NAME); @@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob { job.setMapOutputValueClass(KeyValue.class); String tableName = getOptionValue(OPTION_HTABLE_NAME); - HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName); - HFileOutputFormat.configureIncrementalLoad(job, htable); + + connection = HBaseConnection.get(); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); this.deletePath(job.getConfiguration(), output); @@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob { } catch (Exception e) { printUsage(options); throw e; + } finally { + IOUtils.closeQuietly(table); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java index 32d065a..63777ef 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java @@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.invertedindex.IIInstance; @@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob { DeployCoprocessorCLI.deployCoprocessor(tableDesc); // drop the table first - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); + Admin admin = HBaseConnection.get().getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); } // create table http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java index b6e5af5..7fc1d72 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java @@ -21,11 +21,10 @@ package org.apache.kylin.job.tools; import java.io.IOException; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob { } private void clean() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java index 962a4ee..72680c9 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java @@ -18,14 +18,31 @@ package org.apache.kylin.job.tools; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.*; -import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.*; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.RawResource; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -46,11 +63,6 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - /** * Created by honma on 9/3/14. * <p/> @@ -70,7 +82,7 @@ public class CubeMigrationCLI { private static ResourceStore srcStore; private static ResourceStore dstStore; private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; + private static Admin hbaseAdmin; public static void main(String[] args) throws IOException, InterruptedException { @@ -110,8 +122,7 @@ public class CubeMigrationCLI { checkAndGetHbaseUrl(); - Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); + hbaseAdmin = HBaseConnection.get().getAdmin(); hdfsFS = FileSystem.get(new Configuration()); @@ -131,6 +142,8 @@ public class CubeMigrationCLI { } else { showOpts(); } + + IOUtils.closeQuietly(hbaseAdmin); } public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { @@ -294,10 +307,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); logger.info("CHANGE_HTABLE_HOST is completed"); break; } @@ -402,20 +415,19 @@ public class CubeMigrationCLI { } case COPY_ACL: { String cubeId = (String) opt.params[0]; - HTableInterface srcAclHtable = null; - HTableInterface destAclHtable = null; + Table srcAclHtable = null; + Table destAclHtable = null; try { - srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(srcConfig.getMetadataUrlPrefix() + "_acl"); - destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl"); + srcAclHtable = HBaseConnection.get(srcConfig.getMetadataUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + "_acl")); + destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl")); Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId))); for (Cell cell : result.listCells()) { Put put = new Put(Bytes.toBytes(cubeId)); - put.add(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)); + put.addColumn(CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), CellUtil.cloneValue(cell)); destAclHtable.put(put); } - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(srcAclHtable); IOUtils.closeQuietly(destAclHtable); @@ -442,10 +454,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); break; } case COPY_FILE_IN_META: { @@ -474,12 +486,10 @@ public class CubeMigrationCLI { } case COPY_ACL: { String cubeId = (String) opt.params[0]; - HTableInterface destAclHtable = null; + Table destAclHtable = null; try { - destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(dstConfig.getMetadataUrlPrefix() + "_acl"); - + destAclHtable = HBaseConnection.get(dstConfig.getMetadataUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + "_acl")); destAclHtable.delete(new Delete(Bytes.toBytes(cubeId))); - destAclHtable.flushCommits(); } finally { IOUtils.closeQuietly(destAclHtable); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java index 5482684..239c7ec 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/DeployCoprocessorCLI.java @@ -1,313 +1,314 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.job.tools; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.HadoopUtil; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.CubeSegment; -import org.apache.kylin.invertedindex.IIInstance; -import org.apache.kylin.invertedindex.IIManager; -import org.apache.kylin.invertedindex.IISegment; -import org.apache.kylin.metadata.model.SegmentStatusEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - */ -public class DeployCoprocessorCLI { - - private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class); - - public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver"; - public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint"; - - public static void main(String[] args) throws IOException { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); - Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf); - - String localCoprocessorJar = new File(args[0]).getAbsolutePath(); - logger.info("Identify coprocessor jar " + localCoprocessorJar); - - List<String> tableNames = getHTableNames(kylinConfig); - logger.info("Identify tables " + tableNames); - - Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames); - logger.info("Old coprocessor jar: " + oldJarPaths); - - Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); - logger.info("New coprocessor jar: " + hdfsCoprocessorJar); - - List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); - - // Don't remove old jars, missing coprocessor jar will fail hbase - // removeOldJars(oldJarPaths, fileSystem); - - hbaseAdmin.close(); - - logger.info("Processed " + processedTables); - logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); - } - - public static void deployCoprocessor(HTableDescriptor tableDesc) { - try { - initHTableCoprocessor(tableDesc); - logger.info("hbase table " + tableDesc.getName() + " deployed with coprocessor."); - - } catch (Exception ex) { - logger.error("Error deploying coprocessor on " + tableDesc.getName(), ex); - logger.error("Will try creating the table without coprocessor."); - } - } - - private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { - KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.job.tools; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.HadoopUtil; +import org.apache.kylin.cube.CubeInstance; +import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.CubeSegment; +import org.apache.kylin.invertedindex.IIInstance; +import org.apache.kylin.invertedindex.IIManager; +import org.apache.kylin.invertedindex.IISegment; +import org.apache.kylin.metadata.model.SegmentStatusEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + */ +public class DeployCoprocessorCLI { + + private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class); + + public static final String OBSERVER_CLS_NAME = "org.apache.kylin.storage.hbase.coprocessor.observer.AggregateRegionObserver"; + public static final String ENDPOINT_CLS_NAMAE = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint"; + + public static void main(String[] args) throws IOException { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); - Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null); - - DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - } - - public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException { - logger.info("Add coprocessor on " + desc.getNameAsString()); - desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null); - desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null); - } - - public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { - logger.info("Disable " + tableName); - hbaseAdmin.disableTable(tableName); - - logger.info("Unset coprocessor on " + tableName); - HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) { - desc.removeCoprocessor(OBSERVER_CLS_NAME); - } - while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) { - desc.removeCoprocessor(ENDPOINT_CLS_NAMAE); - } - - addCoprocessorOnHTable(desc, hdfsCoprocessorJar); - hbaseAdmin.modifyTable(tableName, desc); - - logger.info("Enable " + tableName); - hbaseAdmin.enableTable(tableName); - } - - private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { - List<String> processed = new ArrayList<String>(); - - for (String tableName : tableNames) { - try { - resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); - processed.add(tableName); - } catch (IOException ex) { - logger.error("Error processing " + tableName, ex); - } - } - return processed; - } - - public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException { - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config); - FileStatus newestJar = null; - for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { - if (fileStatus.getPath().toString().endsWith(".jar")) { - if (newestJar == null) { - newestJar = fileStatus; - } else { - if (newestJar.getModificationTime() < fileStatus.getModificationTime()) - newestJar = fileStatus; - } - } - } - if (newestJar == null) - return null; - - Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null); - logger.info("The newest coprocessor is " + path.toString()); - return path; - } - - public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException { - Path uploadPath = null; - File localCoprocessorFile = new File(localCoprocessorJar); - - // check existing jars - if (oldJarPaths == null) { - oldJarPaths = new HashSet<String>(); - } - Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv()); - for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { - if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) { - uploadPath = fileStatus.getPath(); - break; - } - String filename = fileStatus.getPath().toString(); - if (filename.endsWith(".jar")) { - oldJarPaths.add(filename); - } - } - - // upload if not existing - if (uploadPath == null) { - // figure out a unique new jar file name - Set<String> oldJarNames = new HashSet<String>(); - for (String path : oldJarPaths) { - oldJarNames.add(new Path(path).getName()); - } - String baseName = getBaseFileName(localCoprocessorJar); - String newName = null; - int i = 0; - while (newName == null) { - newName = baseName + "-" + (i++) + ".jar"; - if (oldJarNames.contains(newName)) - newName = null; - } - - // upload - uploadPath = new Path(coprocessorDir, newName); - FileInputStream in = null; - FSDataOutputStream out = null; - try { - in = new FileInputStream(localCoprocessorFile); - out = fileSystem.create(uploadPath); - IOUtils.copy(in, out); - } finally { - IOUtils.closeQuietly(in); - IOUtils.closeQuietly(out); - } - - fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1); - - } - - uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null); - return uploadPath; - } - - private static String getBaseFileName(String localCoprocessorJar) { - File localJar = new File(localCoprocessorJar); - String baseName = localJar.getName(); - if (baseName.endsWith(".jar")) - baseName = baseName.substring(0, baseName.length() - ".jar".length()); - return baseName; - } - - private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException { - String hdfsWorkingDirectory = config.getHdfsWorkingDirectory(); - Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor"); - fileSystem.mkdirs(coprocessorDir); - return coprocessorDir; - } - - private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException { - HashSet<String> result = new HashSet<String>(); - - for (String tableName : tableNames) { - HTableDescriptor tableDescriptor = null; - try { - tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - } catch (TableNotFoundException e) { - logger.warn("Table not found " + tableName, e); - continue; - } - - Matcher keyMatcher; - Matcher valueMatcher; - for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) { - keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get())); - if (!keyMatcher.matches()) { - continue; - } - valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get())); - if (!valueMatcher.matches()) { - continue; - } - - String jarPath = valueMatcher.group(1).trim(); - String clsName = valueMatcher.group(2).trim(); - - if (OBSERVER_CLS_NAME.equals(clsName)) { - result.add(jarPath); - } - } - } - - return result; - } - - private static List<String> getHTableNames(KylinConfig config) { - CubeManager cubeMgr = CubeManager.getInstance(config); - - ArrayList<String> result = new ArrayList<String>(); - for (CubeInstance cube : cubeMgr.listAllCubes()) { - for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) { - String tableName = seg.getStorageLocationIdentifier(); - if (StringUtils.isBlank(tableName) == false) { - result.add(tableName); - System.out.println("added new table: " + tableName); - } - } - } - - for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) { - for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) { - String tableName = seg.getStorageLocationIdentifier(); - if (StringUtils.isBlank(tableName) == false) { - result.add(tableName); - System.out.println("added new table: " + tableName); - } - } - } - - return result; - } -} + FileSystem fileSystem = FileSystem.get(hconf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); + + String localCoprocessorJar = new File(args[0]).getAbsolutePath(); + logger.info("Identify coprocessor jar " + localCoprocessorJar); + + List<String> tableNames = getHTableNames(kylinConfig); + logger.info("Identify tables " + tableNames); + + Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames); + logger.info("Old coprocessor jar: " + oldJarPaths); + + Path hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths); + logger.info("New coprocessor jar: " + hdfsCoprocessorJar); + + List<String> processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames); + + // Don't remove old jars, missing coprocessor jar will fail hbase + // removeOldJars(oldJarPaths, fileSystem); + + hbaseAdmin.close(); + + logger.info("Processed " + processedTables); + logger.info("Active coprocessor jar: " + hdfsCoprocessorJar); + } + + public static void deployCoprocessor(HTableDescriptor tableDesc) { + try { + initHTableCoprocessor(tableDesc); + logger.info("hbase table " + tableDesc.getTableName() + " deployed with coprocessor."); + + } catch (Exception ex) { + logger.error("Error deploying coprocessor on " + tableDesc.getTableName(), ex); + logger.error("Will try creating the table without coprocessor."); + } + } + + private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException { + KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); + Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); + FileSystem fileSystem = FileSystem.get(hconf); + + String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar(); + Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, fileSystem, null); + + DeployCoprocessorCLI.addCoprocessorOnHTable(desc, hdfsCoprocessorJar); + } + + public static void addCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException { + logger.info("Add coprocessor on " + desc.getNameAsString()); + desc.addCoprocessor(ENDPOINT_CLS_NAMAE, hdfsCoprocessorJar, 1000, null); + desc.addCoprocessor(OBSERVER_CLS_NAME, hdfsCoprocessorJar, 1001, null); + } + + public static void resetCoprocessor(String tableName, Admin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException { + logger.info("Disable " + tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); + + logger.info("Unset coprocessor on " + tableName); + HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + while (desc.hasCoprocessor(OBSERVER_CLS_NAME)) { + desc.removeCoprocessor(OBSERVER_CLS_NAME); + } + while (desc.hasCoprocessor(ENDPOINT_CLS_NAMAE)) { + desc.removeCoprocessor(ENDPOINT_CLS_NAMAE); + } + + addCoprocessorOnHTable(desc, hdfsCoprocessorJar); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + + logger.info("Enable " + tableName); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); + } + + private static List<String> resetCoprocessorOnHTables(Admin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException { + List<String> processed = new ArrayList<String>(); + + for (String tableName : tableNames) { + try { + resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar); + processed.add(tableName); + } catch (IOException ex) { + logger.error("Error processing " + tableName, ex); + } + } + return processed; + } + + public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException { + Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config); + FileStatus newestJar = null; + for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { + if (fileStatus.getPath().toString().endsWith(".jar")) { + if (newestJar == null) { + newestJar = fileStatus; + } else { + if (newestJar.getModificationTime() < fileStatus.getModificationTime()) + newestJar = fileStatus; + } + } + } + if (newestJar == null) + return null; + + Path path = newestJar.getPath().makeQualified(fileSystem.getUri(), null); + logger.info("The newest coprocessor is " + path.toString()); + return path; + } + + public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException { + Path uploadPath = null; + File localCoprocessorFile = new File(localCoprocessorJar); + + // check existing jars + if (oldJarPaths == null) { + oldJarPaths = new HashSet<String>(); + } + Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv()); + for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) { + if (fileStatus.getLen() == localCoprocessorJar.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) { + uploadPath = fileStatus.getPath(); + break; + } + String filename = fileStatus.getPath().toString(); + if (filename.endsWith(".jar")) { + oldJarPaths.add(filename); + } + } + + // upload if not existing + if (uploadPath == null) { + // figure out a unique new jar file name + Set<String> oldJarNames = new HashSet<String>(); + for (String path : oldJarPaths) { + oldJarNames.add(new Path(path).getName()); + } + String baseName = getBaseFileName(localCoprocessorJar); + String newName = null; + int i = 0; + while (newName == null) { + newName = baseName + "-" + (i++) + ".jar"; + if (oldJarNames.contains(newName)) + newName = null; + } + + // upload + uploadPath = new Path(coprocessorDir, newName); + FileInputStream in = null; + FSDataOutputStream out = null; + try { + in = new FileInputStream(localCoprocessorFile); + out = fileSystem.create(uploadPath); + IOUtils.copy(in, out); + } finally { + IOUtils.closeQuietly(in); + IOUtils.closeQuietly(out); + } + + fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), -1); + + } + + uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null); + return uploadPath; + } + + private static String getBaseFileName(String localCoprocessorJar) { + File localJar = new File(localCoprocessorJar); + String baseName = localJar.getName(); + if (baseName.endsWith(".jar")) + baseName = baseName.substring(0, baseName.length() - ".jar".length()); + return baseName; + } + + private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException { + String hdfsWorkingDirectory = config.getHdfsWorkingDirectory(); + Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor"); + fileSystem.mkdirs(coprocessorDir); + return coprocessorDir; + } + + private static Set<String> getCoprocessorJarPaths(Admin hbaseAdmin, List<String> tableNames) throws IOException { + HashSet<String> result = new HashSet<String>(); + + for (String tableName : tableNames) { + HTableDescriptor tableDescriptor = null; + try { + tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); + } catch (TableNotFoundException e) { + logger.warn("Table not found " + tableName, e); + continue; + } + + Matcher keyMatcher; + Matcher valueMatcher; + for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) { + keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get())); + if (!keyMatcher.matches()) { + continue; + } + valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get())); + if (!valueMatcher.matches()) { + continue; + } + + String jarPath = valueMatcher.group(1).trim(); + String clsName = valueMatcher.group(2).trim(); + + if (OBSERVER_CLS_NAME.equals(clsName)) { + result.add(jarPath); + } + } + } + + return result; + } + + private static List<String> getHTableNames(KylinConfig config) { + CubeManager cubeMgr = CubeManager.getInstance(config); + + ArrayList<String> result = new ArrayList<String>(); + for (CubeInstance cube : cubeMgr.listAllCubes()) { + for (CubeSegment seg : cube.getSegments(SegmentStatusEnum.READY)) { + String tableName = seg.getStorageLocationIdentifier(); + if (StringUtils.isBlank(tableName) == false) { + result.add(tableName); + System.out.println("added new table: " + tableName); + } + } + } + + for (IIInstance ii : IIManager.getInstance(config).listAllIIs()) { + for (IISegment seg : ii.getSegments(SegmentStatusEnum.READY)) { + String tableName = seg.getStorageLocationIdentifier(); + if (StringUtils.isBlank(tableName) == false) { + result.add(tableName); + System.out.println("added new table: " + tableName); + } + } + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java index 70e1df6..5fe5e58 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java +++ b/job/src/main/java/org/apache/kylin/job/tools/GridTableHBaseBenchmark.java @@ -28,13 +28,13 @@ import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.KeyOnlyFilter; import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.Bytes; @@ -74,8 +74,7 @@ public class GridTableHBaseBenchmark { public static void testGridTable(double hitRatio, double indexRatio) throws IOException { System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio); String hbaseUrl = "hbase"; // use hbase-site.xml on classpath - - HConnection conn = HBaseConnection.get(hbaseUrl); + Connection conn = HBaseConnection.get(hbaseUrl); createHTableIfNeeded(conn, TEST_TABLE); prepareData(conn); @@ -91,10 +90,10 @@ public class GridTableHBaseBenchmark { } - private static void testColumnScan(HConnection conn, List<Pair<Integer, Integer>> colScans) throws IOException { + private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException { Stats stats = new Stats("COLUMN_SCAN"); - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -122,20 +121,20 @@ public class GridTableHBaseBenchmark { } } - private static void testRowScanNoIndexFullScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException { fullScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_FULL")); } - private static void testRowScanNoIndexSkipScan(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_NO_IDX_SKIP")); } - private static void testRowScanWithIndex(HConnection conn, boolean[] hits) throws IOException { + private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException { jumpScan(conn, hits, new Stats("ROW_SCAN_IDX")); } - private static void fullScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -156,11 +155,11 @@ public class GridTableHBaseBenchmark { } } - private static void jumpScan(HConnection conn, boolean[] hits, Stats stats) throws IOException { + private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException { final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience - HTableInterface table = conn.getTable(TEST_TABLE); + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { stats.markStart(); @@ -204,8 +203,8 @@ public class GridTableHBaseBenchmark { } } - private static void prepareData(HConnection conn) throws IOException { - HTableInterface table = conn.getTable(TEST_TABLE); + private static void prepareData(Connection conn) throws IOException { + Table table = conn.getTable(TableName.valueOf(TEST_TABLE)); try { // check how many rows existing @@ -232,7 +231,7 @@ public class GridTableHBaseBenchmark { byte[] rowkey = Bytes.toBytes(i); Put put = new Put(rowkey); byte[] cell = randomBytes(); - put.add(CF, QN, cell); + put.addColumn(CF, QN, cell); table.put(put); nBytes += cell.length; dot(i, N_ROWS); @@ -258,8 +257,8 @@ public class GridTableHBaseBenchmark { return bytes; } - private static void createHTableIfNeeded(HConnection conn, String tableName) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); + private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException { + Admin hbase = conn.getAdmin(); try { boolean tableExist = false; http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java index 53930e3..e283748 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/HtableAlterMetadataCLI.java @@ -23,12 +23,11 @@ import java.io.IOException; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,8 +70,7 @@ public class HtableAlterMetadataCLI extends AbstractHadoopJob { } private void alter() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); hbaseAdmin.disableTable(table.getTableName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a871c60/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java index 3329d27..4d44088 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/RowCounterCLI.java @@ -22,11 +22,12 @@ import java.util.Iterator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; import org.slf4j.Logger; @@ -69,8 +70,8 @@ public class RowCounterCLI { logger.info("My Scan " + scan.toString()); - HConnection conn = HConnectionManager.createConnection(conf); - HTableInterface tableInterface = conn.getTable(htableName); + Connection conn = ConnectionFactory.createConnection(conf); + Table tableInterface = conn.getTable(TableName.valueOf(htableName)); Iterator<Result> iterator = tableInterface.getScanner(scan).iterator(); int counter = 0;