This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new d926df818b PHOENIX-7543 Wrong result returned when query is served by index and some columns are null (#2097) d926df818b is described below commit d926df818b086f187fa2bb5ec678483e447338f2 Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Fri Mar 28 10:27:37 2025 -0700 PHOENIX-7543 Wrong result returned when query is served by index and some columns are null (#2097) Add DeleteColumn cells to index rows when a column is set to null in an upsert statement. --- .../org/apache/phoenix/index/IndexMaintainer.java | 46 +++++ .../java/org/apache/phoenix/util/IndexUtil.java | 35 ++++ .../coprocessor/GlobalIndexRegionScanner.java | 59 +++--- .../phoenix/hbase/index/IndexRegionObserver.java | 47 +++-- .../apache/phoenix/index/GlobalIndexChecker.java | 41 ----- .../org/apache/phoenix/end2end/IndexToolIT.java | 12 ++ .../end2end/index/GlobalIndexCheckerIT.java | 198 ++++++++++++++++++++- .../apache/phoenix/index/IndexMaintainerTest.java | 141 ++++++++++++--- 8 files changed, 462 insertions(+), 117 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 5dcc18ecdb..2073d8192e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; @@ -71,6 +72,7 @@ import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; +import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; @@ -1345,6 +1347,50 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return put; } + /** + * For mutable covered indexes, an index update is a full update. However, if some included + * columns are set to null in the upsert statement we need to write a DeleteColumn cell + * to such columns. + * @param indexUpdate Put mutation updating index which includes at a minimum the empty column + * @param ts The update timestamp + * @return Delete mutation with DeleteColumn cells for all covered columns that are missing + * in the indexUpdate Put mutation + * @throws IOException + */ + public Delete buildDeleteColumnMutation(Put indexUpdate, long ts) throws IOException { + if (getIndexStorageScheme() == ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + // for single cell storage format no need to build a delete column mutation + return null; + } + if (coveredColumnsMap == null || coveredColumnsMap.isEmpty()) { + // no covered columns in the index no need to build a delete mutation + return null; + } + int colsSet = indexUpdate.getFamilyCellMap() + .values().stream().mapToInt(elem -> elem.size()).sum(); + if (coveredColumnsMap.size() + 1 == colsSet) { // add 1 for the empty column + // Index row update is always a full update except when some columns are explicitly + // set to null. Do a quick size check to determine if some covered columns are being + // set to null or not. + return null; + } + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(indexUpdate.getRow()); + Delete delete = new Delete(rowKey.get()); + for (Entry<ColumnReference, ColumnReference> coveredCol : coveredColumnsMap.entrySet()) { + ColumnReference indexCol = coveredCol.getValue(); + if (!indexUpdate.has(indexCol.getFamily(), indexCol.getQualifier())) { + KeyValue kv = GenericKeyValueBuilder.INSTANCE.buildDeleteColumns( + rowKey, + indexCol.getFamilyWritable(), + indexCol.getQualifierWritable(), + ts); + delete.add(kv); + } + } + assert !delete.isEmpty(); + return delete; + } + public enum DeleteType {SINGLE_VERSION, ALL_VERSIONS}; private DeleteType getDeleteTypeOrNull(Collection<? extends Cell> pendingUpdates) { return getDeleteTypeOrNull(pendingUpdates, this.nDataCFs); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java index 5783fcd739..8c996d0fb6 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.util; +import static org.apache.hadoop.hbase.Cell.Type.DeleteColumn; +import static org.apache.hadoop.hbase.Cell.Type.DeleteFamily; import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.LOCAL_INDEX_BUILD; import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.LOCAL_INDEX_BUILD_PROTO; import static org.apache.phoenix.coprocessorclient.MetaDataProtocol.PHOENIX_MAJOR_VERSION; @@ -49,6 +51,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.index.PhoenixIndexCodec; @@ -900,6 +903,38 @@ public class IndexUtil { return columns; } + public static boolean isDeleteFamily(Mutation mutation) { + if (!(mutation instanceof Delete)) { + return false; + } + for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (cell.getType() == DeleteFamily) { + return true; + } + } + } + return false; + } + + public static boolean isDeleteColumn(Mutation mutation) { + if (!(mutation instanceof Delete)) { + return false; + } + for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (cell.getType() == DeleteColumn) { + return true; + } + } + } + return false; + } + + public static boolean isDeleteFamilyOrDeleteColumn(Mutation mutation) { + return isDeleteFamily(mutation) || isDeleteColumn(mutation); + } + public static class SimpleValueGetter implements ValueGetter { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final Put put; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 5706916cae..39bdf63d83 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -89,6 +89,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static org.apache.hadoop.hbase.Cell.Type.DeleteColumn; +import static org.apache.hadoop.hbase.Cell.Type.DeleteFamily; import static org.apache.phoenix.hbase.index.write.AbstractParallelWriterIndexCommitter.INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_INVALID; import static org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository.IndexVerificationErrorType.BEYOND_MAX_LOOKBACK_MISSING; @@ -589,17 +591,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { } }; - private boolean isDeleteFamily(Mutation mutation) { - for (List<Cell> cells : mutation.getFamilyCellMap().values()) { - for (Cell cell : cells) { - if (cell.getType() == Cell.Type.DeleteFamily) { - return true; - } - } - } - return false; - } - private void updateUnverifiedIndexRowCounters(Put actual, long expectedTs, List<Mutation> indexRowsToBeDeleted, IndexToolVerificationResult.PhaseResult verificationPhaseResult) { // Get the empty column of the given index row @@ -714,7 +705,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { while (iterator.hasNext()) { Mutation mutation = iterator.next(); if ((mutation instanceof Put && !isVerified((Put) mutation)) || - (mutation instanceof Delete && !isDeleteFamily(mutation))) { + (mutation instanceof Delete + && !IndexUtil.isDeleteFamilyOrDeleteColumn(mutation))) { iterator.remove(); } else { if (((previous instanceof Put && mutation instanceof Put) || @@ -763,6 +755,11 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { * index rebuilds, the delete family markers are used to delete index rows due to data table row deletes or * data table row overwrites. * + * Delete Column Markers + * Delete column markers are generated during read repair, regular table updates and + * index rebuilds. The delete column markers are used for any included column in the index + * which is set to null. + * * Verification Algorithm * * IndexTool verification generates an expected list of index mutations from the data table rows and uses this list @@ -773,7 +770,9 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { * * Every mutation will include a set of cells with the same timestamp * Every mutation has a different timestamp - * A delete mutation will include only delete family cells and it is for deleting the entire row and its versions + * A delete mutation can either include delete family cells and it is for deleting the entire + * row and its versions or delete column cells. The delete column cells are added for those + * included columns in the index which are set to null. * Every put mutation is verified * * For both verification types, after the expected list of index mutations is constructed for a given data table, @@ -783,7 +782,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { * As in the construction for the expected list, the cells are grouped into a put and a delete set. The put and * delete sets for a given row are further grouped based on their timestamps into put and delete mutations such that * all the cells in a mutation have the timestamps. The put and delete mutations are then sorted within a single - * list. Mutations in this list are sorted in ascending order of their timestamp. This list is the actual list. + * list. Mutations in this list are sorted in descending order of their timestamp. + * This list is the actual list. * * For the without-repair verification, unverified mutations and family version delete markers are removed from * the actual list and then the list is compared with the expected list. @@ -848,7 +848,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { // Between an expected delete and put, there can be one or more deletes due to // concurrent mutations or data table write failures. Skip all of them if any // There cannot be any actual delete mutation between two expected put mutations. - while (getTimestamp(actual) >= getTimestamp(expected) && actual instanceof Delete) { + while (getTimestamp(actual) >= getTimestamp(expected) + && IndexUtil.isDeleteFamily(actual)) { actualIndex++; if (actualIndex == actualSize) { break; @@ -868,9 +869,13 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { continue; } } else { // expected instanceof Delete - // Between put and delete, delete and delete, or before the first delete, there can be other deletes. - // Skip all of them if any - while (getTimestamp(actual) > getTimestamp(expected) && actual instanceof Delete) { + // Between put and delete, delete and delete, or before the first delete, there can + // be other deletes. Skip all of them if any. This can happen when there are + // unverified index rows on a deleted row and read-repair will put a DeleteFamily + // marker. Those delete family markers will be visible until compaction runs on the + // index table. + while (getTimestamp(actual) > getTimestamp(expected) + && IndexUtil.isDeleteFamily(actual)) { actualIndex++; if (actualIndex == actualSize) { break; @@ -880,8 +885,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { if (actualIndex == actualSize) { break; } - if (getTimestamp(actual) == getTimestamp(expected) && - (actual instanceof Delete && isDeleteFamily(actual))) { + if (isMatchingMutation(expected, actual)) { expectedIndex++; actualIndex++; continue; @@ -1155,7 +1159,8 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { }; public static List<Mutation> getMutationsWithSameTS(Put put, Delete del) { - // Reorder the mutations on the same row so that delete comes before put when they have the same timestamp + // Reorder the mutations on the same row so that put comes before delete when they + // have the same timestamp return getMutationsWithSameTS(put, del, MUTATION_TS_COMPARATOR); } @@ -1253,7 +1258,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { * uncovered partial indexes. * pendingMutations is a sorted list of data table mutations that are used to replay index * table mutations. This list is sorted in ascending order by the tuple of row key, timestamp - * and mutation type where delete comes after put. + * and mutation type where put comes before delete. */ public static List<Mutation> prepareIndexMutationsForRebuild(IndexMaintainer indexMaintainer, Put dataPut, Delete dataDel) throws IOException { @@ -1319,6 +1324,10 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts); indexMutations.add(indexPut); + Delete deleteColumn = indexMaintainer.buildDeleteColumnMutation(indexPut, ts); + if (deleteColumn != null) { + indexMutations.add(deleteColumn); + } // Delete the current index row if the new index key is different than the current one if (indexRowKeyForCurrentDataRow != null) { if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { @@ -1369,6 +1378,10 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { Put indexPut = prepareIndexPutForRebuild(indexMaintainer, rowKeyPtr, nextDataRowVG, ts); indexMutations.add(indexPut); + Delete deleteColumn = indexMaintainer.buildDeleteColumnMutation(indexPut, ts); + if (deleteColumn != null) { + indexMutations.add(deleteColumn); + } // Delete the current index row if the new index key is different than the current one if (indexRowKeyForCurrentDataRow != null) { if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { @@ -1406,7 +1419,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { List<Mutation> mutationList = indexMutationMap.get(indexRowKey); if (mutationList == null) { if (!mostRecentDone) { - if (mutation instanceof Put) { + if (mutation instanceof Put || IndexUtil.isDeleteColumn(mutation)) { mostRecentIndexRowKeys.add(indexRowKey); mostRecentDone = true; } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 48534d8354..d4e965bbe9 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -203,21 +203,26 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return lastContext; } } - - private static boolean ignoreIndexRebuildForTesting = false; - private static boolean failPreIndexUpdatesForTesting = false; - private static boolean failPostIndexUpdatesForTesting = false; - private static boolean failDataTableUpdatesForTesting = false; - - public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; } - - public static void setFailPreIndexUpdatesForTesting(boolean fail) { failPreIndexUpdatesForTesting = fail; } - - public static void setFailPostIndexUpdatesForTesting(boolean fail) { failPostIndexUpdatesForTesting = fail; } - - public static void setFailDataTableUpdatesForTesting(boolean fail) { - failDataTableUpdatesForTesting = fail; - } + private static boolean ignoreIndexRebuildForTesting = false; + private static boolean failPreIndexUpdatesForTesting = false; + private static boolean failPostIndexUpdatesForTesting = false; + private static boolean failDataTableUpdatesForTesting = false; + private static boolean ignoreWritingDeleteColumnsToIndex = false; + public static void setIgnoreIndexRebuildForTesting(boolean ignore) { + ignoreIndexRebuildForTesting = ignore; + } + public static void setFailPreIndexUpdatesForTesting(boolean fail) { + failPreIndexUpdatesForTesting = fail; + } + public static void setFailPostIndexUpdatesForTesting(boolean fail) { + failPostIndexUpdatesForTesting = fail; + } + public static void setFailDataTableUpdatesForTesting(boolean fail) { + failDataTableUpdatesForTesting = fail; + } + public static void setIgnoreWritingDeleteColumnsToIndex(boolean ignore) { + ignoreWritingDeleteColumnsToIndex = ignore; + } public enum BatchMutatePhase { PRE, POST, FAILED @@ -994,6 +999,14 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { QueryConstants.UNVERIFIED_BYTES); context.indexUpdates.put(hTableInterfaceReference, new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get())); + if (!ignoreWritingDeleteColumnsToIndex) { + Delete deleteColumn = + indexMaintainer.buildDeleteColumnMutation(indexPut, ts); + if (deleteColumn != null) { + context.indexUpdates.put(hTableInterfaceReference, + new Pair<Mutation, byte[]>(deleteColumn, rowKeyPtr.get())); + } + } // Delete the current index row if the new index key is different than the current one if (currentDataRowState != null) { ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState); @@ -1056,7 +1069,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { if (m instanceof Put) { // This will be done before the data table row is updated (i.e., in the first write phase) context.preIndexUpdates.put(hTableInterfaceReference, m); - } else { + } else if (IndexUtil.isDeleteFamily(m)) { + // DeleteColumn is always accompanied by a Put so no need to make the index + // row unverified again. Only do this for DeleteFamily // Set the status of the index row to "unverified" Put unverifiedPut = new Put(m.getRow()); unverifiedPut.addColumn( diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java index 62b8677103..c2d97dbb97 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java @@ -513,51 +513,10 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg emptyCQ, 0, emptyCQ.length) == 0; } - /** - * An index row is composed of cells with the same timestamp. However, if there are multiple versions of an - * index row, HBase can return an index row with cells from multiple versions, and thus it can return cells - * with different timestamps. This happens if the version of the row we are reading does not have a value - * (i.e., effectively has null value) for a column whereas an older version has a value for the column. - * In this case, we need to remove the older cells for correctness. - */ - private void removeOlderCells(List<Cell> cellList) { - Iterator<Cell> cellIterator = cellList.iterator(); - if (!cellIterator.hasNext()) { - return; - } - Cell cell = cellIterator.next(); - long maxTs = cell.getTimestamp(); - long ts; - boolean allTheSame = true; - while (cellIterator.hasNext()) { - cell = cellIterator.next(); - ts = cell.getTimestamp(); - if (ts != maxTs) { - if (ts > maxTs) { - maxTs = ts; - } - allTheSame = false; - } - } - if (allTheSame) { - return; - } - cellIterator = cellList.iterator(); - while (cellIterator.hasNext()) { - cell = cellIterator.next(); - if (cell.getTimestamp() != maxTs) { - cellIterator.remove(); - } - } - } - private boolean verifyRowAndRemoveEmptyColumn(List<Cell> cellList) throws IOException { if (indexMaintainer.isUncovered()) { return true; } - if (!indexMaintainer.isImmutableRows()) { - removeOlderCells(cellList); - } long cellListSize = cellList.size(); Cell cell = null; if (cellListSize == 0) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 3291457164..7214ac6991 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.CounterGroup; import org.apache.hadoop.mapreduce.Job; import org.apache.phoenix.compile.ExplainPlan; @@ -838,6 +839,17 @@ public class IndexToolIT extends BaseTest { return indexTool.getJob().getCounters().getGroup(PhoenixIndexToolJobCounters.class.getName()); } + public static void dumpMRJobCounters(IndexTool indexTool) throws IOException { + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(indexTool); + dumpMRJobCounters(mrJobCounters); + } + + public static void dumpMRJobCounters(CounterGroup mrJobCounters) { + for (Counter cntr : mrJobCounters) { + LOGGER.info(String.format("%s=%d", cntr.getName(), cntr.getValue())); + } + } + private static List<String> getArgList (boolean useSnapshot, String schemaName, String dataTable, String indxTable, String tenantId, IndexTool.IndexVerifyType verifyType, Long startTime, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 878c25cd91..98359795db 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -32,11 +32,13 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCA import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; @@ -47,6 +49,8 @@ import java.util.Map; import java.util.Properties; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.phoenix.jdbc.PhoenixResultSet; import org.apache.phoenix.thirdparty.com.google.common.base.Strings; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.end2end.IndexToolIT; @@ -657,18 +661,36 @@ public class GlobalIndexCheckerIT extends BaseTest { String indexTableName = generateUniqueName(); conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + dataTableName + " (val1) include (val2, val3)" + this.indexDDLOptions); - conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab', 'abcc')"); + // For immutable tables updating columns to null is ignored + conn.createStatement().execute( + "upsert into " + dataTableName + " (id, val1, val2, val3) " + + "values ('a', 'ab', 'abcc', null)"); conn.commit(); String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'"; // Verify that we will read from the index table assertExplainPlan(conn, selectSql, dataTableName, indexTableName); - ResultSet rs = conn.createStatement().executeQuery(selectSql); - assertTrue(rs.next()); - assertEquals("a", rs.getString(1)); - assertEquals("ab", rs.getString(2)); - assertEquals("abcc", rs.getString(3)); - assertEquals("abcd", rs.getString(4)); - assertFalse(rs.next()); + try (ResultSet rs = conn.createStatement().executeQuery(selectSql)) { + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("ab", rs.getString(2)); + assertEquals("abcc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); + assertFalse(rs.next()); + } + + // now read the same row from data table + selectSql = "SELECT * from " + dataTableName + " WHERE id = 'a'"; + try (ResultSet rs = conn.createStatement().executeQuery(selectSql)){ + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(dataTableName)); + assertTrue(rs.next()); + assertEquals("a", rs.getString(1)); + assertEquals("ab", rs.getString(2)); + assertEquals("abcc", rs.getString(3)); + assertEquals("abcd", rs.getString(4)); + assertFalse(rs.next()); + } } } @@ -1223,6 +1245,166 @@ public class GlobalIndexCheckerIT extends BaseTest { } } + @Test + public void testIndexRowWithNullIncludedColumnAndFilter() throws Exception { + if (async) { + // No need to run the same test twice one for async = true and the other for async = false + return; + } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + populateTable(dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " on " + + dataTableName + " (val1) include (val2, val3)" + indexDDLOptions); + conn.commit(); + // update row ('a', 'ab', 'abc', 'abcd') -> ('a', 'ab', 'abc', null) + conn.createStatement().execute( + "upsert into " + dataTableName + " (id, val3) values ('a', null)"); + conn.commit(); + + String dql = String.format( + "select id, val2 from %s where val1='ab' and val3='abcd'", dataTableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertFalse(rs.next()); + } + + dql = String.format( + "select id, val2 from %s where val1='ab' and val3 is null", dataTableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertTrue(rs.next()); + assertEquals("abc", rs.getString("val2")); + } + + // update row ('a', 'ab', 'abc', null) -> ('a', 'ac', null, null) + conn.createStatement().execute( + "upsert into " + dataTableName + " values ('a', 'ac', null, null)"); + conn.commit(); + + dql = String.format( + "select id, val2 from %s where val1='ac' and val3 is null", dataTableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertTrue(rs.next()); + assertNull(rs.getString("val2")); + } + TestUtil.dumpTable(conn, TableName.valueOf(dataTableName)); + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + } + } + + @Test + public void testIndexToolWithNullIncludedColumn() throws Exception { + if (async) { + // No need to run the same test twice one for async = true and the other for async = false + return; + } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + populateTable(dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " on " + + dataTableName + " (val1) include (val2, val3)" + indexDDLOptions); + conn.commit(); + IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(true); + // update row ('a', 'ab', 'abc', 'abcd') -> ('a', 'ab', 'abc', null) + conn.createStatement().execute( + "upsert into " + dataTableName + " (id, val3) values ('a', null)"); + conn.commit(); + // insert a new partial row + conn.createStatement().execute("upsert into " + dataTableName + + " (id, val1, val2) values ('c', 'cd', 'cde')"); + conn.commit(); + IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(false); + IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, indexName, null, + 0, IndexTool.IndexVerifyType.BEFORE); + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it); + IndexToolIT.dumpMRJobCounters(mrJobCounters); + try { + // single cell index doesn't have an issue with null values + assertEquals(encoded ? 0: 2, mrJobCounters.findCounter( + BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(encoded ? 0 : 2, mrJobCounters.findCounter( + REBUILT_INDEX_ROW_COUNT.name()).getValue()); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(dataTableName)); + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + throw e; + } + } + } + + @Test + public void testIndexToolWithMultipleDeleteFamilyMarkers() throws Exception { + if (async) { + // No need to run the same test twice one for async = true and the other for async = false + return; + } + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexName = generateUniqueName(); + // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + populateTable(dataTableName); + conn.createStatement().execute("CREATE INDEX " + indexName + " on " + + dataTableName + " (val1) include (val2, val3)" + indexDDLOptions); + conn.commit(); + String delete = String.format("DELETE FROM %s where id = 'a'", dataTableName); + conn.createStatement().execute(delete); + conn.commit(); + // skip phase2, inserts an unverified row in index + IndexRegionObserver.setFailDataTableUpdatesForTesting(true); + String dml = "upsert into " + dataTableName + + " (id, val1, val3) values ('a', 'ab', ?)"; + try (PreparedStatement ps = conn.prepareStatement(dml)) { + for (int i = 0; i < 5; ++i) { + ps.setString(1, "val3_ " + i); + ps.executeUpdate(); + commitWithException(conn); + // trigger a read repair of the unverified row + // since the data table row has been deleted the read repair will insert a + // delete family marker on the unverified index row + String dql = String.format( + "select id, val2, val3 from %s where val1='ab'", dataTableName); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); + String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator()); + assertTrue(explainPlan.contains(indexName)); + assertFalse(rs.next()); + } + } + } + IndexRegionObserver.setFailDataTableUpdatesForTesting(false); + // update row ('a', 'ab', 'abc', 'abcd') -> ('a', 'ab', 'abc', null) + conn.createStatement().execute( + "upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab', null)"); + conn.commit(); + IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, indexName, null, + 0, IndexTool.IndexVerifyType.ONLY); + CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it); + IndexToolIT.dumpMRJobCounters(mrJobCounters); + try { + assertEquals(2, mrJobCounters.findCounter( + BEFORE_REBUILD_VALID_INDEX_ROW_COUNT.name()).getValue()); + assertEquals(0, mrJobCounters.findCounter( + REBUILT_INDEX_ROW_COUNT.name()).getValue()); + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(dataTableName)); + TestUtil.dumpTable(conn, TableName.valueOf(indexName)); + throw e; + } + } + } + @Test public void testViewIndexRowUpdate() throws Exception { if (async) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 1a6d1dada4..b691f8ce0e 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -20,6 +20,8 @@ package org.apache.phoenix.index; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.sql.Connection; @@ -37,6 +39,7 @@ import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -50,8 +53,11 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; @@ -63,7 +69,7 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; public class IndexMaintainerTest extends BaseConnectionlessQueryTest { private static final String DEFAULT_SCHEMA_NAME = ""; private static final String DEFAULT_TABLE_NAME = "rkTest"; - + private void testIndexRowKeyBuilding(String dataColumns, String pk, String indexColumns, Object[] values) throws Exception { testIndexRowKeyBuilding(DEFAULT_SCHEMA_NAME, DEFAULT_TABLE_NAME, dataColumns, pk, indexColumns, values, "", "", ""); } @@ -88,10 +94,10 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { public byte[] getRowKey() { return row; } - + }; } - + private void testIndexRowKeyBuilding(String schemaName, String tableName, String dataColumns, String pk, String indexColumns, Object[] values, String includeColumns, String dataProps, String indexProps) throws Exception { KeyValueBuilder builder = GenericKeyValueBuilder.INSTANCE; testIndexRowKeyBuilding(schemaName, tableName, dataColumns, pk, indexColumns, values, includeColumns, dataProps, indexProps, builder); @@ -114,7 +120,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { List<IndexMaintainer> c1 = IndexMaintainer.deserialize(ptr, builder, true); assertEquals(1,c1.size()); IndexMaintainer im1 = c1.get(0); - + StringBuilder buf = new StringBuilder("UPSERT INTO " + fullTableName + " VALUES("); for (int i = 0; i < values.length; i++) { buf.append("?,"); @@ -136,7 +142,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { dataMutation.add(kv); } ValueGetter valueGetter = newValueGetter(row, valueMap); - + List<Mutation> indexMutations = IndexTestUtil.generateIndexData(index, table, dataMutation, ptr, builder); assertEquals(1,indexMutations.size()); assertTrue(indexMutations.get(0) instanceof Put); @@ -165,19 +171,19 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { public void testRowKeyVarOnlyIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL", "k1,k2", "k2, k1", new Object [] {"a",1.1}); } - + @Test public void testVarFixedndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1,k2", "k2, k1", new Object [] {"a",1.1}); } - - + + @Test public void testCompositeRowKeyVarFixedIndex() throws Exception { // TODO: using 1.1 for INTEGER didn't give error testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1,k2", "k2, k1", new Object [] {"a",1}); } - + @Test public void testCompositeRowKeyVarFixedAtEndIndex() throws Exception { // Forces trailing zero in index key for fixed length @@ -185,32 +191,32 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, k3 VARCHAR, v VARCHAR", "k1,k2,k3", "k1, k3, k2", new Object [] {"a",i, "b"}); } } - + @Test public void testSingleKeyValueIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER, v VARCHAR", "k1", "v", new Object [] {"a",1,"b"}); } - + @Test public void testMultiKeyValueIndex() throws Exception { testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 DECIMAL, v2 CHAR(2), v3 BIGINT", "k1, k2", "v2, k2, v1", new Object [] {"a",1,2.2,"bb"}); } - + @Test public void testMultiKeyValueCoveredIndex() throws Exception { testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 DECIMAL, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v2, k2, v1", new Object [] {"a",1,2.2,"bb"}, "v3, v4"); } - + @Test public void testSingleKeyValueDescIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER, v VARCHAR", "k1", "v DESC", new Object [] {"a",1,"b"}); } - + @Test public void testCompositeRowKeyVarFixedDescIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1,k2", "k2 DESC, k1", new Object [] {"a",1}); } - + @Test public void testCompositeRowKeyTimeIndex() throws Exception { long timeInMillis = System.currentTimeMillis(); @@ -219,7 +225,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { ts.setNanos((int) (timeInNanos % 1000000000)); testIndexRowKeyBuilding("ts1 DATE NOT NULL, ts2 TIME NOT NULL, ts3 TIMESTAMP NOT NULL", "ts1,ts2,ts3", "ts2, ts1", new Object [] {new Date(timeInMillis), new Time(timeInMillis), ts}); } - + @Test public void testCompositeRowKeyBytesIndex() throws Exception { long timeInMillis = System.currentTimeMillis(); @@ -228,79 +234,79 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { ts.setNanos((int) (timeInNanos % 1000000000)); testIndexRowKeyBuilding("b1 BINARY(3) NOT NULL, v VARCHAR", "b1,v", "v, b1", new Object [] {new byte[] {41,42,43}, "foo"}); } - + @Test public void testCompositeDescRowKeyVarFixedDescIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1}); } - + @Test public void testCompositeDescRowKeyVarDescIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL NOT NULL, v VARCHAR", "k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1.1,"b"}); } - + @Test public void testCompositeDescRowKeyVarAscIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 DECIMAL NOT NULL, v VARCHAR", "k1, k2 DESC", "k2, k1", new Object [] {"a",1.1,"b"}); } - + @Test public void testCompositeDescRowKeyVarFixedDescSaltedIndex() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1}, "", "", "SALT_BUCKETS=4"); } - + @Test public void testCompositeDescRowKeyVarFixedDescSaltedIndexSaltedTable() throws Exception { testIndexRowKeyBuilding("k1 VARCHAR, k2 INTEGER NOT NULL, v VARCHAR", "k1, k2 DESC", "k2 DESC, k1", new Object [] {"a",1}, "", "SALT_BUCKETS=3", "SALT_BUCKETS=3"); } - + @Test public void testMultiKeyValueCoveredSaltedIndex() throws Exception { testIndexRowKeyBuilding("k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 DECIMAL, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v2 DESC, k2 DESC, v1", new Object [] {"a",1,2.2,"bb"}, "v3, v4", "", "SALT_BUCKETS=4"); } - + @Test public void tesIndexWithBigInt() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BIGINT, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1 DESC, k2 DESC", new Object[] { "a", 1, 2.2, "bb" }); } - + @Test public void tesIndexWithAscBoolean() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1, k2 DESC", new Object[] { "a", 1, true, "bb" }); } - + @Test public void tesIndexWithAscNullBoolean() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1, k2 DESC", new Object[] { "a", 1, null, "bb" }); } - + @Test public void tesIndexWithAscFalseBoolean() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1, k2 DESC", new Object[] { "a", 1, false, "bb" }); } - + @Test public void tesIndexWithDescBoolean() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1 DESC, k2 DESC", new Object[] { "a", 1, true, "bb" }); } - + @Test public void tesIndexWithDescFalseBoolean() throws Exception { testIndexRowKeyBuilding( "k1 CHAR(1) NOT NULL, k2 INTEGER NOT NULL, v1 BOOLEAN, v2 CHAR(2), v3 BIGINT, v4 CHAR(10)", "k1, k2", "v1 DESC, k2 DESC", new Object[] { "a", 1, false, "bb" }); } - + @Test public void tesIndexedExpressionSerialization() throws Exception { Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); @@ -322,4 +328,81 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { conn.close(); } } + + @Test + public void testDeleteColumnMutation() throws Exception { + String tableName = "T_" + generateUniqueName(); + String indexName1 = "I_" + generateUniqueName(); + String indexName2 = "I_" + generateUniqueName(); + String ddl = String.format("create table %s (id varchar primary key, " + + "col1 varchar, col2 varchar, col3 bigint)", tableName); + String index1 = String.format("create index %s on %s (col2) include (col1) ", + indexName1, tableName); + String index2 = String.format("create index %s on %s (col2) include (col1) " + + "COLUMN_ENCODED_BYTES=2, IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS", + indexName2, tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute(ddl); + conn.createStatement().execute(index1); + conn.createStatement().execute(index2); + PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class); + PTable table = pconn.getTable(tableName); + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + table.getIndexMaintainers(ptr, pconn); + List<IndexMaintainer> ims = IndexMaintainer.deserialize(ptr, + GenericKeyValueBuilder.INSTANCE, true); + assertEquals(2, ims.size()); + String dml = String.format("upsert into %s values ('a', 'ab', 'abc', 2)", tableName); + assertDeleteColumnMutation(tableName, dml, false, pconn, ims); + pconn.getMutationState().rollback(); + dml = String.format("upsert into %s (id, col2) values ('a', 'ab')", tableName); + assertDeleteColumnMutation(tableName, dml, true, pconn, ims); + pconn.getMutationState().rollback(); + } + } + + private static void assertDeleteColumnMutation(String tableName, + String dml, + boolean isPartialUpdate, + PhoenixConnection pconn, + List<IndexMaintainer> ims) throws Exception { + pconn.createStatement().execute(dml); + Iterator<Pair<byte[], List<Mutation>>> iterator = + pconn.getMutationState().toMutations(); + while (iterator.hasNext()) { + Pair<byte[], List<Mutation>> mutationPair = iterator.next(); + List<Mutation> batchMutations = mutationPair.getSecond(); + assertEquals(1, batchMutations.size()); + assertTrue(batchMutations.get(0) instanceof Put); + Put dataRow = (Put) batchMutations.get(0); + ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(dataRow); + long ts = EnvironmentEdgeManager.currentTimeMillis(); + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(dataRow.getRow()); + for (IndexMaintainer im : ims) { + Put indexPut = im.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + nextDataRowVG, rowKey, ts, null, null, false); + if (indexPut == null) { + // No covered column. Just prepare an index row with the empty column + byte[] indexRowKey = im.buildRowKey(nextDataRowVG, rowKey, + null, null, ts); + indexPut = new Put(indexRowKey); + } + indexPut.addColumn( + im.getEmptyKeyValueFamily().copyBytesIfNecessary(), + im.getEmptyKeyValueQualifier(), ts, + QueryConstants.UNVERIFIED_BYTES); + Delete deleteCol = im.buildDeleteColumnMutation(indexPut, ts); + if (im.getIndexStorageScheme() == + PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + assertNull(deleteCol); + } else { + if (isPartialUpdate) { + assertNotNull(deleteCol); + } else { + assertNull(deleteCol); + } + } + } + } + } }