IMPALA-7061: Rework HBase splitting and assignment

Some frontend PlannerTests rely on HBase tables being arranged in a deterministic way. Specifically, the HBase tables need to be split at specific region boundaries, and those regions need to be assigned to specific HBase region servers.
Currently, the tables are created without splits, and testdata/bin/split-hbase.sh runs Java code in HBaseTestDataRegionAssigment (sic, the old class name) to split and assign the tables. This runs during dataload via testdata/bin/create-load-data.sh and during tests via bin/run-all-tests.sh. Both parts of this process have problems. The table splitting is flaky. The assignment is also unreliable: significant time can pass between the assignment and the tests, and HBase rebalancing can move regions in the meantime, so the assignments are not always stable. This change creates the HBase tables with their splits specified up front via the HBase shell, so the splits remain stable over time. PlannerTestBase now runs the assignment code in HBaseTestDataRegionAssignment at the start of the PlannerTests, which makes the assignments deterministic when those tests run. No other tests depend on the exact assignments, so this does not regress anything. Testing: - Local testing - Ran gerrit-verify-dryrun-external - Verified minicluster profile 2 compiles Change-Id: I3d639128a856254a6ccb93d6750f531974b5f897 Reviewed-on: http://gerrit.cloudera.org:8080/10447 Reviewed-by: Philip Zeyliger <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9a541057 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9a541057 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9a541057 Branch: refs/heads/master Commit: 9a5410570e25431813b96e00f7b91db44f672f38 Parents: 5f20296 Author: Joe McDonnell <[email protected]> Authored: Thu May 17 15:50:32 2018 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri May 25 00:28:18 2018 +0000 ---------------------------------------------------------------------- bin/run-all-tests.sh | 8 - .../HBaseTestDataRegionAssignment.java | 139 ++++++++ .../HBaseTestDataRegionAssignment.java | 164 +++++++++ .../apache/impala/planner/PlannerTestBase.java | 7 + testdata/bin/create-load-data.sh 
| 9 - testdata/bin/generate-schema-statements.py | 25 +- testdata/bin/split-hbase.sh | 43 --- .../functional/functional_schema_template.sql | 4 + .../HBaseTestDataRegionAssigment.java | 320 ------------------ .../HBaseTestDataRegionAssigment.java | 338 ------------------- 10 files changed, 329 insertions(+), 728 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/bin/run-all-tests.sh ---------------------------------------------------------------------- diff --git a/bin/run-all-tests.sh b/bin/run-all-tests.sh index 4488f2c..08d140a 100755 --- a/bin/run-all-tests.sh +++ b/bin/run-all-tests.sh @@ -140,14 +140,6 @@ LOG_DIR="${IMPALA_EE_TEST_LOGS_DIR}" # Enable core dumps ulimit -c unlimited || true -if [[ "${TARGET_FILESYSTEM}" == "hdfs" ]]; then - # To properly test HBase integeration, HBase regions are split and assigned by this - # script. Restarting HBase will change the region server assignment. Run split-hbase.sh - # before running any test. 
- run-step "Split and assign HBase regions" split-hbase.log \ - "${IMPALA_HOME}/testdata/bin/split-hbase.sh" -fi - for i in $(seq 1 $NUM_TEST_ITERATIONS) do TEST_RET_CODE=0 http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/fe/src/compat-minicluster-profile-2/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java ---------------------------------------------------------------------- diff --git a/fe/src/compat-minicluster-profile-2/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java b/fe/src/compat-minicluster-profile-2/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java new file mode 100644 index 0000000..f8c1ae9 --- /dev/null +++ b/fe/src/compat-minicluster-profile-2/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.datagenerator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.impala.planner.HBaseScanNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * Deterministically assigns regions to region servers. 
+ */ +public class HBaseTestDataRegionAssignment { + public class TableNotFoundException extends Exception { + public TableNotFoundException(String s) { + super(s); + } + } + + private final static Logger LOG = LoggerFactory.getLogger( + HBaseTestDataRegionAssignment.class); + private final Configuration conf; + private final HBaseAdmin hbaseAdmin; + private final List<ServerName> sortedRS; // sorted list of region server name + private final String[] splitPoints = { "1", "3", "5", "7", "9"}; + + public HBaseTestDataRegionAssignment() throws IOException { + conf = new Configuration(); + hbaseAdmin = new HBaseAdmin(conf); + ClusterStatus clusterStatus = hbaseAdmin.getClusterStatus(); + Collection<ServerName> regionServerNames = clusterStatus.getServers(); + sortedRS = new ArrayList<ServerName>(regionServerNames); + Collections.sort(sortedRS); + } + + public void close() throws IOException { + hbaseAdmin.close(); + } + + /** + * The table comes in already split into regions specified by splitPoints and with data + * already loaded. Pair up adjacent regions and assign to the same server. + * Each region pair in ([unbound:1,1:3], [3:5,5:7], [7:9,9:unbound]) + * will be on the same server. + */ + public void performAssignment(String tableName) throws IOException, + InterruptedException, TableNotFoundException { + HTableDescriptor[] desc = hbaseAdmin.listTables(tableName); + if (desc == null || desc.length == 0) { + throw new TableNotFoundException("Table " + tableName + " not found."); + } + + // Sort the region by start key + List<HRegionInfo> regions = hbaseAdmin.getTableRegions(tableName.getBytes()); + Preconditions.checkArgument(regions.size() == splitPoints.length + 1); + Collections.sort(regions); + + // Pair up two adjacent regions to the same region server. 
That is, + // region server 1 <- regions (unbound:1), (1:3) + // region server 2 <- regions (3:5), (5:7) + // region server 3 <- regions (7:9), (9:unbound) + NavigableMap<HRegionInfo, ServerName> expectedLocs = Maps.newTreeMap(); + for (int i = 0; i < regions.size(); ++i) { + HRegionInfo regionInfo = regions.get(i); + int rsIdx = (i / 2) % sortedRS.size(); + ServerName regionServerName = sortedRS.get(rsIdx); + hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(), + regionServerName.getServerName().getBytes()); + expectedLocs.put(regionInfo, regionServerName); + } + + // hbaseAdmin.move() is an asynchronous operation. HBase tests use sleep to wait for + // the move to complete. It should be done in 10sec. + int sleepCnt = 0; + HTable hbaseTable = new HTable(conf, tableName); + try { + while(!expectedLocs.equals(hbaseTable.getRegionLocations()) && + sleepCnt < 100) { + Thread.sleep(100); + ++sleepCnt; + } + NavigableMap<HRegionInfo, ServerName> actualLocs = hbaseTable.getRegionLocations(); + Preconditions.checkArgument(expectedLocs.equals(actualLocs)); + + // Log the actual region location map + for (Map.Entry<HRegionInfo, ServerName> entry: actualLocs.entrySet()) { + LOG.info(HBaseScanNode.printKey(entry.getKey().getStartKey()) + " -> " + + entry.getValue().getHostAndPort()); + } + + // Force a major compaction such that the HBase table is backed by deterministic + // physical artifacts (files, WAL, etc.). Our #rows estimate relies on the sizes of + // these physical artifacts. 
+ LOG.info("Major compacting HBase table: " + tableName); + hbaseAdmin.majorCompact(tableName); + } finally { + IOUtils.closeQuietly(hbaseTable); + } + } +} + http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/fe/src/compat-minicluster-profile-3/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java ---------------------------------------------------------------------- diff --git a/fe/src/compat-minicluster-profile-3/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java b/fe/src/compat-minicluster-profile-3/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java new file mode 100644 index 0000000..85f8510 --- /dev/null +++ b/fe/src/compat-minicluster-profile-3/test/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssignment.java @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.datagenerator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ClusterStatus; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.PairOfSameType; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.impala.planner.HBaseScanNode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * Deterministically assign regions to region servers. 
+ */ +public class HBaseTestDataRegionAssignment { + public class TableNotFoundException extends Exception { + public TableNotFoundException(String s) { + super(s); + } + } + + private final static Logger LOG = LoggerFactory.getLogger( + HBaseTestDataRegionAssignment.class); + private final Configuration conf; + private Connection connection = null; + private final Admin admin; + private final List<ServerName> sortedRS; // sorted list of region server name + private final String[] splitPoints = { "1", "3", "5", "7", "9"}; + + private final static int REGION_MOVE_TIMEOUT_MILLIS = 60000; + + public HBaseTestDataRegionAssignment() throws IOException { + conf = new Configuration(); + connection = ConnectionFactory.createConnection(conf); + admin = connection.getAdmin(); + ClusterStatus clusterStatus = admin.getClusterStatus(); + List<ServerName> regionServerNames = + new ArrayList<ServerName>(clusterStatus.getServers()); + ServerName master = clusterStatus.getMaster(); + regionServerNames.remove(master); + sortedRS = new ArrayList<ServerName>(regionServerNames); + Collections.sort(sortedRS); + } + + public void close() throws IOException { + admin.close(); + } + + /** + * The table comes in already split into regions specified by splitPoints and with data + * already loaded. Pair up adjacent regions and assign to the same server. + * Each region pair in ([unbound:1,1:3], [3:5,5:7], [7:9,9:unbound]) + * will be on the same server. 
+ */ + public void performAssignment(String tableName) throws IOException, + InterruptedException, TableNotFoundException { + TableName table = TableName.valueOf(tableName); + if (!admin.tableExists(table)) { + throw new TableNotFoundException("Table " + tableName + " not found."); + } + + // Sort the region by start key + List<RegionInfo> regions = admin.getRegions(table); + Preconditions.checkArgument(regions.size() == splitPoints.length + 1); + Collections.sort(regions, RegionInfo.COMPARATOR); + // Pair up two adjacent regions to the same region server. That is, + // region server 1 <- regions (unbound:1), (1:3) + // region server 2 <- regions (3:5), (5:7) + // region server 3 <- regions (7:9), (9:unbound) + HashMap<String, ServerName> expectedLocs = Maps.newHashMap(); + for (int i = 0; i < regions.size(); ++i) { + RegionInfo regionInfo = regions.get(i); + int rsIdx = (i / 2) % sortedRS.size(); + ServerName regionServerName = sortedRS.get(rsIdx); + LOG.info("Moving " + regionInfo.getRegionNameAsString() + + " to " + regionServerName.getAddress()); + admin.move(regionInfo.getEncodedNameAsBytes(), + regionServerName.getServerName().getBytes()); + expectedLocs.put(regionInfo.getRegionNameAsString(), regionServerName); + } + + // admin.move() is an asynchronous operation. Wait for the move to complete. + // It should be done in 60 sec. 
+ long start = System.currentTimeMillis(); + long timeout = System.currentTimeMillis() + REGION_MOVE_TIMEOUT_MILLIS; + while (true) { + int matched = 0; + List<Pair<RegionInfo, ServerName>> pairs = + MetaTableAccessor.getTableRegionsAndLocations(connection, table); + Preconditions.checkState(pairs.size() == regions.size()); + for (Pair<RegionInfo, ServerName> pair: pairs) { + RegionInfo regionInfo = pair.getFirst(); + String regionName = regionInfo.getRegionNameAsString(); + ServerName serverName = pair.getSecond(); + Preconditions.checkNotNull(expectedLocs.get(regionName)); + LOG.info(regionName + " " + HBaseScanNode.printKey(regionInfo.getStartKey()) + + " -> " + serverName.getAddress().toString() + ", expecting " + + expectedLocs.get(regionName)); + if (expectedLocs.get(regionName).equals(serverName)) { + ++matched; + continue; + } + } + if (matched == regions.size()) { + long elapsed = System.currentTimeMillis() - start; + LOG.info("Regions moved after " + elapsed + " millis."); + break; + } + if (System.currentTimeMillis() < timeout) { + Thread.sleep(100); + continue; + } + throw new IllegalStateException( + String.format("Failed to assign regions to servers after " + + REGION_MOVE_TIMEOUT_MILLIS + " millis.")); + } + + // Force a major compaction such that the HBase table is backed by deterministic + // physical artifacts (files, WAL, etc.). Our #rows estimate relies on the sizes of + // these physical artifacts. 
+ LOG.info("Major compacting HBase table: " + tableName); + admin.majorCompact(table); + } +} + http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java ---------------------------------------------------------------------- diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java index e7e228f..5e6d549 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java @@ -41,6 +41,7 @@ import org.apache.impala.catalog.CatalogException; import org.apache.impala.common.FrontendTestBase; import org.apache.impala.common.ImpalaException; import org.apache.impala.common.RuntimeEnv; +import org.apache.impala.datagenerator.HBaseTestDataRegionAssignment; import org.apache.impala.testutil.TestFileParser; import org.apache.impala.testutil.TestFileParser.Section; import org.apache.impala.testutil.TestFileParser.TestCase; @@ -114,6 +115,12 @@ public class PlannerTestBase extends FrontendTestBase { String logDir = System.getenv("IMPALA_FE_TEST_LOGS_DIR"); if (logDir == null) logDir = "/tmp"; outDir_ = Paths.get(logDir, "PlannerTest"); + + // Rebalance the HBase tables + HBaseTestDataRegionAssignment assignment = new HBaseTestDataRegionAssignment(); + assignment.performAssignment("functional_hbase.alltypessmall"); + assignment.performAssignment("functional_hbase.alltypesagg"); + assignment.close(); } @Before http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/bin/create-load-data.sh ---------------------------------------------------------------------- diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh index 88b3cb5..2a246e2 100755 --- a/testdata/bin/create-load-data.sh +++ b/testdata/bin/create-load-data.sh @@ -589,15 +589,6 @@ if [ "${TARGET_FILESYSTEM}" = "hdfs" ]; then run-step "Loading external data 
sources" load-ext-data-source.log \ copy-and-load-ext-data-source - # HBase splitting is only relevant for FE tests - if [[ -z "$REMOTE_LOAD" ]]; then - IGNORE_MSG="Ignoring this HBase splitting failure to allow dataload to complete and - tests to run. This failure will cause some frontend tests to fail, and it may - impact some HBase tests. Other tests are unaffected." - run-step "Splitting HBase" create-hbase.log ${IMPALA_HOME}/testdata/bin/split-hbase.sh \ - || echo ${IGNORE_MSG} - fi - run-step "Creating internal HBase table" create-internal-hbase-table.log \ create-internal-hbase-table http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/bin/generate-schema-statements.py ---------------------------------------------------------------------- diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index e039c48..2189eff 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -347,8 +347,8 @@ def build_hbase_create_stmt_in_hive(columns, partition_columns, table_name): # PARTITIONED BY is not supported and does not make sense for HBase. if partition_columns: columns.extend(partition_columns.split('\n')) - # stringid is a special case. It still points to functional_hbase.alltypesagg - if 'stringid' not in table_name: + # stringids is a special case. 
It still points to functional_hbase.alltypesagg + if 'stringids' not in table_name: tbl_properties = ('TBLPROPERTIES("hbase.table.name" = ' '"{db_name}{db_suffix}.{table_name}")') else: @@ -475,15 +475,19 @@ def build_load_statement(load_template, db_name, db_suffix, table_name): impala_home = base_load_dir) return load_template -def build_hbase_create_stmt(db_name, table_name, column_families): +def build_hbase_create_stmt(db_name, table_name, column_families, region_splits): hbase_table_name = "{db_name}_hbase.{table_name}".format(db_name=db_name, table_name=table_name) - create_stmt = list() - create_stmt.append("disable '%s'" % hbase_table_name) - create_stmt.append("drop '%s'" % hbase_table_name) + create_stmts = list() + create_stmts.append("disable '%s'" % hbase_table_name) + create_stmts.append("drop '%s'" % hbase_table_name) column_families = ','.join(["'{0}'".format(cf) for cf in column_families.splitlines()]) - create_stmt.append("create '%s', %s" % (hbase_table_name, column_families)) - return create_stmt + create_statement = "create '%s', %s" % (hbase_table_name, column_families) + if (region_splits): + create_statement += ", {SPLITS => [" + region_splits.strip() + "]}" + + create_stmts.append(create_statement) + return create_stmts # Does a hdfs directory listing and returns array with all the subdir names. 
def get_hdfs_subdirs_with_data(path): @@ -691,8 +695,9 @@ def generate_statements(output_name, test_vectors, sections, if file_format == 'hbase': # If the HBASE_COLUMN_FAMILIES section does not exist, default to 'd' column_families = section.get('HBASE_COLUMN_FAMILIES', 'd') + region_splits = section.get('HBASE_REGION_SPLITS', None) hbase_output.create.extend(build_hbase_create_stmt(db_name, table_name, - column_families)) + column_families, region_splits)) hbase_post_load.load.append("flush '%s_hbase.%s'\n" % (db_name, table_name)) # Need to make sure that tables created and/or data loaded in Hive is seen @@ -779,7 +784,7 @@ def parse_schema_template_file(file_name): 'ROW_FORMAT', 'CREATE', 'CREATE_HIVE', 'CREATE_KUDU', 'DEPENDENT_LOAD', 'DEPENDENT_LOAD_KUDU', 'DEPENDENT_LOAD_HIVE', 'LOAD', 'LOAD_LOCAL', 'ALTER', 'HBASE_COLUMN_FAMILIES', - 'TABLE_PROPERTIES'] + 'TABLE_PROPERTIES', 'HBASE_REGION_SPLITS'] return parse_test_file(file_name, VALID_SECTION_NAMES, skip_unknown_sections=False) if __name__ == "__main__": http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/bin/split-hbase.sh ---------------------------------------------------------------------- diff --git a/testdata/bin/split-hbase.sh b/testdata/bin/split-hbase.sh deleted file mode 100755 index 8ede905..0000000 --- a/testdata/bin/split-hbase.sh +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -set -euo pipefail -trap 'echo Error in $0 at line $LINENO: $(cd "'$PWD'" && awk "NR == $LINENO" $0)' ERR - -. ${IMPALA_HOME}/bin/impala-config.sh - -if ${CLUSTER_DIR}/admin is_kerberized; then - KERB_ARGS="--use_kerberos" -else - KERB_ARGS= -fi - -# Split hbasealltypesagg and hbasealltypessmall and assign their splits -cd $IMPALA_HOME/testdata -${IMPALA_HOME}/bin/mvn-quiet.sh clean -${IMPALA_HOME}/bin/mvn-quiet.sh package -mvn $IMPALA_MAVEN_OPTIONS -q dependency:copy-dependencies - -. ${IMPALA_HOME}/bin/set-classpath.sh -export CLASSPATH=$IMPALA_HOME/testdata/target/impala-testdata-0.1-SNAPSHOT.jar:$CLASSPATH - -: ${JAVA_KERBEROS_MAGIC=} -"$JAVA" ${JAVA_KERBEROS_MAGIC} \ - org.apache.impala.datagenerator.HBaseTestDataRegionAssigment \ - functional_hbase.alltypesagg functional_hbase.alltypessmall http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/datasets/functional/functional_schema_template.sql ---------------------------------------------------------------------- diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index be666ee..4d26ea9 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -140,6 +140,8 @@ string_col string timestamp_col timestamp ---- ROW_FORMAT delimited fields terminated by ',' escaped by '\\' +---- HBASE_REGION_SPLITS +'1','3','5','7','9' ---- ALTER ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION(year=2009, month=1); ALTER TABLE {table_name} 
ADD IF NOT EXISTS PARTITION(year=2009, month=2); @@ -515,6 +517,8 @@ string_col string timestamp_col timestamp ---- ROW_FORMAT delimited fields terminated by ',' escaped by '\\' +---- HBASE_REGION_SPLITS +'1','3','5','7','9' ---- ALTER ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION(year=2010, month=1, day=1); ALTER TABLE {table_name} ADD IF NOT EXISTS PARTITION(year=2010, month=1, day=2); http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/src/compat-minicluster-profile-2/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java ---------------------------------------------------------------------- diff --git a/testdata/src/compat-minicluster-profile-2/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java b/testdata/src/compat-minicluster-profile-2/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java deleted file mode 100644 index a5799a1..0000000 --- a/testdata/src/compat-minicluster-profile-2/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java +++ /dev/null @@ -1,320 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.impala.datagenerator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; - -import org.apache.commons.io.IOUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.util.Threads; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -/** - * Splits HBase tables into regions and deterministically assigns regions to region - * servers. - */ -class HBaseTestDataRegionAssigment { - public class TableNotFoundException extends Exception { - public TableNotFoundException(String s) { - super(s); - } - } - - private final static Logger LOG = LoggerFactory.getLogger( - HBaseTestDataRegionAssigment.class); - private final Configuration conf; - private final HBaseAdmin hbaseAdmin; - private final List<ServerName> sortedRS; // sorted list of region server name - private final String[] splitPoints = { "1", "3", "5", "7", "9"}; - - // Number of times to retry a series region-split/wait-for-split calls. - private final static int MAX_SPLIT_ATTEMPTS = 10; - - // Maximum time in ms to wait for a region to be split. 
- private final static int WAIT_FOR_SPLIT_TIMEOUT = 10000; - - public HBaseTestDataRegionAssigment() throws IOException { - conf = new Configuration(); - hbaseAdmin = new HBaseAdmin(conf); - ClusterStatus clusterStatus = hbaseAdmin.getClusterStatus(); - Collection<ServerName> regionServerNames = clusterStatus.getServers(); - sortedRS = new ArrayList<ServerName>(regionServerNames); - Collections.sort(sortedRS); - } - - public void close() throws IOException { - hbaseAdmin.close(); - } - - /** - * Split the table regions according to splitPoints and pair up adjacent regions to the - * same server. Each region pair in ([unbound:1,1:3], [3:5,5:7], [7:9,9:unbound]) - * will be on the same server. - * The table must have data loaded and only a single region. - */ - public void performAssigment(String tableName) throws IOException, - InterruptedException, TableNotFoundException { - HTableDescriptor[] desc = hbaseAdmin.listTables(tableName); - if (desc == null || desc.length == 0) { - throw new TableNotFoundException("Table " + tableName + " not found."); - } - - if (hbaseAdmin.getTableRegions(tableName.getBytes()).size() == 1) { - // Split into regions - // The table has one region only to begin with. The logic of - // blockUntilRegionSplit requires that the input regionName has performed a split. - // If the table has already been split (i.e. regions count > 1), the same split - // call will be a no-op and this will cause blockUntilRegionSplit to break. - for (int i = 0; i < splitPoints.length; ++i) { - hbaseAdmin.majorCompact(tableName); - List<HRegionInfo> regions = hbaseAdmin.getTableRegions(tableName.getBytes()); - HRegionInfo splitRegion = regions.get(regions.size() - 1); - int attempt = 1; - boolean done = false; - while (!done && attempt < MAX_SPLIT_ATTEMPTS) { - // HBase seems to not always properly receive/process this split RPC, - // so we need to retry the split/block several times. 
- hbaseAdmin.split(splitRegion.getRegionNameAsString(), splitPoints[i]); - done = blockUntilRegionSplit(conf, WAIT_FOR_SPLIT_TIMEOUT, - splitRegion.getRegionName(), true); - Thread.sleep(100); - ++attempt; - } - if (!done) { - throw new IllegalStateException( - String.format("Failed to split region '%s' after %s attempts.", - splitRegion.getRegionNameAsString(), WAIT_FOR_SPLIT_TIMEOUT)); - } - LOG.info(String.format("Split region '%s' after %s attempts.", - splitRegion.getRegionNameAsString(), attempt)); - } - } - - // Sort the region by start key - List<HRegionInfo> regions = hbaseAdmin.getTableRegions(tableName.getBytes()); - Preconditions.checkArgument(regions.size() == splitPoints.length + 1); - Collections.sort(regions); - - // Pair up two adjacent regions to the same region server. That is, - // region server 1 <- regions (unbound:1), (1:3) - // region server 2 <- regions (3:5), (5:7) - // region server 3 <- regions (7:9), (9:unbound) - NavigableMap<HRegionInfo, ServerName> expectedLocs = Maps.newTreeMap(); - for (int i = 0; i < regions.size(); ++i) { - HRegionInfo regionInfo = regions.get(i); - int rsIdx = (i / 2) % sortedRS.size(); - ServerName regionServerName = sortedRS.get(rsIdx); - hbaseAdmin.move(regionInfo.getEncodedNameAsBytes(), - regionServerName.getServerName().getBytes()); - expectedLocs.put(regionInfo, regionServerName); - } - - // hbaseAdmin.move() is an asynchronous operation. HBase tests use sleep to wait for - // the move to complete. It should be done in 10sec. 
- int sleepCnt = 0; - HTable hbaseTable = new HTable(conf, tableName); - try { - while(!expectedLocs.equals(hbaseTable.getRegionLocations()) && - sleepCnt < 100) { - Thread.sleep(100); - ++sleepCnt; - } - NavigableMap<HRegionInfo, ServerName> actualLocs = hbaseTable.getRegionLocations(); - Preconditions.checkArgument(expectedLocs.equals(actualLocs)); - - // Log the actual region location map - for (Map.Entry<HRegionInfo, ServerName> entry: actualLocs.entrySet()) { - LOG.info(printKey(entry.getKey().getStartKey()) + " -> " + - entry.getValue().getHostAndPort()); - } - - // Force a major compaction such that the HBase table is backed by deterministic - // physical artifacts (files, WAL, etc.). Our #rows estimate relies on the sizes of - // these physical artifacts. - LOG.info("Major compacting HBase table: " + tableName); - hbaseAdmin.majorCompact(tableName); - } finally { - IOUtils.closeQuietly(hbaseTable); - } - } - - /** - * Returns non-printable characters in escaped octal, otherwise returns the characters. - */ - public static String printKey(byte[] key) { - StringBuilder result = new StringBuilder(); - for (int i = 0; i < key.length; ++i) { - if (!Character.isISOControl(key[i])) { - result.append((char) key[i]); - } else { - result.append("\\"); - result.append(Integer.toOctalString(key[i])); - } - } - return result.toString(); - } - - /** - * The following static methods blockUntilRegionSplit, getRegionRow, - * blockUntilRegionIsOpened and blockUntilRegionIsInMeta are copied from - * org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction - * to help block until a region split is completed. - * - * The original code was modified to return a true/false in case of success/failure. 
- * - * Blocks until the region split is complete in META and region server opens the - * daughters - */ - private static boolean blockUntilRegionSplit(Configuration conf, long timeout, - final byte[] regionName, boolean waitForDaughters) - throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - HRegionInfo daughterA = null, daughterB = null; - HTable metaTable = new HTable(conf, TableName.META_TABLE_NAME); - - try { - while (System.currentTimeMillis() - start < timeout) { - Result result = getRegionRow(metaTable, regionName); - if (result == null) { - break; - } - - HRegionInfo region = HRegionInfo.getHRegionInfo(result); - if(region.isSplitParent()) { - PairOfSameType<HRegionInfo> pair = HRegionInfo.getDaughterRegions(result); - daughterA = pair.getFirst(); - daughterB = pair.getSecond(); - break; - } - Threads.sleep(100); - } - if (daughterA == null || daughterB == null) return false; - - //if we are here, this means the region split is complete or timed out - if (waitForDaughters) { - long rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsInMeta(metaTable, rem, daughterA); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsInMeta(metaTable, rem, daughterB); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsOpened(conf, rem, daughterA); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsOpened(conf, rem, daughterB); - } - } finally { - IOUtils.closeQuietly(metaTable); - } - return true; - } - - private static Result getRegionRow(HTable metaTable, byte[] regionName) - throws IOException { - Get get = new Get(regionName); - return metaTable.get(get); - } - - private static void blockUntilRegionIsInMeta(HTable metaTable, long timeout, - HRegionInfo hri) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout) { - Result result = 
getRegionRow(metaTable, hri.getRegionName()); - if (result != null) { - HRegionInfo info = HRegionInfo.getHRegionInfo(result); - if (info != null && !info.isOffline()) { - break; - } - } - Threads.sleep(10); - } - } - - /** - * Starting with HBase 0.95.2 the Get class' c'tor no longer accepts - * empty key strings leading to the rather undesirable behavior that this method - * is not guaranteed to succeed. This method repeatedly attempts to 'get' the start key - * of the given region from the region server to detect when the region server becomes - * available. However, the first region has an empty array as the start key causing the - * Get c'tor to throw an exception as stated above. The end key cannot be used instead - * because it is an exclusive upper bound. - */ - private static void blockUntilRegionIsOpened(Configuration conf, long timeout, - HRegionInfo hri) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - HTable table = new HTable(conf, hri.getTableName()); - - try { - byte [] row = hri.getStartKey(); - // Check for null/empty row. If we find one, use a key that is likely to - // be in first region. If key '0' happens not to be in the given region - // then an exception will be thrown. - if (row == null || row.length <= 0) row = new byte [] {'0'}; - Get get = new Get(row); - while (System.currentTimeMillis() - start < timeout) { - try { - table.get(get); - break; - } catch(IOException ex) { - //wait some more - } - Threads.sleep(10); - } - } finally { - IOUtils.closeQuietly(table); - } - } - - /** - * args contains a list of hbase table names. This program will split the hbase tables - * into regions and assign each region to a specific region server. 
- */ - public static void main(String args[]) throws IOException, InterruptedException, - TableNotFoundException { - HBaseTestDataRegionAssigment assignment = new HBaseTestDataRegionAssigment(); - for (String htable: args) { - assignment.performAssigment(htable); - } - assignment.close(); - // Exit forcefully because of HDFS-6057. Otherwise, there the JVM won't exit due to a - // non-daemon thread still being up. - System.exit(0); - } -} - http://git-wip-us.apache.org/repos/asf/impala/blob/9a541057/testdata/src/compat-minicluster-profile-3/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java ---------------------------------------------------------------------- diff --git a/testdata/src/compat-minicluster-profile-3/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java b/testdata/src/compat-minicluster-profile-3/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java deleted file mode 100644 index 0f6bbba..0000000 --- a/testdata/src/compat-minicluster-profile-3/java/org/apache/impala/datagenerator/HBaseTestDataRegionAssigment.java +++ /dev/null @@ -1,338 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.impala.datagenerator; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.hadoop.hbase.util.Threads; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -/** - * Splits HBase tables into regions and deterministically assigns regions to region - * servers. - */ -class HBaseTestDataRegionAssigment { - public class TableNotFoundException extends Exception { - public TableNotFoundException(String s) { - super(s); - } - } - - private final static Logger LOG = LoggerFactory.getLogger( - HBaseTestDataRegionAssigment.class); - private final Configuration conf; - private Connection connection = null; - private final Admin admin; - private final List<ServerName> sortedRS; // sorted list of region server name - private final String[] splitPoints = { "1", "3", "5", "7", "9"}; - - // Number of times to retry a series region-split/wait-for-split calls. - private final static int MAX_SPLIT_ATTEMPTS = 10; - - // Maximum time in ms to wait for a region to be split. 
- private final static int WAIT_FOR_SPLIT_TIMEOUT = 10000; - - private final static int REGION_MOVE_TIMEOUT_MILLIS = 60000; - - public HBaseTestDataRegionAssigment() throws IOException { - conf = new Configuration(); - connection = ConnectionFactory.createConnection(conf); - admin = connection.getAdmin(); - ClusterStatus clusterStatus = admin.getClusterStatus(); - List<ServerName> regionServerNames = - new ArrayList<ServerName>(clusterStatus.getServers()); - ServerName master = clusterStatus.getMaster(); - regionServerNames.remove(master); - sortedRS = new ArrayList<ServerName>(regionServerNames); - Collections.sort(sortedRS); - } - - public void close() throws IOException { - admin.close(); - } - - /** - * Split the table regions according to splitPoints and pair up adjacent regions to the - * same server. Each region pair in ([unbound:1,1:3], [3:5,5:7], [7:9,9:unbound]) - * will be on the same server. - * The table must have data loaded. We attempt to split even already partially split - * tables in order to facilitate recovery from partial transient failures. - */ - public void performAssigment(String tableName) throws IOException, - InterruptedException, TableNotFoundException { - TableName table = TableName.valueOf(tableName); - if (!admin.tableExists(table)) { - throw new TableNotFoundException("Table " + tableName + " not found."); - } - if (admin.getRegions(table).size() <= splitPoints.length) { - // Split into regions - // The table has one region only to begin with. The logic of - // blockUntilRegionSplit requires that the input regionName has performed a split. - // If the table has already been split (i.e. regions count > 1), the same split - // call will be a no-op and this will cause blockUntilRegionSplit to break. In - // that case, swallow the resulting exception. Other exceptions will be re-thrown. 
- for (int i = 0; i < splitPoints.length; ++i) { - admin.majorCompact(table); - List<RegionInfo> regions = admin.getRegions(table); - RegionInfo splitRegion = regions.get(regions.size() - 1); - boolean done = false; - int attempt = 0; - for (; !done && attempt < MAX_SPLIT_ATTEMPTS; ++attempt) { - // HBase seems to not always properly receive/process this split RPC, - // so we need to retry the split/block several times. - try { - admin.split(splitRegion.getTable(), Bytes.toBytes(splitPoints[i])); - done = blockUntilRegionSplit(conf, WAIT_FOR_SPLIT_TIMEOUT, - splitRegion.getRegionName(), true); - } catch (IOException ex) { - if (!ex.getMessage().equals( - "should not give a splitkey which equals to startkey!")) { - throw ex; - } - done = true; - } - } - if (!done) { - throw new IllegalStateException( - String.format("Failed to split region '%s' after %s attempts (%d ms).", - splitRegion.getRegionNameAsString(), MAX_SPLIT_ATTEMPTS, - MAX_SPLIT_ATTEMPTS * WAIT_FOR_SPLIT_TIMEOUT)); - } - LOG.info(String.format("Split region '%s' after %s attempts.", - splitRegion.getRegionNameAsString(), attempt)); - } - } - - // Sort the region by start key - List<RegionInfo> regions = admin.getRegions(table); - Preconditions.checkArgument(regions.size() == splitPoints.length + 1); - Collections.sort(regions, RegionInfo.COMPARATOR); - // Pair up two adjacent regions to the same region server. 
That is, - // region server 1 <- regions (unbound:1), (1:3) - // region server 2 <- regions (3:5), (5:7) - // region server 3 <- regions (7:9), (9:unbound) - HashMap<String, ServerName> expectedLocs = Maps.newHashMap(); - for (int i = 0; i < regions.size(); ++i) { - RegionInfo regionInfo = regions.get(i); - int rsIdx = (i / 2) % sortedRS.size(); - ServerName regionServerName = sortedRS.get(rsIdx); - LOG.info("Moving " + regionInfo.getRegionNameAsString() + - " to " + regionServerName.getAddress()); - admin.move(regionInfo.getEncodedNameAsBytes(), - regionServerName.getServerName().getBytes()); - expectedLocs.put(regionInfo.getRegionNameAsString(), regionServerName); - } - - // admin.move() is an asynchronous operation. Wait for the move to complete. - // It should be done in 60 sec. - long start = System.currentTimeMillis(); - long timeout = System.currentTimeMillis() + REGION_MOVE_TIMEOUT_MILLIS; - while (true) { - int matched = 0; - List<Pair<RegionInfo, ServerName>> pairs = - MetaTableAccessor.getTableRegionsAndLocations(connection, table); - Preconditions.checkState(pairs.size() == regions.size()); - for (Pair<RegionInfo, ServerName> pair: pairs) { - RegionInfo regionInfo = pair.getFirst(); - String regionName = regionInfo.getRegionNameAsString(); - ServerName serverName = pair.getSecond(); - Preconditions.checkNotNull(expectedLocs.get(regionName)); - LOG.info(regionName + " " + printKey(regionInfo.getStartKey()) + " -> " + - serverName.getAddress().toString() + ", expecting " + - expectedLocs.get(regionName)); - if (expectedLocs.get(regionName).equals(serverName)) { - ++matched; - continue; - } - } - if (matched == regions.size()) { - long elapsed = System.currentTimeMillis() - start; - LOG.info("Regions moved after " + elapsed + " millis."); - break; - } - if (System.currentTimeMillis() < timeout) { - Thread.sleep(100); - continue; - } - throw new IllegalStateException( - String.format("Failed to assign regions to servers after " + - 
REGION_MOVE_TIMEOUT_MILLIS + " millis.")); - } - - // Force a major compaction such that the HBase table is backed by deterministic - // physical artifacts (files, WAL, etc.). Our #rows estimate relies on the sizes of - // these physical artifacts. - LOG.info("Major compacting HBase table: " + tableName); - admin.majorCompact(table); - } - - /** - * Returns non-printable characters in escaped octal, otherwise returns the characters. - */ - public static String printKey(byte[] key) { - StringBuilder result = new StringBuilder(); - for (int i = 0; i < key.length; ++i) { - if (!Character.isISOControl(key[i])) { - result.append((char) key[i]); - } else { - result.append("\\"); - result.append(Integer.toOctalString(key[i])); - } - } - return result.toString(); - } - - /** - * The following static methods blockUntilRegionSplit, blockUntilRegionIsOpened - * and blockUntilRegionIsInMeta are copied from - * org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction - * to help block until a region split is completed. - * - * The original code was modified to return a true/false in case of success/failure. 
- * - * Blocks until the region split is complete in META and region server opens the - * daughters - */ - private static boolean blockUntilRegionSplit(Configuration conf, long timeout, - final byte[] regionName, boolean waitForDaughters) - throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - RegionInfo daughterA = null, daughterB = null; - try (Connection conn = ConnectionFactory.createConnection(conf); - Table metaTable = conn.getTable(TableName.META_TABLE_NAME)) { - Result result = null; - while ((System.currentTimeMillis() - start) < timeout) { - result = metaTable.get(new Get(regionName)); - if (result == null) { - break; - } - RegionInfo region = MetaTableAccessor.getRegionInfo(result); - if (region.isSplitParent()) { - PairOfSameType<RegionInfo> pair = MetaTableAccessor.getDaughterRegions(result); - daughterA = pair.getFirst(); - daughterB = pair.getSecond(); - break; - } - Threads.sleep(100); - } - if (daughterA == null || daughterB == null) return false; - - //if we are here, this means the region split is complete or timed out - if (waitForDaughters) { - long rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsInMeta(conn, rem, daughterA); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsInMeta(conn, rem, daughterB); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsOpened(conf, rem, daughterA); - - rem = timeout - (System.currentTimeMillis() - start); - blockUntilRegionIsOpened(conf, rem, daughterB); - } - } - return true; - } - - private static void blockUntilRegionIsInMeta(Connection conn, long timeout, - RegionInfo hri) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - while (System.currentTimeMillis() - start < timeout) { - HRegionLocation loc = MetaTableAccessor.getRegionLocation(conn, hri); - if (loc != null && !loc.getRegion().isOffline()) { - break; - } - Threads.sleep(10); - } - } - - /** - * 
Starting with HBase 0.95.2 the Get class' c'tor no longer accepts - * empty key strings leading to the rather undesirable behavior that this method - * is not guaranteed to succeed. This method repeatedly attempts to 'get' the start key - * of the given region from the region server to detect when the region server becomes - * available. However, the first region has an empty array as the start key causing the - * Get c'tor to throw an exception as stated above. The end key cannot be used instead - * because it is an exclusive upper bound. - */ - private static void blockUntilRegionIsOpened(Configuration conf, long timeout, - RegionInfo hri) throws IOException, InterruptedException { - long start = System.currentTimeMillis(); - try (Connection conn = ConnectionFactory.createConnection(conf); - Table table = conn.getTable(hri.getTable())) { - byte [] row = hri.getStartKey(); - // Check for null/empty row. If we find one, use a key that is likely to - // be in first region. If key '0' happens not to be in the given region - // then an exception will be thrown. - if (row == null || row.length <= 0) row = new byte [] {'0'}; - Get get = new Get(row); - while (System.currentTimeMillis() - start < timeout) { - try { - table.get(get); - break; - } catch(IOException ex) { - //wait some more - } - Threads.sleep(10); - } - } - } - - /** - * args contains a list of hbase table names. This program will split the hbase tables - * into regions and assign each region to a specific region server. - */ - public static void main(String args[]) throws IOException, InterruptedException, - TableNotFoundException { - HBaseTestDataRegionAssigment assignment = new HBaseTestDataRegionAssigment(); - for (String htable: args) { - assignment.performAssigment(htable); - } - assignment.close(); - // Exit forcefully because of HDFS-6057. Otherwise, there the JVM won't exit due to a - // non-daemon thread still being up. - System.exit(0); - } -} -
