This is an automated email from the ASF dual-hosted git repository. vincentpoon pushed a commit to branch 4.x-HBase-1.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push: new ab254f9 PHOENIX-5094 increment pending disable count for index when rebuild starts ab254f9 is described below commit ab254f9783ca394fc10d07fa2a52dd16d1bff2a1 Author: Kiran Kumar Maturi <maturi.ki...@gmail.com> AuthorDate: Fri Feb 1 19:37:11 2019 +0530 PHOENIX-5094 increment pending disable count for index when rebuild starts --- .../index/IndexRebuildIncrementDisableCountIT.java | 237 +++++++++++++++++++++ .../phoenix/coprocessor/MetaDataEndpointImpl.java | 2 +- .../coprocessor/MetaDataRegionObserver.java | 23 +- .../phoenix/index/PhoenixIndexFailurePolicy.java | 21 +- .../java/org/apache/phoenix/util/IndexUtil.java | 30 +++ 5 files changed, 292 insertions(+), 21 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java new file mode 100644 index 0000000..694f359 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver; +import org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask; +import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Maps; + +public class IndexRebuildIncrementDisableCountIT extends BaseUniqueNamesOwnClusterIT { + private static final Log LOG = LogFactory.getLog(IndexRebuildIncrementDisableCountIT.class); + private static long pendingDisableCount = 0; + private static String ORG_PREFIX = "ORG"; + private static Result pendingDisableCountResult = null; + private static String indexState = null; + private static final Random RAND = new Random(5); + private static final int WAIT_AFTER_DISABLED = 5000; + private static final long REBUILD_PERIOD = 50000; + private static final long REBUILD_INTERVAL = 2000; + private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment; + private static String schemaName; + private static String tableName; + private static String fullTableName; + private static String indexName; + private static String fullIndexName; + private static Connection conn; + private static PhoenixConnection phoenixConn; + + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, + Boolean.TRUE.toString()); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, + Long.toString(REBUILD_INTERVAL)); + serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000"); + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, + Long.toString(REBUILD_PERIOD)); // batch at 50 seconds + serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB, + Long.toString(WAIT_AFTER_DISABLED)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1); + clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2"); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + indexRebuildTaskRegionEnvironment = + (RegionCoprocessorEnvironment) getUtility() + .getRSForFirstRegionInTable( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME) + .get(0).getCoprocessorHost() + .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName()); + MetaDataRegionObserver.initRebuildIndexConnectionProps( + indexRebuildTaskRegionEnvironment.getConfiguration()); + schemaName = generateUniqueName(); + tableName = generateUniqueName(); + fullTableName = SchemaUtil.getTableName(schemaName, tableName); + indexName = generateUniqueName(); + fullIndexName = SchemaUtil.getTableName(schemaName, indexName); + conn = DriverManager.getConnection(getUrl()); + phoenixConn = conn.unwrap(PhoenixConnection.class); + } + + static long getPendingDisableCount(PhoenixConnection conn, String indexTableName) { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); + Get get = new Get(indexTableKey); + get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES); + + try { + pendingDisableCountResult = + conn.getQueryServices() + .getTable(SchemaUtil.getPhysicalTableName( + PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, + conn.getQueryServices().getProps()).getName()) + .get(get); + return Bytes.toLong(pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES)); + } catch (Exception e) { + LOG.error("Exception in getPendingDisableCount: " + e); + return 0; + } + } + + private static void checkIndexPendingDisableCount(final PhoenixConnection conn, + final String indexTableName) throws Exception { + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + while (!TestUtil.checkIndexState(conn, indexTableName, PIndexState.ACTIVE, + 0L)) { + long count = getPendingDisableCount(conn, indexTableName); + if (count > 0) { + indexState = + new String( + pendingDisableCountResult.getValue(TABLE_FAMILY_BYTES, + PhoenixDatabaseMetaData.INDEX_STATE_BYTES)); + pendingDisableCount = count; + } + Thread.sleep(100); + } + } catch (Exception e) { + LOG.error("Error in checkPendingDisableCount : " + e); + } + } + }; + Thread t1 = new Thread(runnable); + t1.start(); + } + + static String getOrgId(long id) { + return ORG_PREFIX + "-" + id; + } + + static String getRandomOrgId(int maxOrgId) { + return getOrgId(Math.round(Math.random() * maxOrgId)); + } + + private static void mutateRandomly(Connection conn, String tableName, int maxOrgId) { + try { + + Statement stmt = conn.createStatement(); + for (int i = 0; i < 10000; i++) { + stmt.executeUpdate( + "UPSERT INTO " + tableName + " VALUES('" + getRandomOrgId(maxOrgId) + "'," + i + + "," + (i + 1) + "," + (i + 2) + ")"); + } + conn.commit(); + } catch (Exception e) { + LOG.error("Client side exception:" + e); + } + } + + private static MutationCode updateIndexState(PhoenixConnection phoenixConn, + String fullIndexName, PIndexState state) throws Throwable { + HTableInterface metaTable = + phoenixConn.getQueryServices() + .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES); + long ts = EnvironmentEdgeManager.currentTimeMillis(); + return IndexUtil.updateIndexState(fullIndexName, ts, metaTable, state).getMutationCode(); + } + + @Test + public void testIndexStateTransitions() throws Throwable { + // create table and indices + String createTableSql = + "CREATE TABLE " + fullTableName + + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)"; + conn.createStatement().execute(createTableSql); + conn.createStatement() + .execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)"); + conn.commit(); + updateIndexState(phoenixConn, fullIndexName, PIndexState.DISABLE); + mutateRandomly(conn, fullTableName, 20); + boolean[] cancel = new boolean[1]; + checkIndexPendingDisableCount(phoenixConn, fullIndexName); + try { + do { + runIndexRebuilder(Collections.<String> singletonList(fullTableName)); + } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L)); + } finally { + cancel[0] = true; + } + assertTrue("Index state is inactive ", indexState.equals("i")); + assertTrue("pendingDisable count is incremented when index is inactive", + pendingDisableCount == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT); + assertTrue("pending disable count is 0 when index is active: ", getPendingDisableCount(phoenixConn, fullIndexName) == 0); + } + + @Test + public void checkIndexPendingDisableResetCounter() throws Throwable { + IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName, MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT); + updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE); + assertTrue("Pending disable count should reset when index moves from ACTIVE to PENDING_DISABLE ", getPendingDisableCount(phoenixConn, fullIndexName) == 0); + IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName, MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT); + updateIndexState(phoenixConn, fullIndexName, PIndexState.INACTIVE); + updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE); + assertTrue("Pending disable count should reset when index moves from ACTIVE to PENDING_DISABLE ", getPendingDisableCount(phoenixConn, fullIndexName) == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT); + } + + private static void runIndexRebuilder(List<String> tables) + throws InterruptedException, SQLException { + BuildIndexScheduleTask task = + new MetaDataRegionObserver.BuildIndexScheduleTask(indexRebuildTaskRegionEnvironment, + tables); + task.run(); + } + +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 799a41b..4fbeea0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -4211,7 +4211,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements Coprocesso newState = PIndexState.DISABLE; } } - if (newState == PIndexState.PENDING_DISABLE && currentState != PIndexState.PENDING_DISABLE) { + if (newState == PIndexState.PENDING_DISABLE && currentState != PIndexState.PENDING_DISABLE && currentState != PIndexState.INACTIVE) { // reset count for first PENDING_DISABLE newKVs.add(KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, Bytes.toBytes(0L))); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 4045d47..8c58213 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -101,6 +101,9 @@ import com.google.common.collect.Maps; public class MetaDataRegionObserver extends BaseRegionObserver { public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class); public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = "REBUILDINDEX"; + // PHOENIX-5094 To differentiate the increment in PENDING_DISABLE_COUNT made by client or index + // rebuilder, we are using large value for index rebuilder + public static final long PENDING_DISABLE_INACTIVE_STATE_COUNT = 10000L; private static final byte[] SYSTEM_CATALOG_KEY = SchemaUtil.getTableKey( ByteUtil.EMPTY_BYTE_ARRAY, QueryConstants.SYSTEM_SCHEMA_NAME_BYTES, @@ -255,6 +258,19 @@ public class MetaDataRegionObserver extends BaseRegionObserver { this.props = new ReadOnlyProps(env.getConfiguration().iterator()); } + public List<PTable> decrementIndexesPendingDisableCount(PhoenixConnection conn, PTable dataPTable, List<PTable> indexes){ + List<PTable> indexesIncremented = new ArrayList<>(); + for(PTable index :indexes) { + try { + String indexName = index.getName().getString(); + IndexUtil.incrementCounterForIndex(conn, indexName, -PENDING_DISABLE_INACTIVE_STATE_COUNT); + indexesIncremented.add(index); + }catch(Exception e) { + LOG.warn("Decrement of -" + PENDING_DISABLE_INACTIVE_STATE_COUNT +" for index :" + index.getName().getString() + "of table: " + dataPTable.getName().getString(), e); + } + } + return indexesIncremented; + } @Override public void run() { // FIXME: we should replay the data table Put, as doing a partial index build would only add @@ -392,6 +408,10 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // Allow index to begin incremental maintenance as index is back online and we // cannot transition directly from DISABLED -> ACTIVE if (indexState == PIndexState.DISABLE) { + if(IndexUtil.getIndexPendingDisableCount(conn, indexTableFullName) < PENDING_DISABLE_INACTIVE_STATE_COUNT){ + // to avoid incrementing again + IndexUtil.incrementCounterForIndex(conn, indexTableFullName, PENDING_DISABLE_INACTIVE_STATE_COUNT); + } IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null); continue; // Must wait until clients start to do index maintenance again } else if (indexState == PIndexState.PENDING_ACTIVE) { @@ -503,7 +523,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { + (scanEndTime == HConstants.LATEST_TIMESTAMP ? "LATEST_TIMESTAMP" : scanEndTime)); MutationState mutationState = plan.execute(); long rowCount = mutationState.getUpdateCount(); - if (scanEndTime == latestUpperBoundTimestamp) { + decrementIndexesPendingDisableCount(conn, dataPTable, indexesToPartiallyRebuild); + if (scanEndTime == latestUpperBoundTimestamp) { LOG.info("Rebuild completed for all inactive/disabled indexes in data table:" + dataPTable.getName()); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 172c2d6..26a1fbe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.index; -import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; - import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.sql.SQLException; @@ -42,7 +40,6 @@ import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -515,25 +512,11 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { } private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException { - incrementCounterForIndex(conn, failedIndexTable, 1); + IndexUtil.incrementCounterForIndex(conn, failedIndexTable, 1); } private static void decrementCounterForIndex(PhoenixConnection conn, String failedIndexTable) throws IOException { - incrementCounterForIndex(conn, failedIndexTable, -1); - } - - private static void incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long amount) throws IOException { - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable); - Increment incr = new Increment(indexTableKey); - incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount); - try { - conn.getQueryServices() - .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, - conn.getQueryServices().getProps()).getName()) - .increment(incr); - } catch (SQLException e) { - throw new IOException(e); - } + IndexUtil.incrementCounterForIndex(conn, failedIndexTable, -1); } private static boolean canRetryMore(int numRetry, int maxRetries, long canRetryUntil) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index 9010f5f..f1d9ce4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -20,6 +20,7 @@ package org.apache.phoenix.util; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; @@ -46,6 +47,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -843,4 +845,32 @@ public class IndexUtil { return Lists.newArrayList(indexIterator); } + public static Result incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,long amount) throws IOException { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable); + Increment incr = new Increment(indexTableKey); + incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount); + try { + return conn.getQueryServices() + .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, + conn.getQueryServices().getProps()).getName()) + .increment(incr); + } catch (SQLException e) { + throw new IOException(e); + } + } + + public static long getIndexPendingDisableCount(PhoenixConnection conn, String failedIndexTable) throws IOException { + byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable); + Get get = new Get(indexTableKey); + get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES); + try { + Result result = conn.getQueryServices() + .getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME, + conn.getQueryServices().getProps()).getName()) + .get(get); + return Bytes.toLong(result.getValue(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES)); + } catch (SQLException e) { + throw new IOException(e); + } + } }