This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new 78803190f3 PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call (#2069)
78803190f3 is described below

commit 78803190f356c1f853ea34a9dcb4c17fdc322c9a
Author: sanjeet006py <[email protected]>
AuthorDate: Mon Feb 3 23:10:02 2025 +0530

    PHOENIX-7515: Add metric for count of Phoenix client batches used by a commit call (#2069)
---
 .../org/apache/phoenix/execute/MutationState.java  |  10 +-
 .../org/apache/phoenix/monitoring/MetricType.java  |   3 +
 .../phoenix/monitoring/MutationMetricQueue.java    |  17 ++-
 .../phoenix/monitoring/TableClientMetrics.java     |   7 +-
 .../phoenix/monitoring/BasePhoenixMetricsIT.java   |   9 +-
 .../phoenix/monitoring/PhoenixMetricsIT.java       |  37 +++++-
 .../monitoring/PhoenixTableLevelMetricsIT.java     | 124 ++++++++++++++++++---
 .../monitoring/TableMetricsManagerTest.java        |  24 ++--
 8 files changed, 185 insertions(+), 46 deletions(-)

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index d6c118c4bb..42f21d502b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -1321,6 +1321,7 @@ public class MutationState implements SQLCloseable {
             List<Mutation> mutationList = pair.getValue();
             List<List<Mutation>> mutationBatchList =
                     getMutationBatchList(batchSize, batchSizeBytes, 
mutationList);
+            int totalBatchCount = mutationBatchList.size();
 
             // create a span per target table
             // TODO maybe we can be smarter about the table name to string 
here?
@@ -1454,7 +1455,6 @@ public class MutationState implements SQLCloseable {
                         // REPLAY_ONLY_INDEX_WRITES for first batch
                         // only in case of 1121 SQLException
                         itrListMutation.remove();
-
                         batchCount++;
                         if (LOGGER.isDebugEnabled())
                             LOGGER.debug("Sent batch of " + 
mutationBatch.size() + " for "
@@ -1548,7 +1548,7 @@ public class MutationState implements SQLCloseable {
                                     numMutations,
                                     numFailedMutations,
                                     numFailedPhase3Mutations,
-                                    mutationCommitTime);
+                                    mutationCommitTime, totalBatchCount);
                     // Combine failure mutation metrics with committed ones 
for the final picture
                     
committedMutationsMetric.combineMetric(failureMutationMetrics);
                     mutationMetricQueue.addMetricsForTable(htableNameStr, 
committedMutationsMetric);
@@ -1648,7 +1648,7 @@ public class MutationState implements SQLCloseable {
                 numUpsertMutationsInBatch,
                 allUpsertsMutations ? 1 : 0,
                 numDeleteMutationsInBatch,
-                allDeletesMutations ? 1 : 0);
+                allDeletesMutations ? 1 : 0, 0);
     }
 
     /**
@@ -1667,7 +1667,7 @@ public class MutationState implements SQLCloseable {
     static MutationMetric getCommittedMutationsMetric(
             MutationBytes totalMutationBytesObject, List<List<Mutation>> 
unsentMutationBatchList,
             long numMutations, long numFailedMutations,
-            long numFailedPhase3Mutations, long mutationCommitTime) {
+            long numFailedPhase3Mutations, long mutationCommitTime, long 
mutationBatchCounter) {
         long committedUpsertMutationBytes = totalMutationBytesObject == null ? 
0 :
                 totalMutationBytesObject.getUpsertMutationBytes();
         long committedAtomicUpsertMutationBytes = totalMutationBytesObject == 
null ? 0:
@@ -1731,7 +1731,7 @@ public class MutationState implements SQLCloseable {
                 committedDeleteMutationCounter,
                 committedTotalMutationBytes,
                 numFailedPhase3Mutations,
-                0, 0, 0, 0 );
+                0, 0, 0, 0, mutationBatchCounter);
     }
 
     private void filterIndexCheckerMutations(Map<TableInfo, List<Mutation>> 
mutationMap,
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 8a45fcd554..44a7d75945 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -88,6 +88,9 @@ public enum MetricType {
     DELETE_BATCH_FAILED_COUNTER("dbfc", "Number of delete mutation batches 
that failed to be committed",
             LogLevel.OFF, PLong.INSTANCE),
 
+    MUTATION_BATCH_COUNTER("mbc", "Number of mutation batches committed "
+            + "in a commit call", LogLevel.OFF, PLong.INSTANCE),
+
     // select-specific query (read) metrics updated during executeQuery
     SELECT_SUCCESS_SQL_COUNTER("sss", "Counter for number of select sql 
queries that successfully"
             + " passed the executeQuery phase", LogLevel.OFF, PLong.INSTANCE),
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
index 5a129c0914..d42a10190b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MutationMetricQueue.java
@@ -23,6 +23,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_BYTES;
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_MUTATION_SQL_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
@@ -93,7 +94,8 @@ public class MutationMetricQueue {
             
publishedMetricsForTable.put(metric.getUpsertBatchFailedCounter().getMetricType(),
 metric.getUpsertBatchFailedCounter().getValue());
             
publishedMetricsForTable.put(metric.getDeleteBatchFailedSize().getMetricType(), 
metric.getDeleteBatchFailedSize().getValue());
             
publishedMetricsForTable.put(metric.getDeleteBatchFailedCounter().getMetricType(),
 metric.getDeleteBatchFailedCounter().getValue());
-
+            
publishedMetricsForTable.put(metric.getMutationBatchCounter().getMetricType(),
+                    metric.getMutationBatchCounter().getValue());
         }
         return publishedMetrics;
     }
@@ -125,8 +127,11 @@ public class MutationMetricQueue {
         private final CombinableMetric numOfIndexCommitFailMutations = new 
CombinableMetricImpl(
                 INDEX_COMMIT_FAILURE_SIZE);
 
+        private final CombinableMetric mutationBatchCounter =
+                new CombinableMetricImpl(MUTATION_BATCH_COUNTER);
+
         public static final MutationMetric EMPTY_METRIC =
-                new MutationMetric(0,0,0,0, 0, 0,0,0,0,0,0,0,0,0,0);
+                new MutationMetric(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
0, 0);
 
         public MutationMetric(long numMutations, long upsertMutationsSizeBytes,
                 long deleteMutationsSizeBytes, long commitTimeForUpserts, long 
commitTimeForAtomicUpserts,
@@ -134,7 +139,7 @@ public class MutationMetricQueue {
                 long deleteMutationSqlCounterSuccess, long totalMutationBytes,
                 long numOfPhase3Failed, long upsertBatchFailedSize,
                 long upsertBatchFailedCounter, long deleteBatchFailedSize,
-                long deleteBatchFailedCounter) {
+                long deleteBatchFailedCounter, long mutationBatchCounter) {
             this.numMutations.change(numMutations);
             this.totalCommitTimeForUpserts.change(commitTimeForUpserts);
             
this.totalCommitTimeForAtomicUpserts.change(commitTimeForAtomicUpserts);
@@ -151,6 +156,7 @@ public class MutationMetricQueue {
             this.upsertBatchFailedCounter.change(upsertBatchFailedCounter);
             this.deleteBatchFailedSize.change(deleteBatchFailedSize);
             this.deleteBatchFailedCounter.change(deleteBatchFailedCounter);
+            this.mutationBatchCounter.change(mutationBatchCounter);
         }
 
         public CombinableMetric getTotalCommitTimeForUpserts() {
@@ -215,6 +221,10 @@ public class MutationMetricQueue {
             return deleteBatchFailedCounter;
         }
 
+        public CombinableMetric getMutationBatchCounter() {
+            return mutationBatchCounter;
+        }
+
         public void combineMetric(MutationMetric other) {
             this.numMutations.combine(other.numMutations);
             
this.totalCommitTimeForUpserts.combine(other.totalCommitTimeForUpserts);
@@ -232,6 +242,7 @@ public class MutationMetricQueue {
             
this.upsertBatchFailedCounter.combine(other.upsertBatchFailedCounter);
             this.deleteBatchFailedSize.combine(other.deleteBatchFailedSize);
             
this.deleteBatchFailedCounter.combine(other.deleteBatchFailedCounter);
+            this.mutationBatchCounter.combine(other.mutationBatchCounter);
         }
 
     }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
index 5a1aa3dbc7..6832775919 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/TableClientMetrics.java
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_SIZE;
 import static 
org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_FAILED_SIZE;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BYTES;
 import static 
org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILURES;
 import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
@@ -74,9 +75,6 @@ import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_SUCCESS_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.SELECT_AGGREGATE_FAILURE_SQL_COUNTER;
-import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
-import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_FAILURES;
-import static 
org.apache.phoenix.monitoring.MetricType.TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS;
 import static 
org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_COMMIT_TIME;
 import static 
org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_QUERY_TIME;
@@ -140,7 +138,8 @@ public class TableClientMetrics {
                 
TABLE_NUM_SYSTEM_TABLE_RPC_SUCCESS(NUM_SYSTEM_TABLE_RPC_SUCCESS),
                 
TABLE_NUM_SYSTEM_TABLE_RPC_FAILURES(NUM_SYSTEM_TABLE_RPC_FAILURES),
                 
TABLE_NUM_METADATA_LOOKUP_FAILURES(NUM_METADATA_LOOKUP_FAILURES),
-                
TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS);
+                
TABLE_TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS(TIME_SPENT_IN_SYSTEM_TABLE_RPC_CALLS),
+                TABLE_MUTATION_BATCH_SUCCESS_COUNTER(MUTATION_BATCH_COUNTER);
 
         private MetricType metricType;
         private PhoenixTableMetric metric;
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
index 6be65afe5c..5a83bfa6d5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/BasePhoenixMetricsIT.java
@@ -115,7 +115,7 @@ public abstract class BasePhoenixMetricsIT extends BaseTest 
{
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
 
-            assertEquals("There should have been sixteen metrics", 16, 
p.size());
+            assertEquals("There should have been seventeen metrics", 17, 
p.size());
 
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
@@ -132,6 +132,7 @@ public abstract class BasePhoenixMetricsIT extends BaseTest 
{
             boolean upsertMutationSqlCounterPresent = false;
             boolean upsertCommitTimeCounterPresent = false;
             boolean deleteCommitTimeCounterPresent = false;
+            boolean mutationBatchCounterPresent = false;
             for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
                 MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
@@ -206,6 +207,11 @@ public abstract class BasePhoenixMetricsIT extends 
BaseTest {
                     }
                     deleteCommitTimeCounterPresent = true;
                 }
+                else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) 
{
+                    assertTrue("mutation batch success counter should be 
greater than zero",
+                            metricValue > 0);
+                    mutationBatchCounterPresent = true;
+                }
             }
             assertTrue(mutationBatchSizePresent);
             assertTrue(mutationCommitTimePresent);
@@ -222,6 +228,7 @@ public abstract class BasePhoenixMetricsIT extends BaseTest 
{
             assertTrue(deleteBatchFailedCounterPresent);
             assertTrue(upsertCommitTimeCounterPresent);
             assertTrue(deleteCommitTimeCounterPresent);
+            assertTrue(mutationBatchCounterPresent);
         }
     }
 
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 8c007df082..f3495fef91 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -41,6 +41,7 @@ import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXEC
 import static 
org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
@@ -76,7 +77,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -89,7 +89,6 @@ import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
 import org.apache.phoenix.log.LogLevel;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.EnvironmentEdge;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -133,7 +132,8 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
     private static final String DELETE_ALL_DML = "DELETE FROM %s";
 
     private static final List<MetricType> mutationMetricsToSkip =
-            Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, 
DELETE_COMMIT_TIME);
+            Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, 
DELETE_COMMIT_TIME,
+                    MUTATION_BATCH_COUNTER);
     private static final List<MetricType> readMetricsToSkip =
             Lists.newArrayList(TASK_QUEUE_WAIT_TIME, TASK_EXECUTION_TIME, 
TASK_END_TO_END_TIME,
                     COUNT_MILLS_BETWEEN_NEXTS);
@@ -377,6 +377,29 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT 
{
         }
     }
 
+    static void createTableAndRunUpsertSelect(String destTableName, String 
sourceTableName,
+                                              boolean 
resetGlobalMetricsAfterTableCreate,
+                                              boolean 
resetTableMetricsAfterTableCreate,
+                                              boolean commit, Connection conn) 
throws SQLException {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(String.format(DDL, destTableName));
+        }
+        conn.commit();
+        if (resetGlobalMetricsAfterTableCreate) {
+            resetGlobalMetrics();
+        }
+
+        if (resetTableMetricsAfterTableCreate) {
+            PhoenixRuntime.clearTableLevelMetrics();
+        }
+        try (Statement stmt = conn.createStatement()) {
+            stmt.executeUpdate(String.format(UPSERT_SELECT_DML, destTableName, 
sourceTableName));
+        }
+        if (commit) {
+            conn.commit();
+        }
+    }
+
     static void doPointDeleteFromTable(String tableName, Connection conn) 
throws SQLException {
         try (PreparedStatement stmt = conn.prepareStatement(
                 String.format(POINT_DELETE_DML, tableName))) {
@@ -487,11 +510,12 @@ public class PhoenixMetricsIT extends 
BasePhoenixMetricsIT {
             String t = entry.getKey();
             assertEquals("Table names didn't match!", tableName, t);
             Map<MetricType, Long> p = entry.getValue();
-            assertEquals("There should have been sixteen metrics", 16, 
p.size());
+            assertEquals("There should have been seventeen metrics", 17, 
p.size());
             boolean mutationBatchSizePresent = false;
             boolean mutationCommitTimePresent = false;
             boolean mutationBytesPresent = false;
             boolean mutationBatchFailedPresent = false;
+            boolean mutationBatchCounterPresent = false;
             for (Entry<MetricType, Long> metric : p.entrySet()) {
                 MetricType metricType = metric.getKey();
                 long metricValue = metric.getValue();
@@ -508,11 +532,16 @@ public class PhoenixMetricsIT extends 
BasePhoenixMetricsIT {
                     assertEquals("Zero failed mutations expected", 0, 
metricValue);
                     mutationBatchFailedPresent = true;
                 }
+                else if (metricType.equals(MetricType.MUTATION_BATCH_COUNTER)) 
{
+                    assertEquals("Mutation batch success count should be 
greater than zero", 1, metricValue);
+                    mutationBatchCounterPresent = true;
+                }
             }
             assertTrue(mutationBatchSizePresent);
             assertTrue(mutationCommitTimePresent);
             assertTrue(mutationBytesPresent);
             assertTrue(mutationBatchFailedPresent);
+            assertTrue(mutationBatchCounterPresent);
         }
         Map<String, Map<MetricType, Long>> readMetrics = 
PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(pConn);
         assertEquals("Read metrics should be empty", 0, readMetrics.size());
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
index ab00c2ace4..18817dc8c4 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixTableLevelMetricsIT.java
@@ -62,6 +62,7 @@ import static 
org.apache.phoenix.monitoring.MetricType.ATOMIC_UPSERT_SQL_COUNTER
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_BYTES;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES;
+import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.NUM_SYSTEM_TABLE_RPC_SUCCESS;
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_FAILURE_SQL_COUNTER;
 import static 
org.apache.phoenix.monitoring.MetricType.DELETE_AGGREGATE_SUCCESS_SQL_COUNTER;
@@ -110,10 +111,13 @@ import static 
org.apache.phoenix.monitoring.MetricType.UPSERT_SUCCESS_SQL_COUNTE
 import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.POINT_LOOKUP_SELECT_QUERY;
 import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.RANGE_SCAN_SELECT_QUERY;
 import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndInsertValues;
+import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.createTableAndRunUpsertSelect;
 import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.doPointDeleteFromTable;
 import static 
org.apache.phoenix.monitoring.PhoenixMetricsIT.doDeleteAllFromTable;
+import static 
org.apache.phoenix.query.QueryServices.ENABLE_SERVER_UPSERT_SELECT;
 import static 
org.apache.phoenix.util.DelayedOrFailingRegionServer.INJECTED_EXCEPTION_STRING;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
+import static org.apache.phoenix.util.PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.clearTableLevelMetrics;
 import static 
org.apache.phoenix.util.PhoenixRuntime.getOverAllReadRequestMetricInfo;
 import static 
org.apache.phoenix.util.PhoenixRuntime.getPhoenixTableClientMetrics;
@@ -179,6 +183,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
         // Add our own driver
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         props.put(BaseTest.DRIVER_CLASS_NAME_ATTRIB, 
PhoenixMetricsTestingDriver.class.getName());
+        props.put(ENABLE_SERVER_UPSERT_SELECT, "true");
         initAndRegisterTestDriver(url, new 
ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -322,6 +327,8 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
      * @param writeMutMetrics                       write mutation metrics 
object
      * @param conn                                  connection object. Note: 
this method must be called after connection close
      *                                              since that's where we 
populate table-level write metrics
+     * @param expectedMutationBatchCount            expected number of 
mutation batches per commit call
+
      */
     private static void assertMutationTableMetrics(final boolean isUpsert, 
final String tableName,
             final long expectedUpsertOrDeleteSuccessSqlCt,
@@ -333,7 +340,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             final long expectedUpsertOrDeleteAggregateSuccessCt,
             final long expectedUpsertOrDeleteAggregateFailureCt,
             final Map<MetricType, Long> writeMutMetrics, final Connection conn,
-            final boolean expectedSystemCatalogMetric)
+            final boolean expectedSystemCatalogMetric, final long 
expectedMutationBatchCount)
             throws SQLException {
         assertTrue(conn != null && conn.isClosed());
         assertFalse(hasMutationBeenExplicitlyCommitted && writeMutMetrics == 
null);
@@ -423,6 +430,10 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                         writeMutMetrics.get(isUpsert ?
                                 UPSERT_BATCH_FAILED_COUNTER :
                                 DELETE_BATCH_FAILED_COUNTER), CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(MUTATION_BATCH_COUNTER), 
CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        expectedMutationBatchCount, CompareOp.EQ);
             }
         }
         if (expectedSystemCatalogMetric) {
@@ -704,7 +715,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             // Must be asserted after connection close since that's where
             // we populate table-level metrics
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, 
numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 100);
         }
     }
 
