Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.3 a9170b232 -> 5771eb213


PHOENIX-4785 Unable to write to table if index is made active during retry


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5771eb21
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5771eb21
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5771eb21

Branch: refs/heads/4.x-HBase-1.3
Commit: 5771eb213feca0e50c9f3542b5118e44b7816f3e
Parents: a9170b2
Author: Ankit Singhal <ankitsingha...@gmail.com>
Authored: Thu Jun 21 16:11:41 2018 -0700
Committer: Ankit Singhal <ankitsingha...@gmail.com>
Committed: Thu Jun 21 16:11:41 2018 -0700

----------------------------------------------------------------------
 .../end2end/index/MutableIndexFailureIT.java    | 128 ++++++++++++++-----
 .../MutableIndexFailureWithNamespaceIT.java     |  80 ++++++++++++
 .../coprocessor/MetaDataEndpointImpl.java       |  30 +++++
 .../index/PhoenixIndexFailurePolicy.java        |  71 +++++++++-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   2 +-
 5 files changed, 276 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/5771eb21/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index dfbaf3f..8f88513 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -28,10 +28,16 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -104,10 +110,10 @@ public class MutableIndexFailureIT extends BaseTest {
     private final boolean throwIndexWriteFailure;
     private String schema = generateUniqueName();
     private List<CommitException> exceptions = Lists.newArrayList();
-    private static RegionCoprocessorEnvironment 
indexRebuildTaskRegionEnvironment;
-    private static final int forwardOverlapMs = 1000;
-    private static final int disableTimestampThresholdMs = 10000;
-    private static final int numRpcRetries = 2;
+    protected static RegionCoprocessorEnvironment 
indexRebuildTaskRegionEnvironment;
+    protected static final int forwardOverlapMs = 1000;
+    protected static final int disableTimestampThresholdMs = 10000;
+    protected static final int numRpcRetries = 2;
 
     public MutableIndexFailureIT(boolean transactional, boolean localIndex, 
boolean isNamespaceMapped, Boolean disableIndexOnWriteFailure, boolean 
failRebuildTask, Boolean throwIndexWriteFailure) {
         this.transactional = transactional;
@@ -127,6 +133,23 @@ public class MutableIndexFailureIT extends BaseTest {
 
     @BeforeClass
     public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProps();
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        NUM_SLAVES_BASE = 4;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+        indexRebuildTaskRegionEnvironment =
+                (RegionCoprocessorEnvironment) getUtility()
+                        .getRSForFirstRegionInTable(
+                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
+                        .get(0).getCoprocessorHost()
+                        
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        MetaDataRegionObserver.initRebuildIndexConnectionProps(
+            indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+    
+    protected static Map<String,String> getServerProps(){
         Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(10);
         serverProps.put("hbase.coprocessor.region.classes", 
FailingRegionObserver.class.getName());
         serverProps.put(HConstants.HBASE_RPC_TIMEOUT_KEY, "10000");
@@ -142,19 +165,7 @@ public class MutableIndexFailureIT extends BaseTest {
          * because we want to control it's execution ourselves
          */
         serverProps.put(QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY, 
Long.toString(Long.MAX_VALUE));
-        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
-        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
-        NUM_SLAVES_BASE = 4;
-        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
-        indexRebuildTaskRegionEnvironment =
-                (RegionCoprocessorEnvironment) getUtility()
-                        .getRSForFirstRegionInTable(
-                            
PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
-                        
.getOnlineRegions(PhoenixDatabaseMetaData.SYSTEM_CATALOG_HBASE_TABLE_NAME)
-                        .get(0).getCoprocessorHost()
-                        
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
-        MetaDataRegionObserver.initRebuildIndexConnectionProps(
-            indexRebuildTaskRegionEnvironment.getConfiguration());
+        return serverProps;
     }
 
     @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}")
 // name is used by failsafe as file name in reports
@@ -162,16 +173,10 @@ public class MutableIndexFailureIT extends BaseTest {
         return Arrays.asList(new Object[][] { 
                 // note - can't disableIndexOnWriteFailure without 
throwIndexWriteFailure, PHOENIX-4130
                 { false, false, false, false, false, false},
-                { false, false, true, true, false, null},
-                { false, false, true, true, false, true},
                 { false, false, false, true, false, null},
                 { true, false, false, true, false, null},
-                { true, false, true, true, false, null},
-                { false, true, true, true, false, null},
                 { false, true, false, null, false, null},
                 { true, true, false, true, false, null},
-                { true, true, true, null, false, null},
-
                 { false, false, false, false, false, null},
                 { false, true, false, false, false, null},
                 { false, false, false, false, false, null},
@@ -180,9 +185,7 @@ public class MutableIndexFailureIT extends BaseTest {
                 { false, true, false, true, false, null},
                 { false, true, false, true, false, null},
                 { false, false, false, true, true, null},
-                { false, false, true, true, true, null},
                 { false, false, false, false, true, false},
-                { false, false, true, false, true, false},
                 } 
         );
     }
@@ -258,6 +261,9 @@ public class MutableIndexFailureIT extends BaseTest {
             rs = conn.createStatement().executeQuery(query);
             assertFalse(rs.next());
 
+            initializeTable(conn, fullTableName);
+            addRowsInTableDuringRetry(fullTableName);
+
             // Verify the metadata for index is correct.
             rs = conn.getMetaData().getTables(null, 
StringUtil.escapeLike(schema), null,
                     new String[] { PTableType.INDEX.toString() });
@@ -270,8 +276,9 @@ public class MutableIndexFailureIT extends BaseTest {
             assertTrue(rs.next());
             assertEquals(thirdIndexName, rs.getString(3));
             assertEquals(PIndexState.ACTIVE.toString(), 
rs.getString("INDEX_STATE"));
-            initializeTable(conn, fullTableName);
-            
+            // we should be able to write to ACTIVE index even in case of 
disable index on failure policy
+            addRowToTable(conn, fullTableName);
+
             query = "SELECT /*+ NO_INDEX */ k,v1 FROM " + fullTableName;
             rs = conn.createStatement().executeQuery("EXPLAIN " + query);
             String expectedPlan = "CLIENT PARALLEL 2-WAY FULL SCAN OVER "
@@ -395,16 +402,67 @@ public class MutableIndexFailureIT extends BaseTest {
         stmt.setString(2, "x");
         stmt.setString(3, "1");
         stmt.execute();
-        stmt.setString(1, "b");
-        stmt.setString(2, "y");
-        stmt.setString(3, "2");
-        stmt.execute();
+        conn.commit();
+    }
+
+    private void addRowToTable(Connection conn, String tableName) throws 
SQLException {
+        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + 
tableName + " VALUES(?,?,?)");
         stmt.setString(1, "c");
         stmt.setString(2, "z");
         stmt.setString(3, "3");
         stmt.execute();
         conn.commit();
+    }
 
+    private void addRowsInTableDuringRetry(final String tableName)
+            throws SQLException, InterruptedException, ExecutionException {
+        int threads=10;
+        boolean wasFailWrite = FailingRegionObserver.FAIL_WRITE;
+        boolean wasToggleFailWriteForRetry = 
FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY;
+        try {
+            Callable callable = new Callable() {
+
+                @Override
+                public Boolean call() {
+                    Properties props = 
PropertiesUtil.deepCopy(TEST_PROPERTIES);
+                    props.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, 
String.valueOf(isNamespaceMapped));
+                    try (Connection conn = driver.connect(url, props)) {
+                        // In case of disable index on failure policy, INDEX 
will be in PENDING_DISABLE on first retry
+                        // but will
+                        // become active if the retry is successful
+                        PreparedStatement stmt = conn.prepareStatement("UPSERT 
INTO " + tableName + " VALUES(?,?,?)");
+                        stmt.setString(1, "b");
+                        stmt.setString(2, "y");
+                        stmt.setString(3, "2");
+                        stmt.execute();
+                        if (!leaveIndexActiveOnFailure && !transactional) {
+                            FailingRegionObserver.FAIL_WRITE = true;
+                            FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY 
= true;
+                        }
+                        conn.commit();
+                    } catch (SQLException e) {
+                        return false;
+                    }
+                    return true;
+                }
+            };
+            ExecutorService executor = Executors.newFixedThreadPool(threads);
+            List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+            for (int i = 0; i < threads; i++) {
+                futures.add(executor.submit(callable));
+            }
+            for (Future<Boolean> future : futures) {
+                Boolean isSuccess = future.get();
+                // transactions can have conflict so ignoring the check for 
them
+                if (!transactional) {
+                    assertTrue(isSuccess);
+                }
+            }
+            executor.shutdown();
+        } finally {
+            FailingRegionObserver.FAIL_WRITE = wasFailWrite;
+            FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = 
wasToggleFailWriteForRetry;
+        }
     }
 
     private void validateDataWithIndex(Connection conn, String fullTableName, 
String fullIndexName, boolean localIndex) throws SQLException {
@@ -504,6 +562,7 @@ public class MutableIndexFailureIT extends BaseTest {
     }
 
     public static class FailingRegionObserver extends SimpleRegionObserver {
+        public static boolean TOGGLE_FAIL_WRITE_FOR_RETRY = false;
         public static volatile boolean FAIL_WRITE = false;
         public static volatile boolean FAIL_NEXT_WRITE = false;
         public static final String FAIL_INDEX_NAME = "FAIL_IDX";
@@ -518,6 +577,9 @@ public class MutableIndexFailureIT extends BaseTest {
             } else if 
(c.getEnvironment().getRegionInfo().getTable().getNameAsString().endsWith("A_" 
+ FAIL_INDEX_NAME)
                     && FAIL_WRITE) {
                 throwException = true;
+                if (TOGGLE_FAIL_WRITE_FOR_RETRY) {
+                    FAIL_WRITE = !FAIL_WRITE;
+                }
             } else {
                 // When local index updates are atomic with data updates, 
testing a write failure to a local
                 // index won't make sense.
@@ -540,7 +602,9 @@ public class MutableIndexFailureIT extends BaseTest {
                 }
             }
             if (throwException) {
-                dropIndex(c);
+                if (!TOGGLE_FAIL_WRITE_FOR_RETRY) {
+                    dropIndex(c);
+                }
                 throw new DoNotRetryIOException();
             }
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5771eb21/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
new file mode 100644
index 0000000..5ed9e1f
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureWithNamespaceIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+/*
+ * This class ensures the namespace-mapped cases get their own cluster with namespace mapping enabled
+ */
+public class MutableIndexFailureWithNamespaceIT extends MutableIndexFailureIT {
+
+    public MutableIndexFailureWithNamespaceIT(boolean transactional, boolean 
localIndex, boolean isNamespaceMapped,
+            Boolean disableIndexOnWriteFailure, boolean failRebuildTask, 
Boolean throwIndexWriteFailure) {
+        super(transactional, localIndex, isNamespaceMapped, 
disableIndexOnWriteFailure, failRebuildTask,
+                throwIndexWriteFailure);
+    }
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = getServerProps();
+        serverProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, 
Boolean.TRUE.toString());
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
+        clientProps.put(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, 
Boolean.TRUE.toString());
+        clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "2");
+        NUM_SLAVES_BASE = 4;
+        setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), 
new ReadOnlyProps(clientProps.entrySet().iterator()));
+        TableName systemTable = 
SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES,
+                true);
+        indexRebuildTaskRegionEnvironment = 
(RegionCoprocessorEnvironment)getUtility()
+                
.getRSForFirstRegionInTable(systemTable).getOnlineRegions(systemTable).get(0).getCoprocessorHost()
+                
.findCoprocessorEnvironment(MetaDataRegionObserver.class.getName());
+        
MetaDataRegionObserver.initRebuildIndexConnectionProps(indexRebuildTaskRegionEnvironment.getConfiguration());
+    }
+    
+    @Parameters(name = 
"MutableIndexFailureIT_transactional={0},localIndex={1},isNamespaceMapped={2},disableIndexOnWriteFailure={3},failRebuildTask={4},throwIndexWriteFailure={5}")
 // name is used by failsafe as file name in reports
+    public static List<Object[]> data() {
+        return Arrays.asList(new Object[][] { 
+                // note - can't disableIndexOnWriteFailure without 
throwIndexWriteFailure, PHOENIX-4130
+                { false, false, true, true, false, null},
+                { false, false, true, true, false, true},
+                { true, false, true, true, false, null},
+                { false, true, true, true, false, null},
+                { true, true, true, null, false, null},
+                { false, false, true, true, true, null},
+                { false, false, true, false, true, false},
+                } 
+        );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5771eb21/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5e2e4df..ae2fa66 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -3871,7 +3871,37 @@ public class MetaDataEndpointImpl extends 
MetaDataProtocol implements Coprocesso
                         return;
                     }
                 }
+                if (newState == PIndexState.PENDING_DISABLE && currentState != 
PIndexState.PENDING_DISABLE) {
+                    // reset count for first PENDING_DISABLE
+                    newKVs.add(KeyValueUtil.newKeyValue(key, 
TABLE_FAMILY_BYTES,
+                            
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, 
Bytes.toBytes(0L)));
+                }
+                if (currentState == PIndexState.PENDING_DISABLE) {
+                    if (newState == PIndexState.ACTIVE) {
+                        //before making index ACTIVE check if all clients 
succeed otherwise keep it PENDING_DISABLE
+                        byte[] count = region
+                                .get(new Get(key).addColumn(TABLE_FAMILY_BYTES,
+                                        
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES))
+                                .getValue(TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES);
+                        if (count != null && Bytes.toLong(count) != 0) {
+                            newState = PIndexState.PENDING_DISABLE;
+                            newKVs.remove(disableTimeStampKVIndex);
+                            newKVs.set(indexStateKVIndex, 
KeyValueUtil.newKeyValue(key, TABLE_FAMILY_BYTES,
+                                    INDEX_STATE_BYTES, timeStamp, 
Bytes.toBytes(newState.getSerializedValue())));
+                        }
+                    } else if (newState == PIndexState.DISABLE) {
+                        //reset the counter for pending disable when 
transitioning from PENDING_DISABLE to DISABLE
+                        newKVs.add(KeyValueUtil.newKeyValue(key, 
TABLE_FAMILY_BYTES,
+                                
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, 
Bytes.toBytes(0L)));
+                    }
 
+                }
+                
+                if(newState == PIndexState.ACTIVE||newState == 
PIndexState.PENDING_ACTIVE||newState == PIndexState.DISABLE){
+                    newKVs.add(KeyValueUtil.newKeyValue(key, 
TABLE_FAMILY_BYTES,
+                            
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, timeStamp, 
Bytes.toBytes(0L)));   
+                }
+                
                 if (currentState == PIndexState.BUILDING && newState != 
PIndexState.ACTIVE) {
                     timeStamp = currentStateKV.getTimestamp();
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5771eb21/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 428124f..397565b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.index;
 
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
+
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -438,6 +441,7 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
     public static void doBatchWithRetries(MutateCommand mutateCommand,
             IndexWriteException iwe, PhoenixConnection connection, 
ReadOnlyProps config)
             throws IOException {
+        incrementPendingDisableCounter(iwe, connection);
         int maxTries = config.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
         long pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -472,6 +476,57 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
         throw new DoNotRetryIOException(iwe); // send failure back to client
     }
 
+    private static void incrementPendingDisableCounter(IndexWriteException 
indexWriteException,PhoenixConnection conn) {
+        try {
+            Set<String> indexesToUpdate = new HashSet<>();
+            if (indexWriteException instanceof 
MultiIndexWriteFailureException) {
+                MultiIndexWriteFailureException indexException =
+                        (MultiIndexWriteFailureException) indexWriteException;
+                List<HTableInterfaceReference> failedIndexes = 
indexException.getFailedTables();
+                if (indexException.isDisableIndexOnFailure() && failedIndexes 
!= null) {
+                    for (HTableInterfaceReference failedIndex : failedIndexes) 
{
+                        String failedIndexTable = failedIndex.getTableName();
+                        if (!indexesToUpdate.contains(failedIndexTable)) {
+                            incrementCounterForIndex(conn,failedIndexTable);
+                            indexesToUpdate.add(failedIndexTable);
+                        }
+                    }
+                }
+            } else if (indexWriteException instanceof 
SingleIndexWriteFailureException) {
+                SingleIndexWriteFailureException indexException =
+                        (SingleIndexWriteFailureException) indexWriteException;
+                String failedIndex = indexException.getTableName();
+                if (indexException.isDisableIndexOnFailure() && failedIndex != 
null) {
+                    incrementCounterForIndex(conn,failedIndex);
+                }
+            }
+        } catch (Exception handleE) {
+            LOG.warn("Error while trying to handle index write exception", 
indexWriteException);
+        }
+    }
+
+    private static void incrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable) throws IOException {
+        incrementCounterForIndex(conn, failedIndexTable, 1);
+    }
+
+    private static void decrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable) throws IOException {
+        incrementCounterForIndex(conn, failedIndexTable, -1);
+    }
+    
+    private static void incrementCounterForIndex(PhoenixConnection conn, 
String failedIndexTable,long amount) throws IOException {
+        byte[] indexTableKey = 
SchemaUtil.getTableKeyFromFullName(failedIndexTable);
+        Increment incr = new Increment(indexTableKey);
+        incr.addColumn(TABLE_FAMILY_BYTES, 
PhoenixDatabaseMetaData.PENDING_DISABLE_COUNT_BYTES, amount);
+        try {
+            conn.getQueryServices()
+                    
.getTable(SchemaUtil.getPhysicalTableName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME,
+                            conn.getQueryServices().getProps()).getName())
+                    .increment(incr);
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+    }
+
     private static boolean canRetryMore(int numRetry, int maxRetries, long 
canRetryUntil) {
         // If there is a single try we must not take into account the time.
         return numRetry < maxRetries
@@ -494,13 +549,25 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
     }
 
     private static void updateIndex(String indexFullName, PhoenixConnection 
conn,
-            PIndexState indexState) throws SQLException {
+            PIndexState indexState) throws SQLException, IOException {
+        //Decrement the counter because we get here once the client has retried, 
whether the retry failed or succeeded
+        decrementCounterForIndex(conn,indexFullName);
+        Long indexDisableTimestamp = null;
         if (PIndexState.DISABLE.equals(indexState)) {
             LOG.info("Disabling index after hitting max number of index write 
retries: "
                     + indexFullName);
+            IndexUtil.updateIndexState(conn, indexFullName, indexState, 
indexDisableTimestamp);
         } else if (PIndexState.ACTIVE.equals(indexState)) {
             LOG.debug("Resetting index to active after subsequent success " + 
indexFullName);
+            //On the server, the disable timestamp is reset only if no other 
client is in PENDING_DISABLE state
+            indexDisableTimestamp = 0L;
+            try {
+                IndexUtil.updateIndexState(conn, indexFullName, indexState, 
indexDisableTimestamp);
+            } catch (SQLException e) {
+                // It's possible that some other client has already made the index 
DISABLED, so we can ignore the disallowed
+                // transition(DISABLED->ACTIVE)
+                if (e.getErrorCode() != 
SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION.getErrorCode()) { throw e; }
+            }
         }
-        IndexUtil.updateIndexState(conn, indexFullName, indexState, null);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5771eb21/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index 5bcd286..8dd4a88 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -49,7 +49,6 @@ import 
org.apache.phoenix.expression.function.SQLViewTypeFunction;
 import org.apache.phoenix.expression.function.SqlTypeNameFunction;
 import org.apache.phoenix.expression.function.TransactionProviderNameFunction;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.iterate.DelegateResultIterator;
 import org.apache.phoenix.iterate.MaterializedResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
@@ -214,6 +213,7 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     
     public static final String TABLE_FAMILY = 
QueryConstants.DEFAULT_COLUMN_FAMILY;
     public static final byte[] TABLE_FAMILY_BYTES = 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
+    public static final byte[] PENDING_DISABLE_COUNT_BYTES = 
Bytes.toBytes("PENDING_DISABLE_COUNT");
 
     public static final String TYPE_SEQUENCE = "SEQUENCE";
     public static final String SYSTEM_FUNCTION_TABLE = "FUNCTION";

Reply via email to