KYLIN-920 & KYLIN-782 Upgrade to HBase 1.1 (with help from murkrishn <[email protected]>)
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/5063c386 Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/5063c386 Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/5063c386 Branch: refs/heads/1.x-HBase1.1.3 Commit: 5063c386ffd87ae49f3325f271708bd16f44dc93 Parents: 41ad16e Author: Yang Li <[email protected]> Authored: Sun Aug 16 20:22:13 2015 +0800 Committer: Li, Yang <[email protected]> Committed: Tue Nov 3 18:18:01 2015 +0800 ---------------------------------------------------------------------- .../common/persistence/HBaseConnection.java | 251 +++---- .../common/persistence/HBaseResourceStore.java | 665 +++++++++---------- .../common/util/HBaseRegionSizeCalculator.java | 41 +- .../kylin/common/util/BasicHadoopTest.java | 11 +- .../kylin/job/cube/GarbageCollectionStep.java | 22 +- .../kylin/job/hadoop/cube/CubeHFileJob.java | 18 +- .../job/hadoop/cube/StorageCleanupJob.java | 26 +- .../kylin/job/hadoop/hbase/CreateHTableJob.java | 8 +- .../hadoop/invertedindex/IICreateHFileJob.java | 22 +- .../hadoop/invertedindex/IICreateHTableJob.java | 11 +- .../apache/kylin/job/tools/CleanHtableCLI.java | 8 +- .../kylin/job/tools/CubeMigrationCLI.java | 24 +- .../kylin/job/tools/DeployCoprocessorCLI.java | 625 ++++++++--------- .../job/tools/GridTableHBaseBenchmark.java | 37 +- .../kylin/job/tools/HtableAlterMetadataCLI.java | 8 +- .../apache/kylin/job/tools/RowCounterCLI.java | 11 +- .../org/apache/kylin/job/ExportHBaseData.java | 18 +- .../kylin/job/hadoop/hbase/TestHbaseClient.java | 13 +- .../kylin/job/tools/HBaseRowDigestTest.java | 11 +- monitor/pom.xml | 6 + .../kylin/monitor/MonitorMetaManager.java | 49 +- pom.xml | 17 +- .../apache/kylin/rest/service/AclService.java | 38 +- .../apache/kylin/rest/service/CubeService.java | 35 +- .../apache/kylin/rest/service/QueryService.java | 21 +- .../apache/kylin/rest/service/UserService.java | 27 +- .../storage/hbase/CubeSegmentTupleIterator.java | 21 +- .../kylin/storage/hbase/CubeStorageEngine.java | 4 +- .../storage/hbase/HBaseClientKVIterator.java | 187 +++--- .../hbase/InvertedIndexStorageEngine.java | 114 ++-- .../kylin/storage/hbase/PingHBaseCLI.java | 179 ++--- .../storage/hbase/RegionScannerAdapter.java | 10 +- .../hbase/SerializedHBaseTupleIterator.java | 4 +- .../endpoint/EndpointTupleIterator.java | 15 +- .../hbase/coprocessor/endpoint/IIEndpoint.java | 2 +- .../observer/AggregateRegionObserver.java | 2 +- .../observer/AggregationScanner.java | 14 +- .../observer/ObserverAggregationCache.java | 10 +- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../storage/hbase/InvertedIndexHBaseTest.java | 227 ++++--- .../observer/AggregateRegionObserverTest.java | 72 +- .../minicluster/HiveMiniClusterTest.java | 3 +- 42 files changed, 1442 insertions(+), 1449 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java index c4d0314..85a08a1 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseConnection.java @@ -1,123 +1,128 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HConnectionManager; -import org.apache.kylin.common.util.HadoopUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * @author yangli9 - * - */ -public class HBaseConnection { - - private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); - - private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>(); - private static final Map<String, HConnection> ConnPool = new ConcurrentHashMap<String, HConnection>(); - - static { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - for (HConnection conn : ConnPool.values()) { - try { - conn.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - } - }); - } - - public static HConnection get(String url) { - // find configuration - Configuration conf = ConfigCache.get(url); - if (conf == null) { - conf = HadoopUtil.newHBaseConfiguration(url); - ConfigCache.put(url, conf); - } - - HConnection connection = ConnPool.get(url); - try { - // I don't use DCL since recreate a connection is not a big issue. - if (connection == null) { - connection = HConnectionManager.createConnection(conf); - ConnPool.put(url, connection); - } - } catch (Throwable t) { - throw new StorageException("Error when open connection " + url, t); - } - - return connection; - } - - public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { - createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); - } - - public static void createHTableIfNeeded(HConnection conn, String tableName, String... families) throws IOException { - HBaseAdmin hbase = new HBaseAdmin(conn); - - try { - boolean tableExist = false; - try { - hbase.getTableDescriptor(TableName.valueOf(tableName)); - tableExist = true; - } catch (TableNotFoundException e) { - } - - if (tableExist) { - logger.debug("HTable '" + tableName + "' already exists"); - return; - } - - logger.debug("Creating HTable '" + tableName + "'"); - - HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); - - if (null != families && families.length > 0) { - for (String family : families) { - HColumnDescriptor fd = new HColumnDescriptor(family); - fd.setInMemory(true); // metadata tables are best in memory - desc.addFamily(fd); - } - } - hbase.createTable(desc); - - logger.debug("HTable '" + tableName + "' created"); - } finally { - hbase.close(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.persistence; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.HadoopUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangli9 + * + */ +public class HBaseConnection { + + private static final Logger logger = LoggerFactory.getLogger(HBaseConnection.class); + + private static final Map<String, Configuration> ConfigCache = new ConcurrentHashMap<String, Configuration>(); + private static final Map<String, Connection> ConnPool = new ConcurrentHashMap<String, Connection>(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + for (Connection conn : ConnPool.values()) { + try { + conn.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + }); + } + + public static Connection get() { + return get(KylinConfig.getInstanceFromEnv().getStorageUrl()); + } + + public static Connection get(String url) { + // find configuration + Configuration conf = ConfigCache.get(url); + if (conf == null) { + conf = HadoopUtil.newHBaseConfiguration(url); + ConfigCache.put(url, conf); + } + + Connection connection = ConnPool.get(url); + try { + // I don't use DCL since recreate a connection is not a big issue. + if (connection == null) { + connection = ConnectionFactory.createConnection(conf); + ConnPool.put(url, connection); + } + } catch (Throwable t) { + throw new StorageException("Error when open connection " + url, t); + } + + return connection; + } + + public static void createHTableIfNeeded(String hbaseUrl, String tableName, String... families) throws IOException { + createHTableIfNeeded(HBaseConnection.get(hbaseUrl), tableName, families); + } + + public static void createHTableIfNeeded(Connection conn, String tableName, String... families) throws IOException { + Admin admin = conn.getAdmin(); + + try { + boolean tableExist = false; + try { + admin.getTableDescriptor(TableName.valueOf(tableName)); + tableExist = true; + } catch (TableNotFoundException e) { + } + + if (tableExist) { + logger.debug("HTable '" + tableName + "' already exists"); + return; + } + + logger.debug("Creating HTable '" + tableName + "'"); + + HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName)); + + if (null != families && families.length > 0) { + for (String family : families) { + HColumnDescriptor fd = new HColumnDescriptor(family); + fd.setInMemory(true); // metadata tables are best in memory + desc.addFamily(fd); + } + } + admin.createTable(desc); + + logger.debug("HTable '" + tableName + "' created"); + } finally { + admin.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java index 1c4a7ba..8360ff1 100644 --- a/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java +++ b/common/src/main/java/org/apache/kylin/common/persistence/HBaseResourceStore.java @@ -1,337 +1,334 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kylin.common.persistence; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.filter.KeyOnlyFilter; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; -import org.apache.kylin.common.util.BytesUtil; -import org.apache.kylin.common.util.HadoopUtil; - -import com.google.common.collect.Lists; - -public class HBaseResourceStore extends ResourceStore { - - private static final String DEFAULT_TABLE_NAME = "kylin_metadata"; - private static final String FAMILY = "f"; - private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); - private static final String COLUMN = "c"; - private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN); - private static final String COLUMN_TS = "t"; - private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); - - private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>(); - - static { - TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube"); - TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict"); - TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex"); - TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job"); - TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output"); - TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj"); - TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot"); - TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE - } - - final String tableNameBase; - final String hbaseUrl; - - // final Map<String, String> tableNameMap; // path prefix ==> HBase table name - - private HConnection getConnection() throws IOException { - return HBaseConnection.get(hbaseUrl); - } - - public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { - super(kylinConfig); - - String metadataUrl = kylinConfig.getMetadataUrl(); - // split TABLE@HBASE_URL - int cut = metadataUrl.indexOf('@'); - tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); - hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); - - createHTableIfNeeded(getAllInOneTableName()); - - // tableNameMap = new LinkedHashMap<String, String>(); - // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) { - // String pathPrefix = entry.getKey(); - // String tableName = tableNameBase + entry.getValue(); - // tableNameMap.put(pathPrefix, tableName); - // createHTableIfNeeded(tableName); - // } - - } - - private void createHTableIfNeeded(String tableName) throws IOException { - HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); - } - - private String getAllInOneTableName() { - return tableNameBase; - } - - @Override - protected ArrayList<String> listResourcesImpl(String resPath) throws IOException { - assert resPath.startsWith("/"); - String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/"; - byte[] startRow = Bytes.toBytes(lookForPrefix); - byte[] endRow = Bytes.toBytes(lookForPrefix); - endRow[endRow.length - 1]++; - - ArrayList<String> result = new ArrayList<String>(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - Scan scan = new Scan(startRow, endRow); - scan.setFilter(new KeyOnlyFilter()); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - String path = Bytes.toString(r.getRow()); - assert path.startsWith(lookForPrefix); - int cut = path.indexOf('/', lookForPrefix.length()); - String child = cut < 0 ? path : path.substring(0, cut); - if (result.contains(child) == false) - result.add(child); - } - } finally { - IOUtils.closeQuietly(table); - } - // return null to indicate not a folder - return result.isEmpty() ? null : result; - } - - @Override - protected boolean existsImpl(String resPath) throws IOException { - Result r = getByScan(resPath, null, null); - return r != null; - } - - @Override - protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { - byte[] startRow = Bytes.toBytes(rangeStart); - byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); - - Scan scan = new Scan(startRow, endRow); - scan.addColumn(B_FAMILY, B_COLUMN_TS); - scan.addColumn(B_FAMILY, B_COLUMN); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - List<RawResource> result = Lists.newArrayList(); - try { - ResultScanner scanner = table.getScanner(scan); - for (Result r : scanner) { - result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); - } - } catch (IOException e) { - for (RawResource rawResource : result) { - IOUtils.closeQuietly(rawResource.resource); - } - throw e; - } finally { - IOUtils.closeQuietly(table); - } - return result; - } - - private InputStream getInputStream(String resPath, Result r) throws IOException { - if (r == null) { - return null; - } - byte[] value = r.getValue(B_FAMILY, B_COLUMN); - if (value.length == 0) { - Path redirectPath = bigCellHDFSPath(resPath); +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.persistence; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.KeyOnlyFilter; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.BytesUtil; +import org.apache.kylin.common.util.HadoopUtil; + +import com.google.common.collect.Lists; + +public class HBaseResourceStore extends ResourceStore { + + private static final String DEFAULT_TABLE_NAME = "kylin_metadata"; + private static final String FAMILY = "f"; + private static final byte[] B_FAMILY = Bytes.toBytes(FAMILY); + private static final String COLUMN = "c"; + private static final byte[] B_COLUMN = Bytes.toBytes(COLUMN); + private static final String COLUMN_TS = "t"; + private static final byte[] B_COLUMN_TS = Bytes.toBytes(COLUMN_TS); + + private static final Map<String, String> TABLE_SUFFIX_MAP = new LinkedHashMap<String, String>(); + + static { + TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube"); + TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict"); + TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex"); + TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job"); + TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output"); + TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj"); + TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot"); + TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE + } + + final String tableNameBase; + final String hbaseUrl; + + // final Map<String, String> tableNameMap; // path prefix ==> HBase table name + + private Connection getConnection() throws IOException { + return HBaseConnection.get(hbaseUrl); + } + + public HBaseResourceStore(KylinConfig kylinConfig) throws IOException { + super(kylinConfig); + + String metadataUrl = kylinConfig.getMetadataUrl(); + // split TABLE@HBASE_URL + int cut = metadataUrl.indexOf('@'); + tableNameBase = cut < 0 ? DEFAULT_TABLE_NAME : metadataUrl.substring(0, cut); + hbaseUrl = cut < 0 ? metadataUrl : metadataUrl.substring(cut + 1); + + createHTableIfNeeded(getAllInOneTableName()); + + // tableNameMap = new LinkedHashMap<String, String>(); + // for (Entry<String, String> entry : TABLE_SUFFIX_MAP.entrySet()) { + // String pathPrefix = entry.getKey(); + // String tableName = tableNameBase + entry.getValue(); + // tableNameMap.put(pathPrefix, tableName); + // createHTableIfNeeded(tableName); + // } + + } + + private void createHTableIfNeeded(String tableName) throws IOException { + HBaseConnection.createHTableIfNeeded(getConnection(), tableName, FAMILY); + } + + private String getAllInOneTableName() { + return tableNameBase; + } + + @Override + protected ArrayList<String> listResourcesImpl(String resPath) throws IOException { + assert resPath.startsWith("/"); + String lookForPrefix = resPath.endsWith("/") ? resPath : resPath + "/"; + byte[] startRow = Bytes.toBytes(lookForPrefix); + byte[] endRow = Bytes.toBytes(lookForPrefix); + endRow[endRow.length - 1]++; + + ArrayList<String> result = new ArrayList<String>(); + + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + Scan scan = new Scan(startRow, endRow); + scan.setFilter(new KeyOnlyFilter()); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + String path = Bytes.toString(r.getRow()); + assert path.startsWith(lookForPrefix); + int cut = path.indexOf('/', lookForPrefix.length()); + String child = cut < 0 ? path : path.substring(0, cut); + if (result.contains(child) == false) + result.add(child); + } + } finally { + IOUtils.closeQuietly(table); + } + // return null to indicate not a folder + return result.isEmpty() ? null : result; + } + + @Override + protected boolean existsImpl(String resPath) throws IOException { + Result r = getByScan(resPath, null, null); + return r != null; + } + + @Override + protected List<RawResource> getAllResources(String rangeStart, String rangeEnd) throws IOException { + byte[] startRow = Bytes.toBytes(rangeStart); + byte[] endRow = plusZero(Bytes.toBytes(rangeEnd)); + + Scan scan = new Scan(startRow, endRow); + scan.addColumn(B_FAMILY, B_COLUMN_TS); + scan.addColumn(B_FAMILY, B_COLUMN); + + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + List<RawResource> result = Lists.newArrayList(); + try { + ResultScanner scanner = table.getScanner(scan); + for (Result r : scanner) { + result.add(new RawResource(getInputStream(Bytes.toString(r.getRow()), r), getTimestamp(r))); + } + } catch (IOException e) { + for (RawResource rawResource : result) { + IOUtils.closeQuietly(rawResource.resource); + } + throw e; + } finally { + IOUtils.closeQuietly(table); + } + return result; + } + + private InputStream getInputStream(String resPath, Result r) throws IOException { + if (r == null) { + return null; + } + byte[] value = r.getValue(B_FAMILY, B_COLUMN); + if (value.length == 0) { + Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - return fileSystem.open(redirectPath); - } else { - return new ByteArrayInputStream(value); - } - } - - private long getTimestamp(Result r) { - if (r == null) { - return 0; - } else { - return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS)); - } - } - - @Override - protected InputStream getResourceImpl(String resPath) throws IOException { - Result r = getByScan(resPath, B_FAMILY, B_COLUMN); - return getInputStream(resPath, r); - } - - @Override - protected long getResourceTimestampImpl(String resPath) throws IOException { - Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS); - return getTimestamp(r); - } - - @Override - protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { - ByteArrayOutputStream bout = new ByteArrayOutputStream(); - IOUtils.copy(content, bout); - bout.close(); - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); - - table.put(put); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - byte[] row = Bytes.toBytes(resPath); - byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); - Put put = buildPut(resPath, newTS, row, content, table); - - boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); - if (!ok) { - long real = getResourceTimestamp(resPath); + FileSystem fileSystem = FileSystem.get(hconf); + + return fileSystem.open(redirectPath); + } else { + return new ByteArrayInputStream(value); + } + } + + private long getTimestamp(Result r) { + if (r == null) { + return 0; + } else { + return Bytes.toLong(r.getValue(B_FAMILY, B_COLUMN_TS)); + } + } + + @Override + protected InputStream getResourceImpl(String resPath) throws IOException { + Result r = getByScan(resPath, B_FAMILY, B_COLUMN); + return getInputStream(resPath, r); + } + + @Override + protected long getResourceTimestampImpl(String resPath) throws IOException { + Result r = getByScan(resPath, B_FAMILY, B_COLUMN_TS); + return getTimestamp(r); + } + + @Override + protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException { + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + IOUtils.copy(content, bout); + bout.close(); + + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + try { + byte[] row = Bytes.toBytes(resPath); + Put put = buildPut(resPath, ts, row, bout.toByteArray(), table); + + table.put(put); + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldTS, long newTS) throws IOException, IllegalStateException { + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + try { + byte[] row = Bytes.toBytes(resPath); + byte[] bOldTS = oldTS == 0 ? null : Bytes.toBytes(oldTS); + Put put = buildPut(resPath, newTS, row, content, table); + + boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put); + if (!ok) { + long real = getResourceTimestamp(resPath); throw new IllegalStateException("Overwriting conflict " + resPath + ", expect old TS " + real + ", but it is " + oldTS); - } - - table.flushCommits(); - - return newTS; - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected void deleteResourceImpl(String resPath) throws IOException { - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - Delete del = new Delete(Bytes.toBytes(resPath)); - table.delete(del); - table.flushCommits(); - } finally { - IOUtils.closeQuietly(table); - } - } - - @Override - protected String getReadableResourcePathImpl(String resPath) { - return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); - } - - private Result getByScan(String path, byte[] family, byte[] column) throws IOException { - byte[] startRow = Bytes.toBytes(path); - byte[] endRow = plusZero(startRow); - - Scan scan = new Scan(startRow, endRow); - if (family == null || column == null) { - scan.setFilter(new KeyOnlyFilter()); - } else { - scan.addColumn(family, column); - } - - HTableInterface table = getConnection().getTable(getAllInOneTableName()); - try { - ResultScanner scanner = table.getScanner(scan); - Result result = null; - for (Result r : scanner) { - result = r; - } - return result == null || result.isEmpty() ? null : result; - } finally { - IOUtils.closeQuietly(table); - } - } - - private byte[] plusZero(byte[] startRow) { - byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); - endRow[endRow.length - 1] = 0; - return endRow; - } - - private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException { - Path redirectPath = bigCellHDFSPath(resPath); + } + + return newTS; + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected void deleteResourceImpl(String resPath) throws IOException { + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + try { + Delete del = new Delete(Bytes.toBytes(resPath)); + table.delete(del); + } finally { + IOUtils.closeQuietly(table); + } + } + + @Override + protected String getReadableResourcePathImpl(String resPath) { + return getAllInOneTableName() + "(key='" + resPath + "')@" + kylinConfig.getMetadataUrl(); + } + + private Result getByScan(String path, byte[] family, byte[] column) throws IOException { + byte[] startRow = Bytes.toBytes(path); + byte[] endRow = plusZero(startRow); + + Scan scan = new Scan(startRow, endRow); + if (family == null || column == null) { + scan.setFilter(new KeyOnlyFilter()); + } else { + scan.addColumn(family, column); + } + + Table table = getConnection().getTable(TableName.valueOf(getAllInOneTableName())); + try { + ResultScanner scanner = table.getScanner(scan); + Result result = null; + for (Result r : scanner) { + result = r; + } + return result == null || result.isEmpty() ? null : result; + } finally { + IOUtils.closeQuietly(table); + } + } + + private byte[] plusZero(byte[] startRow) { + byte[] endRow = Arrays.copyOf(startRow, startRow.length + 1); + endRow[endRow.length - 1] = 0; + return endRow; + } + + private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, Table table) throws IOException { + Path redirectPath = bigCellHDFSPath(resPath); Configuration hconf = HadoopUtil.getCurrentHBaseConfiguration(); - FileSystem fileSystem = FileSystem.get(hconf); - - if (fileSystem.exists(redirectPath)) { - fileSystem.delete(redirectPath, true); - } - - FSDataOutputStream out = fileSystem.create(redirectPath); - - try { - out.write(largeColumn); - } finally { - IOUtils.closeQuietly(out); - } - - return redirectPath; - } - - public Path bigCellHDFSPath(String resPath) { - String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory(); - Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath); - return redirectPath; - } - - private Put buildPut(String resPath, long ts, byte[] row, byte[] content, HTableInterface table) throws IOException { - int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); - if (content.length > kvSizeLimit) { - writeLargeCellToHdfs(resPath, content, table); - content = BytesUtil.EMPTY_BYTE_ARRAY; - } - - Put put = new Put(row); - put.add(B_FAMILY, B_COLUMN, content); - put.add(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); - - return put; - } -} + FileSystem fileSystem = FileSystem.get(hconf); + + if (fileSystem.exists(redirectPath)) { + fileSystem.delete(redirectPath, true); + } + + FSDataOutputStream out = fileSystem.create(redirectPath); + + try { + out.write(largeColumn); + } finally { + IOUtils.closeQuietly(out); + } + + return redirectPath; + } + + public Path bigCellHDFSPath(String resPath) { + String hdfsWorkingDirectory = this.kylinConfig.getHdfsWorkingDirectory(); + Path redirectPath = new Path(hdfsWorkingDirectory, "resources" + resPath); + return redirectPath; + } + + private Put buildPut(String resPath, long ts, byte[] row, byte[] content, Table table) throws IOException { + int kvSizeLimit = this.kylinConfig.getHBaseKeyValueSize(); + if (content.length > kvSizeLimit) { + writeLargeCellToHdfs(resPath, content, table); + content = BytesUtil.EMPTY_BYTE_ARRAY; + } + + Put put = new Put(row); + put.addColumn(B_FAMILY, B_COLUMN, content); + put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(ts)); + + return put; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java index 093ac9e..ccbb6f0 100644 --- a/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java +++ b/common/src/main/java/org/apache/kylin/common/util/HBaseRegionSizeCalculator.java @@ -23,19 +23,24 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLoad; import org.apache.hadoop.hbase.ServerLoad; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,30 +58,31 @@ public class HBaseRegionSizeCalculator { /** * Computes size of each region for table and given column families. * */ - public HBaseRegionSizeCalculator(HTable table) throws IOException { - this(table, new HBaseAdmin(table.getConfiguration())); - } - - /** Constructor for unit testing */ - HBaseRegionSizeCalculator(HTable table, HBaseAdmin hBaseAdmin) throws IOException { - + public HBaseRegionSizeCalculator(String tableName , Connection hbaseConnection) throws IOException { + Table table = null; + Admin admin = null; + try { + table = hbaseConnection.getTable(TableName.valueOf(tableName)); + admin = hbaseConnection.getAdmin(); + if (!enabled(table.getConfiguration())) { logger.info("Region size calculation disabled."); return; } - logger.info("Calculating region sizes for table \"" + new String(table.getTableName()) + "\"."); + logger.info("Calculating region sizes for table \"" + table.getName() + "\"."); // Get regions for table. - Set<HRegionInfo> tableRegionInfos = table.getRegionLocations().keySet(); + RegionLocator regionLocator = hbaseConnection.getRegionLocator(table.getName()); + List<HRegionLocation> regionLocationList = regionLocator.getAllRegionLocations(); Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); - for (HRegionInfo regionInfo : tableRegionInfos) { - tableRegions.add(regionInfo.getRegionName()); + for (HRegionLocation hRegionLocation : regionLocationList) { + tableRegions.add(hRegionLocation.getRegionInfo().getRegionName()); } - ClusterStatus clusterStatus = hBaseAdmin.getClusterStatus(); + ClusterStatus clusterStatus = admin.getClusterStatus(); Collection<ServerName> servers = clusterStatus.getServers(); final long megaByte = 1024L * 1024L; @@ -99,7 +105,8 @@ public class HBaseRegionSizeCalculator { } } } finally { - hBaseAdmin.close(); + IOUtils.closeQuietly(table); + IOUtils.closeQuietly(admin); } } @@ -124,4 +131,4 @@ public class HBaseRegionSizeCalculator { public Map<byte[], Long> getRegionSizeMap() { return Collections.unmodifiableMap(sizeMap); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java index 6d2762c..481fc6c 100644 --- a/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java +++ b/common/src/test/java/org/apache/kylin/common/util/BasicHadoopTest.java @@ -21,12 +21,11 @@ package org.apache.kylin.common.util; import java.io.File; import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.kylin.common.persistence.HBaseConnection; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -56,16 +55,14 @@ public class BasicHadoopTest { cf.setBlocksize(4 * 1024 * 1024); // set to 4MB tableDesc.addFamily(cf); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = new HBaseAdmin(conf); + Admin admin = HBaseConnection.get().getAdmin(); admin.createTable(tableDesc); admin.close(); } @Test public void testRetriveHtableHost() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(); for (HTableDescriptor table : tableDescriptors) { String value = table.getValue("KYLIN_HOST"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java index f2f1fc0..8c61a3a 100644 --- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java +++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java @@ -24,14 +24,13 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.cmd.ShellCmdOutput; import org.apache.kylin.job.exception.ExecuteException; @@ -99,19 +98,18 @@ public class GarbageCollectionStep extends AbstractExecutable { List<String> oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin admin = null; + Admin admin = null; try { - admin = new HBaseAdmin(conf); + admin = HBaseConnection.get().getAdmin(); for (String table : oldTables) { - if (admin.tableExists(table)) { - HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); + if (admin.tableExists(TableName.valueOf(table))) { + HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf(table)); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { - if (admin.isTableEnabled(table)) { - admin.disableTable(table); + if (admin.isTableEnabled(TableName.valueOf(table))) { + admin.disableTable(TableName.valueOf(table)); } - admin.deleteTable(table); + admin.deleteTable(TableName.valueOf(table)); logger.debug("Dropped HBase table " + table); output.append("Dropped HBase table " + table + " \n"); } else { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java index 3c1e4a5..6f36eff 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/CubeHFileJob.java @@ -19,11 +19,15 @@ package org.apache.kylin.job.hadoop.cube; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.mapreduce.Job; @@ -31,6 +35,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.job.constant.BatchConstants; @@ -47,6 +52,8 @@ public class CubeHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; + Table table = null; try { options.addOption(OPTION_JOB_NAME); @@ -80,10 +87,12 @@ public class CubeHFileJob extends AbstractHadoopJob { attachKylinPropsAndMetadata(cube, job.getConfiguration()); String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); - HTable htable = new HTable(conf, tableName); + connection = HBaseConnection.get(); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); //Automatic config ! - HFileOutputFormat.configureIncrementalLoad(job, htable); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); // set block replication to 3 for hfiles conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3"); @@ -96,6 +105,7 @@ public class CubeHFileJob extends AbstractHadoopJob { printUsage(options); throw e; } finally { + IOUtils.closeQuietly(table); if (job != null) cleanupTempConfFile(job.getConfiguration()); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java index 3b25ee1..184b6cd 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java @@ -18,6 +18,13 @@ package org.apache.kylin.job.hadoop.cube; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; @@ -28,10 +35,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; @@ -50,13 +59,6 @@ import org.apache.kylin.metadata.realization.IRealizationConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - /** * @author ysong1 */ @@ -107,7 +109,7 @@ public class StorageCleanupJob extends AbstractHadoopJob { IIManager iiManager = IIManager.getInstance(KylinConfig.getInstanceFromEnv()); // get all kylin hbase tables - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); List<String> allTablesNeedToBeDropped = new ArrayList<String>(); @@ -141,9 +143,9 @@ public class StorageCleanupJob extends AbstractHadoopJob { // drop tables for (String htableName : allTablesNeedToBeDropped) { log.info("Deleting HBase table " + htableName); - if (hbaseAdmin.tableExists(htableName)) { - hbaseAdmin.disableTable(htableName); - hbaseAdmin.deleteTable(htableName); + if (hbaseAdmin.tableExists(TableName.valueOf(htableName))) { + hbaseAdmin.disableTable(TableName.valueOf(htableName)); + hbaseAdmin.deleteTable(TableName.valueOf(htableName)); log.info("Deleted HBase table " + htableName); } else { log.info("HBase table" + htableName + " does not exist"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java index 027c0ca..9f5e062 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/hbase/CreateHTableJob.java @@ -25,11 +25,10 @@ import org.apache.commons.cli.Options; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; @@ -42,6 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; @@ -81,7 +81,7 @@ public class CreateHTableJob extends AbstractHadoopJob { tableDesc.setValue(IRealizationConstants.HTableTag, config.getMetadataUrlPrefix()); Configuration conf = HadoopUtil.getCurrentHBaseConfiguration(); - HBaseAdmin admin = new HBaseAdmin(conf); + Admin admin = HBaseConnection.get().getAdmin(); try { if (User.isHBaseSecurityEnabled(conf)) { @@ -139,7 +139,7 @@ public class CreateHTableJob extends AbstractHadoopJob { byte[][] splitKeys = getSplits(conf, partitionFilePath); - if (admin.tableExists(tableName)) { + if (admin.tableExists(TableName.valueOf(tableName))) { // admin.disableTable(tableName); // admin.deleteTable(tableName); throw new RuntimeException("HBase table " + tableName + " exists!"); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java index c032bbc..fa42148 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHFileJob.java @@ -19,17 +19,20 @@ package org.apache.kylin.job.hadoop.invertedindex; import org.apache.commons.cli.Options; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.ToolRunner; -import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; @@ -45,6 +48,8 @@ public class IICreateHFileJob extends AbstractHadoopJob { public int run(String[] args) throws Exception { Options options = new Options(); + Connection connection = null; + Table table = null; try { options.addOption(OPTION_JOB_NAME); @@ -69,8 +74,11 @@ public class IICreateHFileJob extends AbstractHadoopJob { job.setMapOutputValueClass(KeyValue.class); String tableName = getOptionValue(OPTION_HTABLE_NAME); - HTable htable = new HTable(HBaseConfiguration.create(getConf()), tableName); - HFileOutputFormat.configureIncrementalLoad(job, htable); + + connection = HBaseConnection.get(); + table = connection.getTable(TableName.valueOf(tableName)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(tableName)); + HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); this.deletePath(job.getConfiguration(), output); @@ -78,6 +86,8 @@ public class IICreateHFileJob extends AbstractHadoopJob { } catch (Exception e) { printUsage(options); throw e; + } finally { + IOUtils.closeQuietly(table); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java index 32d065a..63777ef 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/invertedindex/IICreateHTableJob.java @@ -24,11 +24,12 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.util.ToolRunner; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.invertedindex.IIInstance; @@ -78,10 +79,10 @@ public class IICreateHTableJob extends AbstractHadoopJob { DeployCoprocessorCLI.deployCoprocessor(tableDesc); // drop the table first - HBaseAdmin admin = new HBaseAdmin(conf); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); + Admin admin = HBaseConnection.get().getAdmin(); + if (admin.tableExists(TableName.valueOf(tableName))) { + admin.disableTable(TableName.valueOf(tableName)); + admin.deleteTable(TableName.valueOf(tableName)); } // create table http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java index b6e5af5..7fc1d72 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java @@ -21,11 +21,10 @@ package org.apache.kylin.job.tools; import java.io.IOException; import org.apache.commons.cli.Options; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.util.ToolRunner; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +53,7 @@ public class CleanHtableCLI extends AbstractHadoopJob { } private void clean() throws IOException { - Configuration conf = HBaseConfiguration.create(); - HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); + Admin hbaseAdmin = HBaseConnection.get().getAdmin(); for (HTableDescriptor descriptor : hbaseAdmin.listTables()) { String name = descriptor.getNameAsString().toLowerCase(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/5063c386/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java index b07d6a9..503f07e 100644 --- a/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java +++ b/job/src/main/java/org/apache/kylin/job/tools/CubeMigrationCLI.java @@ -24,14 +24,15 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Admin; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.HBaseConnection; import org.apache.kylin.common.persistence.JsonSerializer; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.Serializer; @@ -73,7 +74,7 @@ public class CubeMigrationCLI { private static ResourceStore srcStore; private static ResourceStore dstStore; private static FileSystem hdfsFS; - private static HBaseAdmin hbaseAdmin; + private static Admin hbaseAdmin; public static void main(String[] args) throws IOException, InterruptedException { @@ -113,8 +114,7 @@ public class CubeMigrationCLI { checkAndGetHbaseUrl(); - Configuration conf = HBaseConfiguration.create(); - hbaseAdmin = new HBaseAdmin(conf); + hbaseAdmin = HBaseConnection.get().getAdmin(); hdfsFS = FileSystem.get(new Configuration()); @@ -130,6 +130,8 @@ public class CubeMigrationCLI { } else { showOpts(); } + + IOUtils.closeQuietly(hbaseAdmin); } public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String overwriteIfExists, String realExecute) throws IOException, InterruptedException { @@ -284,10 +286,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); logger.info("CHANGE_HTABLE_HOST is completed"); break; } @@ -401,10 +403,10 @@ public class CubeMigrationCLI { case CHANGE_HTABLE_HOST: { String tableName = (String) opt.params[0]; HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName)); - hbaseAdmin.disableTable(tableName); + hbaseAdmin.disableTable(TableName.valueOf(tableName)); desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix()); - hbaseAdmin.modifyTable(tableName, desc); - hbaseAdmin.enableTable(tableName); + hbaseAdmin.modifyTable(TableName.valueOf(tableName), desc); + hbaseAdmin.enableTable(TableName.valueOf(tableName)); break; } case COPY_FILE_IN_META: {