@@ -730,7 +741,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, 
numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -770,7 +781,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             // mutation commit time since autoCommit was on
             assertMutationTableMetrics(true, tableName, numRows, 0,
                     writeMutMetrics.get(UPSERT_COMMIT_TIME), true, numRows, 0, 
0, numRows, 0,
-                    writeMutMetrics, conn,true);
+                    writeMutMetrics, conn,true, 10);
         }
     }
 
@@ -805,7 +816,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             }
             assertNotNull("Failed to get a connection!", conn);
             conn.close();
-            assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 
0, 1, 0, null, conn, true);
+            assertMutationTableMetrics(true, tableName, 0, 1, 0, false, 0, 0, 
0, 1, 0, null, conn, true, 0);
         }
     }
 
@@ -845,7 +856,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, delay, 
true, numRows, 0, 0, 1,
-                    0, writeMutMetrics, conn, true);
+                    0, writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -893,7 +904,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, 0, 1, 0, true, 1, 0, 
1, 0, 1,
-                    writeMutMetrics, conn, true);
+                    writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -944,7 +955,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, 
numRows, 0, numRows, 0,
-                    1, writeMutMetrics, conn, true);
+                    1, writeMutMetrics, conn, true, 1);
         }
     }
 
@@ -984,7 +995,70 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     
getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(true, tableName, numRows, 0, 0, true, 
numRows, delayRs, 0, 1,
-                    0, writeMutMetrics, conn, true);
+                    0, writeMutMetrics, conn, true, 1);
+        }
+    }
+
+    @Test public void testUpsertSelectWithRunOnServerAsTrue() throws 
SQLException {
+        String srcTableName = generateUniqueName();
+        String destTableName = generateUniqueName();
+        int numRows = 10;
+        Map<MetricType, Long> writeMutMetrics;
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndInsertValues(srcTableName, true, true,
+                    numRows, true, conn, false);
+        }
+        try (Connection conn = getConnFromTestDriver()) {
+            conn.setAutoCommit(true); // Set auto-commit to make upsert select 
run on server
+            createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+                    true, true, conn);
+            writeMutMetrics = 
PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+        }
+        assertNull(writeMutMetrics); // No commits were done from client to 
server so, no metrics recorded
+        for (PhoenixTableMetric metric: 
getPhoenixTableClientMetrics().get(destTableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 0, CompareOp.EQ);
+        }
+    }
+
+    @Test public void testUpsertSelectWithRunOnServerAsFalse() throws 
SQLException {
+        String srcTableName = generateUniqueName();
+        String destTableName = generateUniqueName();
+        int numRows = 10;
+        Map<MetricType, Long> writeMutMetrics;
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndInsertValues(srcTableName, true, true,
+                    numRows, true, conn, false);
+        }
+        try (Connection conn = getConnFromTestDriver()) {
+            createTableAndRunUpsertSelect(destTableName, srcTableName, true,
+                    true, true, conn);
+            writeMutMetrics = 
PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(destTableName);
+        }
+        // Rows were fetched to client from source table and committed to 
destination table on server
+        assertNotNull(writeMutMetrics);
+        for (PhoenixTableMetric metric: 
getPhoenixTableClientMetrics().get(destTableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                    writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+        }
+    }
+
+    @Test public void testUpsertWithOverriddenUpsertBatchSize() throws 
SQLException {
+        String tableName = generateUniqueName();
+        int numRows = 100;
+        Map<MetricType, Long> writeMutMetrics;
+        Properties props = new Properties();
+        props.put(UPSERT_BATCH_SIZE_ATTRIB, "5");
+        try (Connection conn = DriverManager.getConnection(url, props)) {
+            createTableAndInsertValues(tableName, true, true,
+                    numRows, true, conn, false);
+            writeMutMetrics = 
PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
+        }
+        assertNotNull(writeMutMetrics);
+        for (PhoenixTableMetric metric: 
getPhoenixTableClientMetrics().get(tableName)) {
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                    writeMutMetrics.get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            assertMetricValue(metric, MUTATION_BATCH_COUNTER, 20, 
CompareOp.EQ);
         }
     }
 
