This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit d5d7cd6b57cfcceb9ed7e11ffe3694831b55281d
Author: Monani Mihir <monani.mi...@gmail.com>
AuthorDate: Fri Apr 12 21:48:45 2019 +0530

    PHOENIX-5194 Thread Cache is not update for Index retries in for 
MutationState#send()#doMutation()
---
 .../org/apache/phoenix/execute/MutationState.java  | 45 ++++++++++++++++++----
 .../phoenix/index/PhoenixIndexFailurePolicy.java   | 10 ++++-
 2 files changed, 45 insertions(+), 10 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 33cd596..1cbc4bc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -41,6 +41,7 @@ import javax.annotation.concurrent.Immutable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -220,7 +221,7 @@ public class MutationState implements SQLCloseable {
      * Commit a write fence when creating an index so that we can detect when 
a data table transaction is started before
      * the create index but completes after it. In this case, we need to rerun 
the data table transaction after the
      * index creation so that the index rows are generated. See TEPHRA-157 for 
more information.
-     * 
+     *
      * @param dataTable
      *            the data table upon which an index is being added
      * @throws SQLException
@@ -445,7 +446,7 @@ public class MutationState implements SQLCloseable {
     /**
      * Combine a newer mutation with this one, where in the event of overlaps, 
the newer one will take precedence.
      * Combine any metrics collected for the newer mutation.
-     * 
+     *
      * @param newMutationState
      *            the newer mutation state
      */
@@ -500,8 +501,8 @@ public class MutationState implements SQLCloseable {
             final MultiRowMutationState values, final long mutationTimestamp, 
final long serverTimestamp,
             boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
-        final List<PTable> indexList = includeAllIndexes ? 
-                
Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()))
 : 
+        final List<PTable> indexList = includeAllIndexes ?
+                
Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()))
 :
                     IndexUtil.getClientMaintainedIndexes(table);
         final Iterator<PTable> indexes = indexList.iterator();
         final List<Mutation> mutationList = 
Lists.newArrayListWithExpectedSize(values.size());
@@ -648,7 +649,7 @@ public class MutationState implements SQLCloseable {
 
     /**
      * Get the unsorted list of HBase mutations for the tables with 
uncommitted data.
-     * 
+     *
      * @return list of HBase mutations for uncommitted data.
      */
     public Iterator<Pair<byte[], List<Mutation>>> toMutations(Long timestamp) {
@@ -730,7 +731,7 @@ public class MutationState implements SQLCloseable {
     /**
      * Validates that the meta data is valid against the server meta data if 
we haven't yet done so. Otherwise, for
      * every UPSERT VALUES call, we'd need to hit the server to see if the 
meta data has changed.
-     * 
+     *
      * @return the server time to use for the upsert
      * @throws SQLException
      *             if the table or any columns no longer exist
@@ -953,7 +954,7 @@ public class MutationState implements SQLCloseable {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
                     table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    final ServerCache cache = tableInfo.isDataTable() ? 
+                    final ServerCache cache = tableInfo.isDataTable() ?
                             
IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
                                     mutationList, indexMetaDataPtr) : null;
                     // If we haven't retried yet, retry for this case only, as 
it's possible that
@@ -982,7 +983,10 @@ public class MutationState implements SQLCloseable {
                         for (final List<Mutation> mutationBatch : 
mutationBatchList) {
                             if (shouldRetryIndexedMutation) {
                                 // if there was an index write failure, retry 
the mutation in a loop
-                                final HTableInterface finalHTable = hTable;
+                                final Table finalHTable = hTable;
+                                final ImmutableBytesWritable 
finalindexMetaDataPtr =
+                                        indexMetaDataPtr;
+                                final PTable finalPTable = table;
                                 
PhoenixIndexFailurePolicy.doBatchWithRetries(new MutateCommand() {
                                     @Override
                                     public void doMutation() throws 
IOException {
@@ -991,6 +995,9 @@ public class MutationState implements SQLCloseable {
                                         } catch (InterruptedException e) {
                                             Thread.currentThread().interrupt();
                                             throw new IOException(e);
+                                        } catch (IOException e) {
+                                            e = 
updateTableRegionCacheIfNecessary(e);
+                                            throw e;
                                         }
                                     }
 
@@ -998,6 +1005,28 @@ public class MutationState implements SQLCloseable {
                                     public List<Mutation> getMutationList() {
                                         return mutationBatch;
                                     }
+
+                                    private IOException
+                                            
updateTableRegionCacheIfNecessary(IOException ioe) {
+                                        SQLException sqlE =
+                                                
ServerUtil.parseLocalOrRemoteServerException(ioe);
+                                        if (sqlE != null
+                                                && sqlE.getErrorCode() == 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+                                                        .getErrorCode()) {
+                                            try {
+                                                
connection.getQueryServices().clearTableRegionCache(
+                                                    
finalHTable.getName().getName());
+                                                
IndexMetaDataCacheClient.setMetaDataOnMutations(
+                                                    connection, finalPTable, 
mutationBatch,
+                                                    finalindexMetaDataPtr);
+                                            } catch (SQLException e) {
+                                                return 
ServerUtil.createIOException(
+                                                    "Exception during updating 
index meta data cache",
+                                                    ioe);
+                                            }
+                                        }
+                                        return ioe;
+                                    }
                                 }, iwe, connection, 
connection.getQueryServices().getProps());
                             } else {
                                 hTable.batch(mutationBatch);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
index 46919d5..c58b5c9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java
@@ -491,8 +491,14 @@ public class PhoenixIndexFailurePolicy extends 
DelegateIndexFailurePolicy {
             } catch (IOException e) {
                 SQLException inferredE = 
ServerUtil.parseLocalOrRemoteServerException(e);
                 if (inferredE == null || inferredE.getErrorCode() != 
SQLExceptionCode.INDEX_WRITE_FAILURE.getErrorCode()) {
-                    // if it's not an index write exception, throw exception, 
to be handled normally in caller's try-catch
-                    throw e;
+                    // If this call is from phoenix client, we also need to 
check if SQLException
+                    // error is INDEX_METADATA_NOT_FOUND or not
+                    // if it's not an INDEX_METADATA_NOT_FOUND, throw 
exception,
+                    // to be handled normally in caller's try-catch
+                    if (inferredE.getErrorCode() != 
SQLExceptionCode.INDEX_METADATA_NOT_FOUND
+                            .getErrorCode()) {
+                        throw e;
+                    }
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();

Reply via email to