This is an automated email from the ASF dual-hosted git repository.

gjacoby pushed a commit to branch 4.x
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push:
     new 049b9d8  PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc (#1200)
049b9d8 is described below

commit 049b9d836ca2f0fb89213e224e4c35aeae3e57b5
Author: Rushabh Shah <shahr...@gmail.com>
AuthorDate: Wed Apr 21 11:55:58 2021 -0700

    PHOENIX-6444 Extend Cell Tags to Delete object for Indexer coproc (#1200)

    Co-authored-by: Rushabh <rushabh.s...@salesforce.com>
---
 .../java/org/apache/phoenix/end2end/DeleteIT.java  |  36 ++++++-
 .../end2end/ParameterizedIndexUpgradeToolIT.java   |   7 +-
 .../phoenix/end2end/index/IndexCoprocIT.java       | 104 +++----------------
 .../phoenix/end2end/index/IndexTestUtil.java       | 109 +++++++++++++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   |  47 +--------
 .../org/apache/phoenix/hbase/index/Indexer.java    |  12 ++-
 .../java/org/apache/phoenix/util/IndexUtil.java    |  42 ++++++++
 7 files changed, 209 insertions(+), 148 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
index 0c71b1d..bfdc3a2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/DeleteIT.java
@@ -44,12 +44,15 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.PhoenixTagType;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.DeleteCompiler;
 import org.apache.phoenix.compile.MutationPlan;
+import org.apache.phoenix.end2end.index.IndexTestUtil;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.parse.DeleteStatement;
 import org.apache.phoenix.parse.SQLParser;
@@ -976,7 +979,7 @@ public class DeleteIT extends ParallelStatsDisabledIT {
 
         // Add tag "customer-delete" to delete marker.
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
-        createAndUpsertTable(tableName, indexName, props);
+        createAndUpsertTable(tableName, indexName, props, false);
         // Make sure that the plan creates is of ClientSelectDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.ClientSelectDeleteMutationPlan.class, props);
         executeDelete(delete, props, 1);
@@ -1001,7 +1004,7 @@
 
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
-        createAndUpsertTable(tableName, indexName, props);
+        createAndUpsertTable(tableName, indexName, props, false);
         // Make sure that the plan creates is of ServerSelectDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.ServerSelectDeleteMutationPlan.class, props);
         executeDelete(delete, props, 2);
@@ -1027,7 +1030,7 @@
         props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
         // Don't create index table. We will use MultiRowDeleteMutationPlan
         // if there is no index present for a table.
-        createAndUpsertTable(tableName, null, props);
+        createAndUpsertTable(tableName, null, props, false);
         // Make sure that the plan creates is of MultiRowDeleteMutationPlan
         verifyDeletePlan(delete, DeleteCompiler.MultiRowDeleteMutationPlan.class, props);
         executeDelete(delete, props, 1);
@@ -1052,8 +1055,8 @@
             assertEquals(plan.getClass(), planClass);
         }
     }
-    private void createAndUpsertTable(String tableName, String indexName, Properties props)
-            throws SQLException {
+    private void createAndUpsertTable(String tableName, String indexName, Properties props,
+            boolean useOldCoproc) throws Exception {
         String ddl = "CREATE TABLE " + tableName +
                 " (k INTEGER NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)";
         try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
@@ -1064,6 +1067,10 @@
                 String indexDdl1 = "CREATE INDEX " + indexName + " ON " + tableName + "(v1,v2)";
                 statement.execute(indexDdl1);
             }
+            if (useOldCoproc) {
+                Admin admin = ((PhoenixConnection) conn).getQueryServices().getAdmin();
+                IndexTestUtil.downgradeCoprocs(tableName, indexName, admin);
+            }
             statement.execute(
                     "upsert into " + tableName + " values (1, 'foo', 'foo1')");
             statement.execute(
@@ -1119,4 +1126,23 @@
             assertEquals(0, tags.size());
         }
     }
+
+    /*
+        Test whether source of operation tags are added to Delete mutations if we are using
+        old index coproc.
+     */
+    @Test
+    public void testDeleteTagsWithOldIndexCoproc() throws Exception {
+        String tableName = generateUniqueName();
+        String tagValue = "customer-delete";
+        String delete = "DELETE FROM " + tableName + " WHERE k = 1";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, tagValue);
+        // The new table will always have new index coproc. Downgrade it to use older one.
+        createAndUpsertTable(tableName, null, props, true);
+        executeDelete(delete, props, 1);
+        String startRowKeyForBaseTable = "1";
+        // Make sure that Delete Marker has cell tag for base table.
+        checkTagPresentInDeleteMarker(tableName, startRowKeyForBaseTable, true, tagValue);
+    }
 }
\ No newline at end of file
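For context (not part of the patch): a client opts into these delete-marker tags by setting the
source-of-operation property on its connection, exactly as the tests above do. A minimal sketch,
where the JDBC URL and table name are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.phoenix.query.ConnectionQueryServices;

    public class TaggedDeleteSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Server-side, IndexRegionObserver (and, with this patch, Indexer) copies
            // this value into a SOURCE_OPERATION cell tag on every delete marker.
            props.setProperty(ConnectionQueryServices.SOURCE_OPERATION_ATTRIB, "customer-delete");
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
                conn.createStatement().execute("DELETE FROM MY_TABLE WHERE k = 1");
                conn.commit();
            }
        }
    }
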
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
index 7c53701..8795100 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ParameterizedIndexUpgradeToolIT.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.phoenix.end2end.index.IndexCoprocIT;
+import org.apache.phoenix.end2end.index.IndexTestUtil;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.index.GlobalIndexChecker;
@@ -398,7 +399,7 @@ public class ParameterizedIndexUpgradeToolIT extends BaseTest {
                 indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
             Assert.assertFalse("Found Indexer on " + table,
                 indexDesc.hasCoprocessor(Indexer.class.getName()));
-            IndexCoprocIT.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
+            IndexTestUtil.assertCoprocConfig(indexDesc, IndexRegionObserver.class.getName(),
                 IndexCoprocIT.INDEX_REGION_OBSERVER_CONFIG);
         }
     }
@@ -409,7 +410,7 @@
             HTableDescriptor indexDesc = admin.getTableDescriptor(TableName.valueOf(index));
             Assert.assertTrue("Couldn't find GlobalIndexChecker on " + index,
                 indexDesc.hasCoprocessor(GlobalIndexChecker.class.getName()));
-            IndexCoprocIT.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
+            IndexTestUtil.assertCoprocConfig(indexDesc, GlobalIndexChecker.class.getName(),
                 IndexCoprocIT.GLOBAL_INDEX_CHECKER_CONFIG);
         }
     }
@@ -439,7 +440,7 @@
                 indexDesc.hasCoprocessor(Indexer.class.getName()));
             Assert.assertFalse("Found IndexRegionObserver on " + table,
                 indexDesc.hasCoprocessor(IndexRegionObserver.class.getName()));
-            IndexCoprocIT.assertCoprocConfig(indexDesc, Indexer.class.getName(),
+            IndexTestUtil.assertCoprocConfig(indexDesc, Indexer.class.getName(),
                 IndexCoprocIT.INDEXER_CONFIG);
         }
     }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
index 839d46b..3bcbfd0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexCoprocIT.java
@@ -22,20 +22,14 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
 import org.apache.phoenix.hbase.index.Indexer;
-import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.index.GlobalIndexChecker;
-import org.apache.phoenix.index.PhoenixIndexBuilder;
-import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.SchemaUtil;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -45,10 +39,12 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocConfig;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsContains;
+import static org.apache.phoenix.end2end.index.IndexTestUtil.assertCoprocsNotContains;
+
 @RunWith(Parameterized.class)
 public class IndexCoprocIT extends ParallelStatsDisabledIT {
     private boolean isNamespaceMapped = false;
@@ -87,30 +83,15 @@ public class IndexCoprocIT extends ParallelStatsDisabledIT {
 
         createBaseTable(schemaName, tableName, isMultiTenant, 0, null);
         createIndexTable(schemaName, tableName, indexName);
-
-        HTableDescriptor baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
-        HTableDescriptor indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
-
-        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
-        assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
-
-        removeCoproc(IndexRegionObserver.class, baseDescriptor, admin);
-        removeCoproc(IndexRegionObserver.class, indexDescriptor, admin);
-        removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin);
-
-        Map<String, String> props = new HashMap<String, String>();
-        props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
-        Indexer.enableIndexing(baseDescriptor, PhoenixIndexBuilder.class,
-            props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
-        admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor);
-        baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
-        indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
-        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+        IndexTestUtil.downgradeCoprocs(physicalTableName, physicalIndexName,
+            admin);
 
         createBaseTable(schemaName, tableName, true, 0, null);
-        baseDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalTableName));
-        indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
-        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+        HTableDescriptor baseDescriptor = admin.getTableDescriptor(
+            TableName.valueOf(physicalTableName));
+        HTableDescriptor indexDescriptor = admin.getTableDescriptor(
+            TableName.valueOf(physicalIndexName));
+        IndexTestUtil.assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
     }
 
     @Test
@@ -164,15 +145,6 @@
         assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
         assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
     }
-    private void assertUsingOldCoprocs(HTableDescriptor baseDescriptor,
-                                       HTableDescriptor indexDescriptor) {
-        assertCoprocsContains(Indexer.class, baseDescriptor);
-        assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
-            INDEXER_CONFIG);
-        assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
-        assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
-        assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
-    }
 
     private void assertUsingNewCoprocs(HTableDescriptor baseDescriptor) {
         assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
@@ -191,57 +163,6 @@
             GLOBAL_INDEX_CHECKER_CONFIG);
     }
 
-    private void assertCoprocsContains(Class clazz, HTableDescriptor descriptor) {
-        String expectedCoprocName = clazz.getName();
-        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
-        Assert.assertTrue("Could not find coproc " + expectedCoprocName +
-            " in descriptor " + descriptor,foundCoproc);
-    }
-
-    private void assertCoprocsNotContains(Class clazz, HTableDescriptor descriptor) {
-        String expectedCoprocName = clazz.getName();
-        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
-        Assert.assertFalse("Could find coproc " + expectedCoprocName +
-            " in descriptor " + descriptor,foundCoproc);
-    }
-
-    private boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) {
-        boolean foundCoproc = false;
-        for (String coprocName : descriptor.getCoprocessors()){
-            if (coprocName.equals(expectedCoprocName)){
-                foundCoproc = true;
-                break;
-            }
-        }
-        return foundCoproc;
-    }
-
-    public static void assertCoprocConfig(HTableDescriptor indexDesc,
-        String className, String expectedConfigValue){
-        boolean foundConfig = false;
-        for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry :
-                indexDesc.getValues().entrySet()){
-            String propKey = Bytes.toString(entry.getKey().get());
-            String propValue = Bytes.toString(entry.getValue().get());
-            //Unfortunately, a good API to read coproc properties didn't show up until
-            //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
-            if (propKey.contains("coprocessor")){
-                if (propValue.contains(className)){
-                    Assert.assertEquals(className + " is configured incorrectly",
-                        expectedConfigValue,
-                        propValue);
-                    foundConfig = true;
-                    break;
-                }
-            }
-        }
-        Assert.assertTrue("Couldn't find config for " + className, foundConfig);
-    }
-
-    private void removeCoproc(Class clazz, HTableDescriptor descriptor, Admin admin) throws Exception {
-        descriptor.removeCoprocessor(clazz.getName());
-        admin.modifyTable(descriptor.getTableName(), descriptor);
-    }
 
     private void createIndexTable(String schemaName, String tableName, String indexName)
         throws SQLException {
@@ -297,5 +218,4 @@
         props.setProperty(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceMapped));
         return (PhoenixConnection) DriverManager.getConnection(getUrl(),props);
     }
-
-}
+}
\ No newline at end of file
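Background for the string matching that assertCoprocConfig (moved to IndexTestUtil below) relies
on: HBase 1.x stores each coprocessor as an ordinary table attribute keyed like "coprocessor$1",
whose value packs jar path, class name, priority and arguments into one '|'-separated string. A
hedged sketch of dumping those raw attributes; the class and method names here are illustrative:

    import java.util.Map;

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class CoprocAttrDump {
        // Prints entries such as:
        //   coprocessor$1 -> |org.apache.phoenix.hbase.index.IndexRegionObserver|<priority>|<key=value,...>
        // (the leading field is the jar path, left empty when the class is already
        // on the region server classpath)
        public static void dump(HTableDescriptor desc) {
            for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e
                    : desc.getValues().entrySet()) {
                String key = Bytes.toString(e.getKey().get());
                if (key.contains("coprocessor")) {
                    System.out.println(key + " -> " + Bytes.toString(e.getValue().get()));
                }
            }
        }
    }
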
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
index 52af966..847cef7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexTestUtil.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static org.apache.phoenix.end2end.index.IndexCoprocIT.INDEXER_CONFIG;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
@@ -30,15 +31,26 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.index.GlobalIndexChecker;
+import org.apache.phoenix.index.PhoenixIndexBuilder;
+import org.apache.phoenix.index.PhoenixIndexCodec;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnNotFoundException;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PColumnFamily;
@@ -53,6 +65,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.SchemaUtil;
 
 import com.google.common.collect.Lists;
+import org.junit.Assert;
 
 public class IndexTestUtil {
 
@@ -165,4 +178,100 @@
         return row.toRowMutations();
     }
 
+    public static void downgradeCoprocs(String physicalTableName,
+            String physicalIndexName, Admin admin) throws Exception {
+        HTableDescriptor baseDescriptor =
+            admin.getTableDescriptor(TableName.valueOf(physicalTableName));
+
+        assertCoprocsContains(IndexRegionObserver.class, baseDescriptor);
+        removeCoproc(IndexRegionObserver.class, baseDescriptor, admin);
+
+        if (physicalIndexName != null) {
+            HTableDescriptor indexDescriptor = admin.getTableDescriptor(
+                TableName.valueOf(physicalIndexName));
+            assertCoprocsContains(GlobalIndexChecker.class, indexDescriptor);
+
+            removeCoproc(IndexRegionObserver.class, indexDescriptor, admin);
+            removeCoproc(GlobalIndexChecker.class, indexDescriptor, admin);
+        }
+
+        Map<String, String> props = new HashMap<String, String>();
+        props.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
+            PhoenixIndexCodec.class.getName());
+        Indexer.enableIndexing(baseDescriptor, PhoenixIndexBuilder.class,
+            props, QueryServicesOptions.DEFAULT_COPROCESSOR_PRIORITY);
+        admin.modifyTable(baseDescriptor.getTableName(), baseDescriptor);
+        baseDescriptor = admin.getTableDescriptor(
+            TableName.valueOf(physicalTableName));
+        HTableDescriptor indexDescriptor = null;
+        if (physicalIndexName != null) {
+            indexDescriptor = admin.getTableDescriptor(TableName.valueOf(physicalIndexName));
+        }
+        assertUsingOldCoprocs(baseDescriptor, indexDescriptor);
+    }
+
+    public static void assertCoprocsContains(Class clazz, HTableDescriptor descriptor) {
+        String expectedCoprocName = clazz.getName();
+        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+        Assert.assertTrue("Could not find coproc " + expectedCoprocName +
+            " in descriptor " + descriptor,foundCoproc);
+    }
+
+    private static boolean isCoprocPresent(HTableDescriptor descriptor, String expectedCoprocName) {
+        boolean foundCoproc = false;
+        for (String coprocName : descriptor.getCoprocessors()){
+            if (coprocName.equals(expectedCoprocName)){
+                foundCoproc = true;
+                break;
+            }
+        }
+        return foundCoproc;
+    }
+
+    public static void removeCoproc(Class clazz, HTableDescriptor descriptor,
+            Admin admin) throws Exception {
+        descriptor.removeCoprocessor(clazz.getName());
+        admin.modifyTable(descriptor.getTableName(), descriptor);
+    }
+
+    public static void assertUsingOldCoprocs(HTableDescriptor baseDescriptor,
+            HTableDescriptor indexDescriptor) {
+        assertCoprocsContains(Indexer.class, baseDescriptor);
+        assertCoprocConfig(baseDescriptor, Indexer.class.getName(),
+            INDEXER_CONFIG);
+        assertCoprocsNotContains(IndexRegionObserver.class, baseDescriptor);
+        if (indexDescriptor != null) {
+            assertCoprocsNotContains(IndexRegionObserver.class, indexDescriptor);
+            assertCoprocsNotContains(GlobalIndexChecker.class, indexDescriptor);
+        }
+    }
+
+    public static void assertCoprocConfig(HTableDescriptor indexDesc,
+        String className, String expectedConfigValue){
+        boolean foundConfig = false;
+        for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry :
+                indexDesc.getValues().entrySet()){
+            String propKey = Bytes.toString(entry.getKey().get());
+            String propValue = Bytes.toString(entry.getValue().get());
+            //Unfortunately, a good API to read coproc properties didn't show up until
+            //HBase 2.0. Doing this the painful String-matching way to be compatible with 1.x
+            if (propKey.contains("coprocessor")){
+                if (propValue.contains(className)){
+                    Assert.assertEquals(className + " is configured incorrectly",
+                        expectedConfigValue,
+                        propValue);
+                    foundConfig = true;
+                    break;
+                }
+            }
+        }
+        Assert.assertTrue("Couldn't find config for " + className, foundConfig);
+    }
+
+    static void assertCoprocsNotContains(Class clazz, HTableDescriptor descriptor) {
+        String expectedCoprocName = clazz.getName();
+        boolean foundCoproc = isCoprocPresent(descriptor, expectedCoprocName);
+        Assert.assertFalse("Could find coproc " + expectedCoprocName +
+            " in descriptor " + descriptor,foundCoproc);
+    }
 }
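Usage note (illustrative, not part of the patch): tests downgrade a freshly created table to the
legacy coprocs the same way DeleteIT#createAndUpsertTable does above; passing null for the index
name skips the index-table steps. A minimal sketch, assuming a live test cluster and a
placeholder JDBC URL:

    import java.sql.DriverManager;
    import java.util.Properties;

    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.phoenix.end2end.index.IndexTestUtil;
    import org.apache.phoenix.jdbc.PhoenixConnection;

    public final class DowngradeSketch {
        // tableName/indexName are physical table names; indexName may be null.
        static void downgrade(String url, String tableName, String indexName) throws Exception {
            try (PhoenixConnection conn =
                    (PhoenixConnection) DriverManager.getConnection(url, new Properties())) {
                Admin admin = conn.getQueryServices().getAdmin();
                IndexTestUtil.downgradeCoprocs(tableName, indexName, admin);
            }
        }
    }
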
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index cb7cf0d..aeaa505 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,11 +32,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.PhoenixTagType;
-import org.apache.hadoop.hbase.Tag;
-import org.apache.hadoop.hbase.TagRewriteCell;
-import org.apache.phoenix.query.QueryServices;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -93,6 +88,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -961,7 +957,7 @@
       context.populateOriginalMutations(miniBatchOp);
       // Need to add cell tags to Delete Marker before we do any index processing
       // since we add tags to tables which doesn't have indexes also.
-      setDeleteAttributes(miniBatchOp);
+      IndexUtil.setDeleteAttributes(miniBatchOp);
 
       /*
        * Exclusively lock all rows so we get a consistent read
@@ -1018,42 +1014,6 @@
       }
   }
 
-  /**
-   * Set Cell Tags to delete markers with source of operation attribute.
-   * @param miniBatchOp
-   * @throws IOException
-   */
-  private void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
-          throws IOException {
-      for (int i = 0; i < miniBatchOp.size(); i++) {
-          Mutation m = miniBatchOp.getOperation(i);
-          if (!(m instanceof Delete)) {
-              // Ignore if it is not Delete type.
-              continue;
-          }
-          byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
-          if (sourceOpAttr == null) {
-              continue;
-          }
-          Tag sourceOpTag = new Tag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
-          List<Cell> updatedCells = new ArrayList<>();
-          for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
-              Cell cell = cellScanner.current();
-              List<Tag> tags = Tag.asList(cell.getTagsArray(),
-                  cell.getTagsOffset(), cell.getTagsLength());
-              tags.add(sourceOpTag);
-              Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
-              updatedCells.add(updatedCell);
-          }
-          m.getFamilyCellMap().clear();
-          // Clear and add new Cells to the Mutation.
-          for (Cell cell : updatedCells) {
-              Delete d = (Delete) m;
-              d.addDeleteMarker(cell);
-          }
-      }
-  }
-
   private void setBatchMutateContext(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       this.batchMutateContext.set(context);
   }
@@ -1205,5 +1165,4 @@
     properties.put(IndexRegionObserver.INDEX_BUILDER_CONF_KEY, builder.getName());
     desc.addCoprocessor(IndexRegionObserver.class.getName(), null, priority, properties);
   }
-}
-
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index f383976..d36c9c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -83,6 +83,7 @@ import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.trace.TracingUtils;
 import org.apache.phoenix.trace.util.NullSpan;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.ServerUtil.ConnectionType;
@@ -371,12 +372,15 @@
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
 
+      // Need to add cell tags to Delete Marker before we do any index processing
+      // since we add tags to tables which doesn't have indexes also.
+      IndexUtil.setDeleteAttributes(miniBatchOp);
       // first group all the updates for a single row into a single update to be processed
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap =
-              new HashMap<ImmutableBytesPtr, MultiMutation>();
+      Map<ImmutableBytesPtr, MultiMutation> mutationsMap
+              = new HashMap<ImmutableBytesPtr, MultiMutation>();
 
       Durability defaultDurability = Durability.SYNC_WAL;
-      if(c.getEnvironment().getRegion() != null) {
+      if (c.getEnvironment().getRegion() != null) {
           defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
           defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
                   Durability.SYNC_WAL : defaultDurability;
@@ -518,7 +522,7 @@
       byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
       Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
       List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-      while(indexUpdatesItr.hasNext()) {
+      while (indexUpdatesItr.hasNext()) {
           Pair<Mutation, byte[]> next = indexUpdatesItr.next();
           if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
               localUpdates.add(next.getFirst());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index ec1a4f2..6d552e3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -42,11 +42,16 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.PhoenixTagType;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagRewriteCell;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -62,6 +67,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -101,6 +107,7 @@ import org.apache.phoenix.parse.SQLParser;
 import org.apache.phoenix.parse.SelectStatement;
 import org.apache.phoenix.protobuf.ProtobufUtil;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.ColumnNotFoundException;
@@ -934,4 +941,39 @@
         }
     }
 
+    /**
+     * Set Cell Tags to delete markers with source of operation attribute.
+     * @param miniBatchOp miniBatchOp
+     * @throws IOException IOException
+     */
+    public static void setDeleteAttributes(MiniBatchOperationInProgress<Mutation> miniBatchOp)
+            throws IOException {
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            if (!(m instanceof Delete)) {
+                // Ignore if it is not Delete type.
+                continue;
+            }
+            byte[] sourceOpAttr = m.getAttribute(QueryServices.SOURCE_OPERATION_ATTRIB);
+            if (sourceOpAttr == null) {
+                continue;
+            }
+            Tag sourceOpTag = new Tag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, sourceOpAttr);
+            List<Cell> updatedCells = new ArrayList<>();
+            for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
+                Cell cell = cellScanner.current();
+                List<Tag> tags = Tag.asList(cell.getTagsArray(),
+                    cell.getTagsOffset(), cell.getTagsLength());
+                tags.add(sourceOpTag);
+                Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
+                updatedCells.add(updatedCell);
+            }
+            m.getFamilyCellMap().clear();
+            // Clear and add new Cells to the Mutation.
+            for (Cell cell : updatedCells) {
+                Delete d = (Delete) m;
+                d.addDeleteMarker(cell);
+            }
+        }
+    }
 }
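To make the mechanics of setDeleteAttributes concrete: for each Delete in the mini-batch it
rewrites every cell with an extra SOURCE_OPERATION tag, wrapping the cell rather than mutating
it, since cell tags cannot be edited in place on HBase 1.x. A standalone sketch of that per-cell
rewrite (row, family and qualifier are placeholders; no cluster required):

    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.PhoenixTagType;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagRewriteCell;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public final class TagRewriteSketch {
        public static void main(String[] args) throws Exception {
            // Build a delete marker for one column; names are illustrative.
            Delete d = new Delete(Bytes.toBytes("row1"));
            d.addColumns(Bytes.toBytes("0"), Bytes.toBytes("V1"));
            Tag sourceOpTag =
                new Tag(PhoenixTagType.SOURCE_OPERATION_TAG_TYPE, Bytes.toBytes("customer-delete"));
            for (CellScanner scanner = d.cellScanner(); scanner.advance();) {
                Cell cell = scanner.current();
                List<Tag> tags = Tag.asList(cell.getTagsArray(),
                    cell.getTagsOffset(), cell.getTagsLength());
                tags.add(sourceOpTag);
                // The wrapped cell is what the patch re-adds to the Delete
                // via addDeleteMarker after clearing the family cell map.
                Cell tagged = new TagRewriteCell(cell, Tag.fromList(tags));
                System.out.println("tags on marker: " + Tag.asList(tagged.getTagsArray(),
                    tagged.getTagsOffset(), tagged.getTagsLength()).size());
            }
        }
    }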