@@ -1015,7 +1089,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, 1, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1046,7 +1120,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1083,7 +1157,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             assertNull(writeMutMetrics);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, false, 0, 0, 0, 0, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1127,7 +1201,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             assertNull(writeMutMetrics);
             conn.close();
-            assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false);
+            assertMutationTableMetrics(false, tableName, 0, 1, 0, false, 0, 0, 0, 0, 1, null, conn, false, 0);
         }
     }
 
@@ -1166,7 +1240,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, injectDelay, true, 1, 0, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1212,7 +1286,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, 0, numRows, 0, 1,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1237,6 +1311,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
 
         // Insert data into the table
         String insertData = "UPSERT INTO " + dataTable + " VALUES (?, ?)";
+        Map<String, Map<MetricType, Long>> writeMutMetrics;
         try (Connection conn = getConnFromTestDriver();
              PreparedStatement stmt = conn.prepareStatement(insertData)) {
             for (int i = 1; i <= 10; i++) {
@@ -1245,6 +1320,21 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                 stmt.executeUpdate();
             }
             conn.commit();
+            writeMutMetrics = PhoenixRuntime.getWriteMetricInfoForMutationsSinceLastReset(conn);
+        }
+        for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(dataTable)) {
+            if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER, 1, CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(dataTable).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            }
+        }
+        for(PhoenixTableMetric metric: getPhoenixTableClientMetrics().get(indexName)) {
+            if(metric.getMetricType().equals(MUTATION_BATCH_COUNTER)) {
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER, 2, CompareOp.EQ);
+                assertMetricValue(metric, MUTATION_BATCH_COUNTER,
+                        writeMutMetrics.get(indexName).get(MUTATION_BATCH_COUNTER), CompareOp.EQ);
+            }
         }
 
         // Check if the index is being used
