This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abdcdb8e2 PHOENIX-7313 All cell versions should not be retained 
during flushes … (#1888)
0abdcdb8e2 is described below

commit 0abdcdb8e20fb8f5d0932ee47789753fe69801f9
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Thu May 16 15:29:49 2024 -0700

    PHOENIX-7313 All cell versions should not be retained during flushes … 
(#1888)
---
 .../coprocessor/BaseScannerRegionObserver.java     | 33 ++++++++---
 .../phoenix/coprocessor/CompactionScanner.java     | 18 ++++--
 .../phoenix/coprocessor/TTLRegionScanner.java      | 10 ++--
 .../UngroupedAggregateRegionObserver.java          |  9 ++-
 .../phoenix/end2end/MaxLookbackExtendedIT.java     | 69 ++++++++++++++++++++++
 .../org/apache/phoenix/end2end/TableTTLIT.java     | 62 +++++++++++++++++--
 .../java/org/apache/phoenix/util/TestUtil.java     | 12 ++--
 7 files changed, 185 insertions(+), 28 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index fa450625e3..d27a187dd1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -357,20 +357,33 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, 
projector, ptr, useQualiferAsListIndex);
     }
 
-    public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
+    public void setScanOptionsForFlushesAndCompactions(Store store, 
ScanOptions options,
+            boolean retainAllVersions) {
         // We want the store to give us all the deleted cells to 
StoreCompactionScanner
         options.setKeepDeletedCells(KeepDeletedCells.TTL);
         options.setTTL(HConstants.FOREVER);
-        options.setMaxVersions(Integer.MAX_VALUE);
-        options.setMinVersions(Integer.MAX_VALUE);
+        if (retainAllVersions) {
+            options.setMaxVersions(Integer.MAX_VALUE);
+            options.setMinVersions(Integer.MAX_VALUE);
+        } else {
+            options.setMinVersions(Math.max(Math.max(options.getMaxVersions(),
+                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+            options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
+                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
+        }
+
     }
+
     @Override
     public void 
preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
             ScanType scanType, ScanOptions options, CompactionLifeCycleTracker 
tracker,
             CompactionRequest request) throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
+                    || request.isMajor();
+            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
             return;
         }
         long maxLookbackAge = getMaxLookbackAge(c);
@@ -384,10 +397,14 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
     public void 
preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store 
store,
             ScanOptions options, FlushLifeCycleTracker tracker) throws 
IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
+
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
             return;
         }
+
         long maxLookbackAge = getMaxLookbackAge(c);
         if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
             
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, 
store,
@@ -401,7 +418,9 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
             throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
+                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
+            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
             return;
         }
         long maxLookbackAge = getMaxLookbackAge(c);
