This is an automated email from the ASF dual-hosted git repository.

vincentpoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b5d9d  PHOENIX-5094 increment pending disable count for index when 
rebuild starts
52b5d9d is described below

commit 52b5d9d5f8d1f41218956bb6099a3d7eaf50939a
Author: Kiran Kumar Maturi <maturi.ki...@gmail.com>
AuthorDate: Fri Feb 1 17:17:26 2019 +0530

    PHOENIX-5094 increment pending disable count for index when rebuild starts
---
 .../index/IndexRebuildIncrementDisableCountIT.java | 237 +++++++++++++++++++++
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |   6 +-
 .../coprocessor/MetaDataRegionObserver.java        |  22 ++
 .../phoenix/index/PhoenixIndexFailurePolicy.java   |  21 +-
 .../java/org/apache/phoenix/util/IndexUtil.java    |  32 ++-
 5 files changed, 294 insertions(+), 24 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java
new file mode 100644
index 0000000..084bee2
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexRebuildIncrementDisableCountIT.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import 
org.apache.phoenix.coprocessor.MetaDataRegionObserver.BuildIndexScheduleTask;
+import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Integration test for PHOENIX-5094. Checks that the index rebuilder raises the index's
+ * PENDING_DISABLE_COUNT to {@link MetaDataRegionObserver#PENDING_DISABLE_INACTIVE_STATE_COUNT}
+ * while the index is being rebuilt, that the count is back to 0 once the index is ACTIVE, and
+ * when a transition to PENDING_DISABLE resets the count.
+ */
+public class IndexRebuildIncrementDisableCountIT extends BaseUniqueNamesOwnClusterIT {
+    private static final Log LOG = LogFactory.getLog(IndexRebuildIncrementDisableCountIT.class);
+    private static final String ORG_PREFIX = "ORG";
+    private static final Random RAND = new Random(5);
+    private static final int WAIT_AFTER_DISABLED = 5000;
+    private static final long REBUILD_PERIOD = 50000;
+    private static final long REBUILD_INTERVAL = 2000;
+    /** Longest time, in milliseconds, the test waits for the background checker to finish. */
+    private static final long CHECKER_JOIN_TIMEOUT_MS = 10000;
+    // Written by the background checker thread and read by the test thread, hence volatile.
+    private static volatile long pendingDisableCount = 0;
+    private static volatile String indexState = null;
+    private static RegionCoprocessorEnvironment indexRebuildTaskRegionEnvironment;
+    private static String schemaName;
+    private static String tableName;
+    private static String fullTableName;
+    private static String indexName;
+    private static String fullIndexName;
+    private static Connection conn;
+    private static PhoenixConnection phoenixConn;
+
+    /**
+     * Starts the test driver with index-rebuild properties, looks up the coprocessor environment
+     * of {@link MetaDataRegionObserver} on the SYSTEM.CATALOG region, generates the table and
+     * index names and opens the shared connection.
+     */
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB,
+            Boolean.TRUE.toString());
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB,
+            Long.toString(REBUILD_INTERVAL));
+        serverProps.put(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, "50000000");
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD,
+            Long.toString(REBUILD_PERIOD)); // batch at 50 seconds
+        serverProps.put(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME_ATTRIB,
+            Long.toString(WAIT_AFTER_DISABLED));
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+            new ReadOnlyProps(clientProps.entrySet().iterator()));
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .getRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        .findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+        schemaName = generateUniqueName();
+        tableName = generateUniqueName();
+        fullTableName = SchemaUtil.getTableName(schemaName, tableName);
+        indexName = generateUniqueName();
+        fullIndexName = SchemaUtil.getTableName(schemaName, indexName);
+        conn = DriverManager.getConnection(getUrl());
+        phoenixConn = conn.unwrap(PhoenixConnection.class);
+    }
+
+    /**
+     * Reads the PENDING_DISABLE_COUNT and INDEX_STATE columns of the index's SYSTEM.CATALOG row.
+     *
+     * @return the row, or {@code null} if the read failed (the failure is logged)
+     */
+    private static Result readPendingDisableRow(PhoenixConnection conn, String indexTableName) {
+        Get get = new Get(SchemaUtil.getTableKeyFromFullName(indexTableName));
+        get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+        get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES);
+        // try-with-resources so the catalog table handle is released after every read
+        try (Table catalog =
+                conn.getQueryServices()
+                        .getTable(SchemaUtil.getPhysicalTableName(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                            conn.getQueryServices().getProps()).getName())) {
+            return catalog.get(get);
+        } catch (Exception e) {
+            LOG.error("Exception in getPendingDisableCount: " + e, e);
+            return null;
+        }
+    }
+
+    /** Extracts the pending disable count from a catalog row; a missing row or column is 0. */
+    private static long toPendingDisableCount(Result row) {
+        if (row == null) {
+            return 0;
+        }
+        byte[] value =
+                row.getValue(TABLE_FAMILY_BYTES,
+                    PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+        return value == null ? 0 : Bytes.toLong(value);
+    }
+
+    /**
+     * Returns the current PENDING_DISABLE_COUNT of the given index, or 0 when it cannot be read.
+     */
+    static long getPendingDisableCount(PhoenixConnection conn, String indexTableName) {
+        return toPendingDisableCount(readPendingDisableRow(conn, indexTableName));
+    }
+
+    /**
+     * Starts a daemon thread that polls the index every 100 ms until it is ACTIVE and records
+     * the last positive pending disable count together with the index state seen with it.
+     *
+     * @return the started thread, so that the caller can wait for it to finish
+     */
+    private static Thread checkIndexPendingDisableCount(final PhoenixConnection conn,
+            final String indexTableName) {
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (!TestUtil.checkIndexState(conn, indexTableName, PIndexState.ACTIVE,
+                        0L)) {
+                        // Count and state come from the same row snapshot.
+                        Result row = readPendingDisableRow(conn, indexTableName);
+                        long count = toPendingDisableCount(row);
+                        if (count > 0) {
+                            indexState =
+                                    Bytes.toString(row.getValue(TABLE_FAMILY_BYTES,
+                                        PhoenixDatabaseMetaData.INDEX_STATE_BYTES));
+                            pendingDisableCount = count;
+                        }
+                        Thread.sleep(100);
+                    }
+                } catch (InterruptedException e) {
+                    // Asked to stop by the test thread; keep the interrupt flag set.
+                    Thread.currentThread().interrupt();
+                } catch (Exception e) {
+                    LOG.error("Error in checkPendingDisableCount : " + e, e);
+                }
+            }
+        };
+        Thread checker = new Thread(runnable);
+        // Daemon, so a failing test cannot keep the JVM alive through an endless poll loop.
+        checker.setDaemon(true);
+        checker.start();
+        return checker;
+    }
+
+    static String getOrgId(long id) {
+        return ORG_PREFIX + "-" + id;
+    }
+
+    static String getRandomOrgId(int maxOrgId) {
+        return getOrgId(Math.round(Math.random() * maxOrgId));
+    }
+
+    /**
+     * Upserts 10000 rows with random org ids into the table and commits. Failures are logged
+     * and not rethrown.
+     */
+    private static void mutateRandomly(Connection conn, String tableName, int maxOrgId) {
+        try (Statement stmt = conn.createStatement()) {
+            for (int i = 0; i < 10000; i++) {
+                stmt.executeUpdate(
+                    "UPSERT INTO " + tableName + " VALUES('" + getRandomOrgId(maxOrgId) + "'," + i
+                            + "," + (i + 1) + "," + (i + 2) + ")");
+            }
+            conn.commit();
+        } catch (Exception e) {
+            LOG.error("Client side exception:" + e, e);
+        }
+    }
+
+    /** Sets the index to the given state through SYSTEM.CATALOG and returns the result code. */
+    private static MutationCode updateIndexState(PhoenixConnection phoenixConn,
+            String fullIndexName, PIndexState state) throws Throwable {
+        try (Table metaTable =
+                phoenixConn.getQueryServices()
+                        .getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
+            long ts = EnvironmentEdgeManager.currentTimeMillis();
+            return IndexUtil.updateIndexState(fullIndexName, ts, metaTable, state)
+                    .getMutationCode();
+        }
+    }
+
+    @Test
+    public void testIndexStateTransitions() throws Throwable {
+        // create table and indices
+        String createTableSql =
+                "CREATE TABLE " + fullTableName
+                        + "(org_id VARCHAR NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER, v3 INTEGER)";
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(createTableSql);
+            stmt.execute("CREATE INDEX " + indexName + " ON " + fullTableName + "(v1)");
+        }
+        conn.commit();
+        updateIndexState(phoenixConn, fullIndexName, PIndexState.DISABLE);
+        mutateRandomly(conn, fullTableName, 20);
+        Thread checker = checkIndexPendingDisableCount(phoenixConn, fullIndexName);
+        try {
+            do {
+                runIndexRebuilder(Collections.<String> singletonList(fullTableName));
+            } while (!TestUtil.checkIndexState(conn, fullIndexName, PIndexState.ACTIVE, 0L));
+        } finally {
+            // The checker stops by itself once it sees ACTIVE; wait for it so that its last
+            // writes are visible, then interrupt it in case the rebuild loop failed early.
+            checker.join(CHECKER_JOIN_TIMEOUT_MS);
+            checker.interrupt();
+        }
+        // Constant-first equals: fails the assertion instead of throwing if no state was captured.
+        assertTrue("Index state is inactive ", "i".equals(indexState));
+        assertTrue("pendingDisable count is incremented when index is inactive",
+            pendingDisableCount == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
+        assertTrue("pending disable count is 0 when index is active: ",
+            getPendingDisableCount(phoenixConn, fullIndexName) == 0);
+    }
+
+    // NOTE(review): this test uses the table and index created by testIndexStateTransitions, so it
+    // appears to rely on that test running first - confirm the execution order is guaranteed.
+    @Test
+    public void checkIndexPendingDisableResetCounter() throws Throwable {
+        IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName,
+            MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
+        updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE);
+        assertTrue(
+            "Pending disable count should reset when index moves from ACTIVE to PENDING_DISABLE ",
+            getPendingDisableCount(phoenixConn, fullIndexName) == 0);
+        IndexUtil.incrementCounterForIndex(phoenixConn, fullIndexName,
+            MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
+        updateIndexState(phoenixConn, fullIndexName, PIndexState.INACTIVE);
+        updateIndexState(phoenixConn, fullIndexName, PIndexState.PENDING_DISABLE);
+        assertTrue(
+            "Pending disable count should not reset when index moves from INACTIVE to PENDING_DISABLE ",
+            getPendingDisableCount(phoenixConn, fullIndexName)
+                    == MetaDataRegionObserver.PENDING_DISABLE_INACTIVE_STATE_COUNT);
+    }
+
+    /** Runs one pass of the index rebuild task for the given tables on the calling thread. */
+    private static void runIndexRebuilder(List<String> tables)
+            throws InterruptedException, SQLException {
+        BuildIndexScheduleTask task =
+                new MetaDataRegionObserver.BuildIndexScheduleTask(
+                        indexRebuildTaskRegionEnvironment, tables);
+        task.run();
+    }
+
+}
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 62d8f3a..0b95b26 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -72,9 +72,9 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQU
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_CONSTANT_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
 import static 