@@ -1316,7 +1406,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
                     getWriteMetricInfoForMutationsSinceLastReset(conn).get(tableName);
             conn.close();
             assertMutationTableMetrics(false, tableName, 1, 0, 0, true, numRows, delayRs, 0, 1, 0,
-                    writeMutMetrics, conn, false);
+                    writeMutMetrics, conn, false, 1);
         }
     }
 
@@ -1358,7 +1448,7 @@ public class PhoenixTableLevelMetricsIT extends BaseTest {
             // 1 regular upsert + numAtomicUpserts
             // 2 mutations (regular and atomic on the same row in the same batch will be split)
             assertMutationTableMetrics(true, tableName, 1 + numAtomicUpserts, 0, 0, true, 2, 0, 0, 2, 0,
-                writeMutMetrics, conn, false);
+                writeMutMetrics, conn, false, 2);
             assertEquals(numAtomicUpserts, getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_SQL_COUNTER));
             assertTrue(getMetricFromTableMetrics(tableName, ATOMIC_UPSERT_COMMIT_TIME) > 0);
         }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
index 54f4c6c613..bf92444c62 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/monitoring/TableMetricsManagerTest.java
@@ -236,32 +236,32 @@ public class TableMetricsManagerTest {
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, true);
         MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
                 0L, 5L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 10L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 50L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 100L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 500L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, true);
         metric = new MutationMetricQueue.MutationMetric(0L, 1000L, 0L, 0L, 0L,0L,
-                0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L);
+                0L, 1L, 0L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), true);
 
 
@@ -300,32 +300,32 @@ public class TableMetricsManagerTest {
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 1, false);
         MutationMetricQueue.MutationMetric metric = new MutationMetricQueue.MutationMetric(
                 0L, 0L, 5L, 0L, 0L, 0L,
-                0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 5L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 2, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 10L, 0L, 0L, 0L,
-                0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 10L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 4, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 50L, 0L, 0L, 0L,
-                0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 50L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 5,false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 100L, 0L, 0L, 0L,
-                0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 100L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 6,false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 500L, 0L, 0L, 0L,
-                0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 500L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
         TableMetricsManager.updateLatencyHistogramForMutations(tableName, 8, false);
         metric = new MutationMetricQueue.MutationMetric(0L, 0L, 1000L, 0L, 0L, 0L,
-                0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L);
+                0L, 0L, 1L, 1000L, 0L, 0L, 0L, 0L, 0L, 0L);
         TableMetricsManager.updateSizeHistogramMetricsForMutations(tableName, metric.getTotalMutationsSizeBytes().getValue(), false);
 
 


Reply via email to