@@ -428,7 +447,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
 
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(options);
+            setScanOptionsForFlushesAndCompactions(store, options, true);
             return;
         }
         if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index 3bcc2cefa8..ebe92b8741 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -76,6 +76,8 @@ public class CompactionScanner implements InternalScanner {
     private final byte[] emptyCF;
     private final byte[] emptyCQ;
     private final byte[] storeColumnFamily;
+    private final String tableName;
+    private final String columnFamilyName;
     private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
     private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
     private HBaseLevelRowCompactor hBaseLevelRowCompactor;
@@ -94,19 +96,18 @@ public class CompactionScanner implements InternalScanner {
         this.emptyCQ = emptyCQ;
         this.config = env.getConfiguration();
         compactionTime = EnvironmentEdgeManager.currentTimeMillis();
-        this.maxLookbackInMillis = maxLookbackInMillis;
-        String columnFamilyName = store.getColumnFamilyName();
+        columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
-        String tableName = region.getRegionInfo().getTable().getNameAsString();
+        tableName = region.getRegionInfo().getTable().getNameAsString();
         Long overriddenMaxLookback =
                 maxLookbackMap.remove(tableName + SEPARATOR + 
columnFamilyName);
-        maxLookbackInMillis = overriddenMaxLookback == null ?
+        this.maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, 
overriddenMaxLookback);
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets 
the scan time range
         // for scn queries [0, scn). This means that the maxlookback size 
should be
         // maxLookbackInMillis + 1 so that the oldest scn does not return 
empty row
-        this.maxLookbackWindowStart = maxLookbackInMillis == 0 ?
-                compactionTime : compactionTime - (maxLookbackInMillis + 1);
+        this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
+                compactionTime : compactionTime - (this.maxLookbackInMillis + 
1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
         ttl = cfd.getTimeToLive();
         this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
@@ -121,6 +122,9 @@ public class CompactionScanner implements InternalScanner {
                         || localIndex;
         phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
         hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+        LOGGER.info("Starting Phoenix CompactionScanner for table " + 
tableName + " store "
+                + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
+                + maxLookbackInMillis + "ms");
     }
 
     /**
@@ -155,6 +159,8 @@ public class CompactionScanner implements InternalScanner {
 
     @Override
     public void close() throws IOException {
+        LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName 
+ " store "
+                + columnFamilyName);
         storeScanner.close();
     }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 261ef94fe8..aa1196130f 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -70,14 +70,14 @@ public class TTLRegionScanner extends BaseRegionScanner {
         long currentTime = scan.getTimeRange().getMax() == 
HConstants.LATEST_TIMESTAMP ?
                 EnvironmentEdgeManager.currentTimeMillis() : 
scan.getTimeRange().getMax();
         ttl = 
env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
-        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 
1000;
-        ttl *= 1000;
         // Regardless if the Phoenix Table TTL feature is disabled cluster 
wide or the client is
         // an older client and does not supply the empty column parameters, 
the masking should not
-        // be done here.
-        isMaskingEnabled = emptyCF != null && emptyCQ != null &&
-                
env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
+        // be done here. We also disable masking when TTL is 
HConstants.FOREVER.
+        isMaskingEnabled = emptyCF != null && emptyCQ != null && ttl != 
HConstants.FOREVER
+                && 
env.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
                         
QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 
1000;
+        ttl *= 1000;
     }
 
     private void init() throws IOException {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index c07e1e25c8..4ae157aba1 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -597,7 +597,14 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                     InternalScanner internalScanner = scanner;
                     if (request.isMajor()) {
                         boolean isDisabled = false;
-                        final String fullTableName = 
tableName.getNameAsString();
+                        boolean isMultiTenantIndexTable = false;
+                        if 
(tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+                            isMultiTenantIndexTable = true;
+                        }
+                        final String fullTableName = isMultiTenantIndexTable ?
+                                
SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+                                        MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+                                tableName.getNameAsString();
                         PTable table = null;
                         try (PhoenixConnection conn = 
QueryUtil.getConnectionOnServer(
                                 
compactionConfig).unwrap(PhoenixConnection.class)) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index f95918159a..f9900fdb7b 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,17 +22,22 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.CompactionScanner;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.ManualEnvironmentEdge;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,7 +50,9 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.Date;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -60,6 +67,9 @@ import static 
org.apache.phoenix.util.TestUtil.assertRowExistsAtSCN;
 import static org.apache.phoenix.util.TestUtil.assertRowHasExpectedValueAtSCN;
 import static org.apache.phoenix.util.TestUtil.assertTableHasTtl;
 import static org.apache.phoenix.util.TestUtil.assertTableHasVersions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
@@ -316,6 +326,65 @@ public class MaxLookbackExtendedIT extends BaseTest {
         }
     }
 
+    @Test(timeout=60000L)
+    public void testViewIndexIsCompacted() throws Exception {
+        if(hasTableLevelMaxLookback) {
+            return;
+        }
+        String baseTable =  SchemaUtil.getTableName("SCHEMA1", 
generateUniqueName());
+        String globalViewName = generateUniqueName();
+        String fullGlobalViewName = SchemaUtil.getTableName("SCHEMA2", 
globalViewName);
+        String globalViewIdx =  generateUniqueName();
+        TableName dataTable = TableName.valueOf(baseTable);
+        TableName indexTable = TableName.valueOf("_IDX_" + baseTable);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("CREATE TABLE " + baseTable
+                    + " (TENANT_ID CHAR(15) NOT NULL, PK2 INTEGER NOT NULL, 
PK3 INTEGER NOT NULL, "
+                    + "COL1 VARCHAR, COL2 VARCHAR, COL3 CHAR(15) CONSTRAINT PK 
PRIMARY KEY"
+                    + "(TENANT_ID, PK2, PK3)) MULTI_TENANT=true");
+            conn.createStatement().execute("CREATE VIEW " + fullGlobalViewName
+                    + " AS SELECT * FROM " + baseTable);
+            conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " 
ON "
+                    + fullGlobalViewName + " (COL1) INCLUDE (COL2)");
+
+            conn.createStatement().executeUpdate("UPSERT INTO  " + 
fullGlobalViewName
+                    + " (TENANT_ID, PK2, PK3, COL1, COL2) VALUES 
('TenantId1',1, 2, 'a', 'b')");
+            conn.commit();
+
+            String query = "SELECT COL2 FROM " + fullGlobalViewName + " WHERE  
COL1 = 'a'";
+            // Verify that query uses the global view index
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            PTable table = 
((PhoenixResultSet)rs).getContext().getCurrentTable().getTable();
+            assertTrue(table.getSchemaName().getString().equals("SCHEMA2") &&
+                    table.getTableName().getString().equals(globalViewIdx));
+            assertTrue(rs.next());
+            assertEquals("b", rs.getString(1));
+            assertFalse(rs.next());
+            // Force a flush
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, dataTable, 1);
+            assertRawRowCount(conn, indexTable, 1);
+            // Delete the row from both tables
+            conn.createStatement().execute("DELETE FROM " + fullGlobalViewName
+                            + " WHERE TENANT_ID = 'TenantId1'");
+            conn.commit();
+            // Force a flush
+            flush(dataTable);
+            flush(indexTable);
+            assertRawRowCount(conn, dataTable, 1);
+            assertRawRowCount(conn, indexTable, 1);
+            // Move change beyond the max lookback window
+            injectEdge.setValue(System.currentTimeMillis() + MAX_LOOKBACK_AGE 
* 1000 + 1);
+            EnvironmentEdgeManager.injectEdge(injectEdge);
+            // Major compact both tables
+            majorCompact(dataTable);
+            majorCompact(indexTable);
+            // Everything should have been purged by major compaction
+            assertRawRowCount(conn, dataTable, 0);
+            assertRawRowCount(conn, indexTable, 0);
+        }
+    }
     @Test(timeout=60000L)
     public void testTTLAndMaxLookbackAge() throws Exception {
         if(hasTableLevelMaxLookback) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 852cf8f01f..3b6d1277e8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -131,8 +132,8 @@ public class TableTTLIT extends BaseTest {
                     { true, false, KeepDeletedCells.FALSE, 5, 50, null},
                     { true, false, KeepDeletedCells.TRUE, 1, 25, null},
                     { true, false, KeepDeletedCells.TTL, 5, 100, null},
-                    { false, false, KeepDeletedCells.FALSE, 1, 100, 15},
-                    { false, false, KeepDeletedCells.TRUE, 5, 50, 15},
+                    { false, false, KeepDeletedCells.FALSE, 1, 100, 0},
+                    { false, false, KeepDeletedCells.TRUE, 5, 50, 0},
                     { false, false, KeepDeletedCells.TTL, 1, 25, 15}});
     }
 
@@ -155,7 +156,7 @@ public class TableTTLIT extends BaseTest {
     @Test
     public void testMaskingAndCompaction() throws Exception {
         final int maxLookbackAge = tableLevelMaxLooback != null ? 
tableLevelMaxLooback : MAX_LOOKBACK_AGE;
-        final int maxDeleteCounter = maxLookbackAge;
+        final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
         final int maxCompactionCounter = ttl / 2;
         final int maxMaskingCounter = 2 * ttl;
         final byte[] rowKey = Bytes.toBytes("a");
@@ -232,10 +233,51 @@ public class TableTTLIT extends BaseTest {
     }
 
     @Test
-    public void testRowSpansMultipleTTLWindows() throws Exception {
-        if (tableLevelMaxLooback != null) {
+    public void 
testFlushesAndMinorCompactionShouldNotRetainCellsWhenMaxLookbackIsDisabled()
+            throws Exception {
+        final int maxLookbackAge = tableLevelMaxLooback != null
+                ? tableLevelMaxLooback : MAX_LOOKBACK_AGE;
+        if (maxLookbackAge > 0) {
             return;
         }
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String tableName = generateUniqueName();
+            createTable(tableName);
+            conn.createStatement().execute("Alter Table " + tableName + " set 
\"phoenix.max.lookback.age.seconds\" = 0");
+            conn.commit();
+            final int flushCount = 10;
+            byte[] row = Bytes.toBytes("a");
+            for (int i = 0; i < flushCount; i++) {
+                // Generate more row versions than the maximum cell versions 
for the table
+                int updateCount = RAND.nextInt(10) + versions;
+                for (int j = 0; j < updateCount; j++) {
+                    updateRow(conn, tableName, "a");
+                }
+                flush(TableName.valueOf(tableName));
+                // At every flush, extra cell versions should be removed.
+                // MAX_COLUMN_INDEX table columns and one empty column will be 
retained for
+                // each row version.
+                TestUtil.assertRawCellCount(conn, 
TableName.valueOf(tableName), row,
+                        (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+            }
+            // Run one minor compaction (in case no minor compaction has 
happened yet)
+            Admin admin = utility.getAdmin();
+            admin.compact(TableName.valueOf(tableName));
+            int waitCount = 0;
+            while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
+                    Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) 
* versions) {
+                // Wait for major compactions to happen
+                Thread.sleep(1000);
+                waitCount++;
+                if (waitCount > 30) {
+                    Assert.fail();
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRowSpansMultipleTTLWindows() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
             createTable(tableName);
@@ -303,6 +345,16 @@ public class TableTTLIT extends BaseTest {
         conn.commit();
     }
 
+    private void updateRow(Connection conn, String tableName, String id)
+            throws SQLException {
+
+        for (int i = 1; i <= MAX_COLUMN_INDEX; i++) {
+            String value = Integer.toString(RAND.nextInt(1000));
+            updateColumn(conn, tableName, id, i, value);
+        }
+        conn.commit();
+    }
+
     private void compareRow(Connection conn, String tableName1, String 
tableName2, String id,
             int maxColumnIndex) throws SQLException, IOException {
         StringBuilder queryBuilder = new StringBuilder("SELECT ");
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 79b0168f77..c15bd407c9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1364,13 +1364,17 @@ public class TestUtil {
         assertEquals(expectedRowCount, count);
     }
 
-    public static void assertRawCellCount(Connection conn, TableName tableName,
-                                          byte[] row, int expectedCellCount)
-        throws SQLException, IOException {
+    public static int getRawCellCount(Connection conn, TableName tableName, 
byte[] row)
+            throws SQLException, IOException {
         ConnectionQueryServices cqs = 
conn.unwrap(PhoenixConnection.class).getQueryServices();
         Table table = cqs.getTable(tableName.getName());
         CellCount cellCount = getCellCount(table, true);
-        int count = cellCount.getCellCount(Bytes.toString(row));
+        return cellCount.getCellCount(Bytes.toString(row));
+    }
+    public static void assertRawCellCount(Connection conn, TableName tableName,
+                                          byte[] row, int expectedCellCount)
+        throws SQLException, IOException {
+        int count = getRawCellCount(conn, tableName, row);
         assertEquals(expectedCellCount, count);
     }
 

Reply via email to