org.apache.phoenix.query.QueryConstants.DIVERGED_VIEW_BASE_COLUMN_COUNT;
 import static org.apache.phoenix.schema.PTableType.INDEX;
 import static org.apache.phoenix.schema.PTableType.TABLE;
@@ -101,7 +101,6 @@ import java.util.NavigableMap;
 import java.util.Properties;
 import java.util.Set;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparatorImpl;
@@ -255,6 +254,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
@@ -4231,7 +4231,7 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements RegionCopr
                         newState = PIndexState.DISABLE;
                     }
                 }
-                if (newState == PIndexState.PENDING_DISABLE && currentState != 
PIndexState.PENDING_DISABLE) {
+                if (newState == PIndexState.PENDING_DISABLE && currentState != 
PIndexState.PENDING_DISABLE && currentState != PIndexState.INACTIVE) {
                     // reset count for first PENDING_DISABLE
                     newKVs.add(PhoenixKeyValueUtil.newKeyValue(key, 
TABLE_FAMILY_BYTES,
                         PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, 
timeStamp, Bytes.toBytes(0L)));
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
index 956e04b..ad78f7e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java
@@ -103,6 +103,9 @@ import com.google.common.collect.Maps;
 public class MetaDataRegionObserver implements 
RegionObserver,RegionCoprocessor {
     public static final Log LOG = 
LogFactory.getLog(MetaDataRegionObserver.class);
     public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = 
"REBUILDINDEX";
+    // PHOENIX-5094: to tell increments of PENDING_DISABLE_COUNT made by clients apart from the
+    // one made by the index rebuilder, the index rebuilder uses this large value
+    public static final long PENDING_DISABLE_INACTIVE_STATE_COUNT = 10000L;
     private static final byte[] SYSTEM_CATALOG_KEY = SchemaUtil.getTableKey(
             ByteUtil.EMPTY_BYTE_ARRAY,
             QueryConstants.SYSTEM_SCHEMA_NAME_BYTES,
@@ -262,6 +265,20 @@ public class MetaDataRegionObserver implements 
RegionObserver,RegionCoprocessor
             this.props = new ReadOnlyProps(env.getConfiguration().iterator());
         }
 
+        /**
+         * Subtracts {@code PENDING_DISABLE_INACTIVE_STATE_COUNT} from the pending disable count
+         * of every given index. A failure on one index is logged as a warning and does not
+         * prevent the remaining indexes from being processed.
+         *
+         * @param conn connection used to update the counters
+         * @param dataPTable data table the indexes belong to; only used in the log message
+         * @param indexes indexes whose pending disable count is decremented
+         * @return the indexes whose count was decremented without an exception
+         */
+        public List<PTable> decrementIndexesPendingDisableCount(PhoenixConnection conn,
+                PTable dataPTable, List<PTable> indexes) {
+            List<PTable> indexesDecremented = new ArrayList<>();
+            for (PTable index : indexes) {
+                String indexName = index.getName().getString();
+                try {
+                    IndexUtil.incrementCounterForIndex(conn, indexName,
+                        -PENDING_DISABLE_INACTIVE_STATE_COUNT);
+                    indexesDecremented.add(index);
+                } catch (Exception e) {
+                    LOG.warn("Failed to decrement pending disable count by "
+                            + PENDING_DISABLE_INACTIVE_STATE_COUNT + " for index: " + indexName
+                            + " of table: " + dataPTable.getName().getString(), e);
+                }
+            }
+            return indexesDecremented;
+        }
+
         @Override
         public void run() {
             // FIXME: we should replay the data table Put, as doing a partial 
index build would only add
@@ -399,6 +416,10 @@ public class MetaDataRegionObserver implements 
RegionObserver,RegionCoprocessor
                     // Allow index to begin incremental maintenance as index 
is back online and we
                     // cannot transition directly from DISABLED -> ACTIVE
                     if (indexState == PIndexState.DISABLE) {
+                        if(IndexUtil.getIndexPendingDisableCount(conn, 
indexTableFullName) < PENDING_DISABLE_INACTIVE_STATE_COUNT){
+                            // to avoid incrementing again
+                            IndexUtil.incrementCounterForIndex(conn, 
indexTableFullName, PENDING_DISABLE_INACTIVE_STATE_COUNT);
+                        }
                         IndexUtil.updateIndexState(conn, indexTableFullName, 
PIndexState.INACTIVE, null);
                         continue; // Must wait until clients start to do index 
maintenance again
                     } else if (indexState == PIndexState.PENDING_ACTIVE) {
@@ -510,6 +531,7 @@ public class MetaDataRegionObserver implements 
RegionObserver,RegionCoprocessor
                                     + (scanEndTime == 
HConstants.LATEST_TIMESTAMP ? "LATEST_TIMESTAMP" : scanEndTime));
                                                        MutationState 
mutationState = plan.execute();
                                                        long rowCount = 
mutationState.getUpdateCount();
+                                                       
decrementIndexesPendingDisableCount(conn, dataPTable, 
indexesToPartiallyRebuild);
                             if (scanEndTime == latestUpperBoundTimestamp) {
                                 LOG.info("Rebuild completed for all 
inactive/disabled indexes in data table:"
                                         + dataPTable.getName());
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index e1df403..7770070 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -17,8 +17,6 @@
  */
 package org.apache.phoenix.index;
 
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
-
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
@@ -39,7 +37,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
-import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
@@ -512,25 +509,11 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
     }
 
     private static void incrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable) throws IOException {
-        incrementCounterForIndex(conn, failedIndexTable, 1);
+        IndexUtil.incrementCounterForIndex(conn, failedIndexTable, 1);
     }
 
     private static void decrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable) throws IOException {
-        incrementCounterForIndex(conn, failedIndexTable, -1);
-    }
-    
-    private static void incrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable,long amount) throws IOException {
-        byte[] indexTableKey = 
SchemaUtil.getTableKeyFromFullName(failedIndexTable);
-        Increment incr = new Increment(indexTableKey);
-        incr.addColumn(TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount);
-        try {
-            conn.getQueryServices()
-                    
.getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
-                            conn.getQueryServices().getProps()).getName())
-                    .increment(incr);
-        } catch (SQLException e) {
-            throw new IOException(e);
-        }
+        IndexUtil.incrementCounterForIndex(conn, failedIndexTable, -1);
     }
 
     private static boolean canRetryMore(int numRetry, int maxRetries, long 
canRetryUntil) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index d95b630..58d8eb7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.util;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
 import static 
org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
 import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY;
 import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER;
@@ -40,10 +41,10 @@ import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
@@ -820,5 +821,32 @@ public class IndexUtil {
                                     Collections.<PTable>emptyIterator();
         return Lists.newArrayList(indexIterator);
     }
-    
+    /**
+     * Adds {@code amount} to the PENDING_DISABLE_COUNT column of the given index's row in the
+     * physical SYSTEM.CATALOG table by issuing an HBase {@link Increment}.
+     *
+     * @param conn connection whose query services provide the catalog table
+     * @param failedIndexTable full name of the index table
+     * @param amount value to add; pass a negative value to decrement
+     * @return the result of the increment call
+     * @throws IOException if the increment fails or the catalog table cannot be obtained
+     */
+    public static Result incrementCounterForIndex(PhoenixConnection conn, String failedIndexTable,
+            long amount) throws IOException {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+        Increment incr = new Increment(indexTableKey);
+        incr.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES,
+            amount);
+        // try-with-resources releases the catalog table handle once the increment is done
+        try (Table catalogTable =
+                conn.getQueryServices()
+                        .getTable(SchemaUtil.getPhysicalTableName(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                            conn.getQueryServices().getProps()).getName())) {
+            return catalogTable.increment(incr);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Reads the PENDING_DISABLE_COUNT column of the given index's row in the physical
+     * SYSTEM.CATALOG table.
+     *
+     * @param conn connection whose query services provide the catalog table
+     * @param failedIndexTable full name of the index table
+     * @return the stored count, or 0 if the row has no value for the column
+     * @throws IOException if the read fails or the catalog table cannot be obtained
+     */
+    public static long getIndexPendingDisableCount(PhoenixConnection conn,
+            String failedIndexTable) throws IOException {
+        byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+        Get get = new Get(indexTableKey);
+        get.addColumn(TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+        // try-with-resources releases the catalog table handle once the read is done
+        try (Table catalogTable =
+                conn.getQueryServices()
+                        .getTable(SchemaUtil.getPhysicalTableName(
+                            PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                            conn.getQueryServices().getProps()).getName())) {
+            byte[] count =
+                    catalogTable.get(get).getValue(TABLE_FAMILY_BYTES,
+                        PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+            // getValue yields null when the column was never written; treat that as zero
+            // instead of letting Bytes.toLong fail on a null array.
+            return count == null ? 0L : Bytes.toLong(count);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
 }

Reply via email to