This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 8ca227480b PHOENIX-7426 Generating index mutations for immutable tables on the s… (#2245) 8ca227480b is described below commit 8ca227480b962851b78acdf2aaf8639cf89ca6fb Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Thu Sep 18 09:22:06 2025 -0700 PHOENIX-7426 Generating index mutations for immutable tables on the s… (#2245) --- .../org/apache/phoenix/compile/DeleteCompiler.java | 23 ++++- .../org/apache/phoenix/compile/UpsertCompiler.java | 11 ++- .../org/apache/phoenix/execute/MutationState.java | 10 +- .../org/apache/phoenix/index/IndexMaintainer.java | 13 ++- .../org/apache/phoenix/query/QueryServices.java | 6 +- .../apache/phoenix/query/QueryServicesOptions.java | 1 + .../java/org/apache/phoenix/util/IndexUtil.java | 15 +-- .../phoenix/hbase/index/IndexRegionObserver.java | 25 ++++- ...ationBatchFailedStateMetricWithAllDeleteIT.java | 4 +- ...ationBatchFailedStateMetricWithAllUpsertIT.java | 4 +- ...atchFailedStateMetricWithDeleteAndUpsertIT.java | 4 +- ...tableIndexIT.java => BaseImmutableIndexIT.java} | 106 ++++++++++++++++----- .../end2end/index/ClientSideImmutableIndexIT.java | 61 ++++++++++++ .../end2end/index/ServerSideImmutableIndexIT.java | 61 ++++++++++++ 14 files changed, 286 insertions(+), 58 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index c76043aca7..3d1e9ffdcd 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -18,6 +18,8 @@ package org.apache.phoenix.compile; import static org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO; +import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED; import static org.apache.phoenix.util.NumberUtil.add; import edu.umd.cs.findbugs.annotations.NonNull; @@ -212,7 +214,7 @@ public class DeleteCompiler { // The data table is always the last one in the list if it's // not chosen as the best of the possible plans. dataTable = otherTableRefs.get(otherTableRefs.size() - 1).getTable(); - if (!isMaintainedOnClient(table)) { + if (!isMaintainedOnClient(table, connection)) { // dataTable is a projected table and may not include all the indexed columns and so we // need to get // the actual data table @@ -257,7 +259,7 @@ public class DeleteCompiler { // row timestamp column, then the // row key will already have its value. // Check for otherTableRefs being empty required when deleting directly from the index - if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) { + if (otherTableRefs.isEmpty() || isMaintainedOnClient(table, connection)) { mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, @@ -381,7 +383,10 @@ public class DeleteCompiler { List<PTable> nonDisabledIndexes = Lists.newArrayListWithExpectedSize(table.getIndexes().size()); for (PTable index : table.getIndexes()) { - if (!index.getIndexState().isDisabled() && isMaintainedOnClient(index)) { + if ( + !index.getIndexState().isDisabled() + && isMaintainedOnClient(index, statement.getConnection()) + ) { nonDisabledIndexes.add(index); } } @@ -564,7 +569,7 @@ public class DeleteCompiler { // mutations are generated on the client side. Indexed columns are needed to identify index rows // to be deleted for (PTable index : table.getIndexes()) { - if (isMaintainedOnClient(index)) { + if (isMaintainedOnClient(index, connection)) { IndexMaintainer maintainer = index.getIndexMaintainer(table, connection); // Go through maintainer as it handles functional indexes correctly for (Pair<String, String> columnInfo : maintainer.getIndexedColumnInfo()) { @@ -1130,10 +1135,18 @@ public class DeleteCompiler { } } - private static boolean isMaintainedOnClient(PTable table) { + private static boolean isMaintainedOnClient(PTable table, PhoenixConnection connection) { if (CDCUtil.isCDCIndex(table)) { return false; } + if ( + !table.isTransactional() && table.getIndexType() != IndexType.LOCAL + && connection.getQueryServices().getConfiguration().getBoolean( + SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED) + ) { + return false; + } // Test for not being local (rather than being GLOBAL) so that this doesn't fail // when tested with our projected table. return (table.getIndexType() != IndexType.LOCAL diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 4506e71c79..7d42690420 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED; import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity; @@ -611,11 +613,14 @@ public class UpsertCompiler { // region space managed by region servers. So we bail out on executing on server side. // Disable running upsert select on server side if a table has global mutable secondary // indexes on it - boolean hasGlobalMutableIndexes = - SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows(); + boolean serverSideImmutableIndexes = connection.getQueryServices().getConfiguration() + .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED); + boolean hasGlobalServerSideIndexes = SchemaUtil.hasGlobalIndex(table) + && (serverSideImmutableIndexes || !table.isImmutableRows()); boolean hasWhereSubquery = select.getWhere() != null && select.getWhere().hasSubquery(); runOnServer = - (sameTable || (serverUpsertSelectEnabled && !hasGlobalMutableIndexes)) && isAutoCommit + (sameTable || serverUpsertSelectEnabled && !hasGlobalServerSideIndexes) && isAutoCommit // We can run the upsert select for initial index population on server side for // transactional // tables since the writes do not need to be done transactionally, since we gate the index diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 13bdff10a4..fb82692be5 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -31,9 +31,11 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILU import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER; import static org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB; +import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB; import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB; import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED; import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB; import static org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull; @@ -184,6 +186,7 @@ public class MutationState implements SQLCloseable { private long mutationQueryParsingTimeMS = 0; private final boolean indexRegionObserverEnabledAllTables; + private final boolean serverSideImmutableIndexes; /** * Return result back to client. To be used when client needs to read the whole row or some @@ -262,6 +265,9 @@ public class MutationState implements SQLCloseable { this.indexRegionObserverEnabledAllTables = Boolean.parseBoolean(this.connection .getQueryServices().getConfiguration().get(INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB, DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES)); + this.serverSideImmutableIndexes = this.connection.getQueryServices().getConfiguration() + .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED); } public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, @@ -667,7 +673,7 @@ public class MutationState implements SQLCloseable { final PTable table = tableRef.getTable(); final List<PTable> indexList = includeAllIndexes ? Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())) - : IndexUtil.getClientMaintainedIndexes(table); + : IndexUtil.getClientMaintainedIndexes(table, serverSideImmutableIndexes); final Iterator<PTable> indexes = indexList.iterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = @@ -1879,7 +1885,7 @@ public class MutationState implements SQLCloseable { } PTable logicalTable = tableInfo.getPTable(); if ( - tableInfo.getOrigTableRef().getTable().isImmutableRows() + !this.serverSideImmutableIndexes && tableInfo.getOrigTableRef().getTable().isImmutableRows() && (this.indexRegionObserverEnabledAllTables || IndexUtil.isGlobalIndexCheckerEnabled(connection, tableInfo.getHTableName())) ) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 7c0c4439fd..e2b33a17d7 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.index; +import static org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import com.google.protobuf.InvalidProtocolBufferException; @@ -191,12 +193,15 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { } public static Iterator<PTable> maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme( - final PTable dataTable, Iterator<PTable> indexes) { + final PTable dataTable, Iterator<PTable> indexes, PhoenixConnection connection) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override public boolean apply(PTable index) { return sendIndexMaintainer(index) && ((index.getIndexType() == IndexType.GLOBAL - && dataTable.getImmutableStorageScheme() != index.getImmutableStorageScheme()) + && (dataTable.getImmutableStorageScheme() != index.getImmutableStorageScheme() + || connection.getQueryServices().getConfiguration().getBoolean( + SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED))) || index.getIndexType() == IndexType.LOCAL || CDCUtil.isCDCIndex(index)); } }); @@ -231,8 +236,8 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { !dataTable.isTransactional() || !dataTable.getTransactionProvider().getTransactionProvider() .isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER) ) { - indexesItr = - maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(dataTable, indexes.iterator()); + indexesItr = maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(dataTable, + indexes.iterator(), connection); } } else { indexesItr = maintainedIndexes(indexes.iterator()); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index bcb0b5b463..66b8fe5379 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -412,9 +412,13 @@ public interface QueryServices extends SQLCloseable { // The minimum age of an unverified global index row to be eligible for deletion public static final String GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.global.index.row.age.threshold.to.delete.ms"; - // Enable the IndexRegionObserver Coprocessor + // Enable the IndexRegionObserver coprocessor public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled"; + // Enable the IndexRegionObserver coprocessor for immutable tables + String SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB = + "phoenix.server.side.immutable.indexes.enabled"; + // Whether IndexRegionObserver/GlobalIndexChecker is enabled for all tables public static final String INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB = "phoenix.index.region.observer.enabled.all.tables"; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index a7f282e1f7..d398f4b240 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -424,6 +424,7 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7 * 24 * 60 * 60 * 1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; + public static final boolean DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED = false; public static final String DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES = Boolean.toString(true); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java index af1cf441f8..5262437af8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -761,18 +761,19 @@ public class IndexUtil { } } - public static List<PTable> getClientMaintainedIndexes(PTable table) { + public static List<PTable> getClientMaintainedIndexes(PTable table, + boolean serverSideImmutableIndex) { Iterator<PTable> indexIterator = // Only maintain tables with immutable rows through this // client-side mechanism (table.isTransactional() && table.getTransactionProvider().getTransactionProvider() .isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) ? IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) - : (table.isImmutableRows() || table.isTransactional()) ? - // If the data table has a different storage scheme than index table, don't maintain this on - // the client - // For example, if the index is single cell but the data table is one_cell, if there is a - // partial update on the data table, index can't be built on the client. - IndexMaintainer.maintainedGlobalIndexesWithMatchingStorageScheme(table, + : (table.isImmutableRows() && !serverSideImmutableIndex || table.isTransactional()) + // If the data table has a different storage scheme than index table, don't maintain + // this on the client. For example, if the index is single cell but the data table is + // one_cell, and there is a partial update on the data table, index can't be built + // on the client + ? IndexMaintainer.maintainedGlobalIndexesWithMatchingStorageScheme(table, table.getIndexes().iterator()) : Collections.<PTable> emptyIterator(); return Lists.newArrayList(indexIterator); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 40a4239804..fc55b2435d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -308,6 +308,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private boolean returnResult; private boolean returnOldRow; private boolean hasConditionalTTL; // table has Conditional TTL + private boolean immutableRows; public BatchMutateContext() { this.clientVersion = 0; @@ -1361,6 +1362,21 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private static void identifyIndexMaintainerTypes(PhoenixIndexMetaData indexMetaData, BatchMutateContext context) { for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) { + if (indexMaintainer.isImmutableRows()) { + // Here we care if index is immutable in order to skip reading data table rows. However, if + // the data table storage scheme does not agree with the index table storage scheme, we + // cannot skip reading data table rows, and thus we cannot treat the index as immutable. + // Consider the case where data table uses the single cell per column format and index + // uses the single cell format. If the data table row is updated partially, we need to + // read the data table row on disk to retrieve missing columns in the partial update to + // build the full index row. Please note with the single cell format, the row has single + // cell (and the empty cell) + if ( + indexMaintainer.getDataImmutableStorageScheme() == indexMaintainer.getIndexStorageScheme() + ) { + context.immutableRows = true; + } + } if (indexMaintainer instanceof TransformMaintainer) { context.hasTransform = true; } else if (indexMaintainer.isLocalIndex()) { @@ -1557,10 +1573,11 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size()); if ( - context.hasGlobalIndex || context.hasTransform || context.hasAtomic || context.returnResult - || context.hasRowDelete || context.hasConditionalTTL - || (context.hasUncoveredIndex - && isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp)) + !context.immutableRows && context.hasGlobalIndex || context.hasTransform + || context.hasAtomic || context.returnResult || context.hasRowDelete + || context.hasConditionalTTL + || !context.immutableRows && context.hasUncoveredIndex + && isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp) ) { getCurrentRowStates(c, context); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java index 096c3e3aa5..d9d78540e0 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java @@ -30,7 +30,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Map; import java.util.Properties; -import org.apache.phoenix.end2end.index.ImmutableIndexIT; +import org.apache.phoenix.end2end.index.BaseImmutableIndexIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.util.PropertiesUtil; @@ -70,7 +70,7 @@ public class MutationBatchFailedStateMetricWithAllDeleteIT // mutations // Note :- it is called before applying mutation to region TestUtil.addCoprocessor(conn, deleteTableName, - ImmutableIndexIT.DeleteFailingRegionObserver.class); + BaseImmutableIndexIT.DeleteFailingRegionObserver.class); // trying to delete 4 rows with this single delete statement ResultSet rs = diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java index 9bdb3b0358..9bb8053832 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java @@ -29,7 +29,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Map; import java.util.Properties; -import org.apache.phoenix.end2end.index.ImmutableIndexIT; +import org.apache.phoenix.end2end.index.BaseImmutableIndexIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.util.PropertiesUtil; @@ -68,7 +68,7 @@ public class MutationBatchFailedStateMetricWithAllUpsertIT // mutations // Note :- it is called before applying mutation to region TestUtil.addCoprocessor(conn, deleteTableName, - ImmutableIndexIT.DeleteFailingRegionObserver.class); + BaseImmutableIndexIT.DeleteFailingRegionObserver.class); for (int i = 0; i < numUpsertCount; i++) { String upsertSQL = String.format(upsertStatement, deleteTableName); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java index aa30b7722f..a7231a0769 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java @@ -31,7 +31,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Map; import java.util.Properties; -import org.apache.phoenix.end2end.index.ImmutableIndexIT; +import org.apache.phoenix.end2end.index.BaseImmutableIndexIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.monitoring.MetricType; import org.apache.phoenix.util.PropertiesUtil; @@ -69,7 +69,7 @@ public class MutationBatchFailedStateMetricWithDeleteAndUpsertIT // mutations // Note :- it is called before applying mutation to region TestUtil.addCoprocessor(conn, deleteTableName, - ImmutableIndexIT.DeleteFailingRegionObserver.class); + BaseImmutableIndexIT.DeleteFailingRegionObserver.class); // trying to delete 4 rows with this single delete statement ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + deleteTableName + " where val1 > 1"); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java similarity index 88% rename from phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java rename to phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java index 9a6a23186d..aaeeea6d07 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java @@ -34,8 +34,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -60,38 +58,32 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; -import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableImpl; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.transaction.PhoenixTransactionProvider; import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature; import org.apache.phoenix.transaction.TransactionFactory; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryUtil; -import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; -import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -@Category(NeedsOwnMiniClusterTest.class) -@RunWith(Parameterized.class) -public class ImmutableIndexIT extends BaseTest { +public abstract class BaseImmutableIndexIT extends BaseTest { private final boolean localIndex; + private final boolean serverSideIndex; private final PhoenixTransactionProvider transactionProvider; private final String tableDDLOptions; @@ -101,10 +93,11 @@ public class ImmutableIndexIT extends BaseTest { private static String INDEX_DDL; public static final AtomicInteger NUM_ROWS = new AtomicInteger(0); - public ImmutableIndexIT(boolean localIndex, boolean transactional, String transactionProvider, - boolean columnEncoded) { + public BaseImmutableIndexIT(boolean localIndex, boolean transactional, String transactionProvider, + boolean columnEncoded, boolean serverSideIndex) { StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true"); this.localIndex = localIndex; + this.serverSideIndex = serverSideIndex; if (!columnEncoded) { optionBuilder.append(",COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME=" + PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN); @@ -121,28 +114,51 @@ public class ImmutableIndexIT extends BaseTest { } - @BeforeClass - public static synchronized void doSetup() throws Exception { + protected static Map<String, String> createServerProps() { Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1); serverProps.put("hbase.coprocessor.region.classes", CreateIndexRegionObserver.class.getName()); + return serverProps; + } + + protected static Map<String, String> createClientProps() { Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5); clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true"); clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000"); clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB, "true"); clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1"); clientProps.put(HConstants.HBASE_CLIENT_PAUSE, "1"); - setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), - new ReadOnlyProps(clientProps.entrySet().iterator())); + return clientProps; } - // name is used by failsafe as file name in reports - @Parameters( - name = "ImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") - public static synchronized Collection<Object[]> data() { - return Arrays.asList(new Object[][] { { false, false, null, false }, - { false, false, null, true }, - // OMID does not support local indexes or column encoding - { false, true, "OMID", false }, { true, false, null, false }, { true, false, null, true }, }); + @Test + public void testClientVsServerSideIndexMutations() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String tableName = "TBL_" + generateUniqueName(); + String indexName = "IND_" + generateUniqueName(); + String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName); + String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + conn.setAutoCommit(false); + String ddl = "CREATE TABLE " + fullTableName + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions; + Statement stmt = conn.createStatement(); + stmt.execute(ddl); + ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName + " ON " + fullTableName + + " (long_col1)"; + stmt.execute(ddl); + upsertRows(conn, fullTableName, 3); + assertClientVsServerSideIndexMutations(conn); + conn.commit(); + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullTableName); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + String dml = "DELETE from " + fullTableName + " WHERE varchar_pk='varchar1'"; + assertEquals(1, conn.createStatement().executeUpdate(dml)); + assertClientVsServerSideIndexMutations(conn); + conn.commit(); + } } @Test @@ -274,7 +290,45 @@ public class ImmutableIndexIT extends BaseTest { } } + private boolean isServerSideIndex() { + // An index is a server-side index if its mutations are generated and applied on the server + // side, otherwise + // it is a client-side index + // 1. Non-transactional local indexes are server-side indexes + // 2. Transactional global indexes are client-side indexes + // 3. Transactional local indexes are server-side indexes unless the transaction provider does + // not support it + // 4. Non-transactional global mutable indexes are server-side indexes + // 5. If configured using QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB (that is, + // when serverSideIndex = true), non-transactional immutable indexes are also server-side + // indexes, otherwise they + // are client-side indexes + if ( + (localIndex && transactionProvider != null + && transactionProvider.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)) + || (!localIndex && transactionProvider != null) || (!localIndex && !serverSideIndex) + ) { + return false; + } + return serverSideIndex; + } + + private void assertClientVsServerSideIndexMutations(Connection conn) throws SQLException { + boolean serverSideMutations = isServerSideIndex(); + Iterator<Pair<byte[], List<Cell>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); + while (iterator.hasNext()) { + byte[] tableName = iterator.next().getFirst(); + PTable table = conn.unwrap(PhoenixConnection.class).getTable(Bytes.toString(tableName)); + if (table.getType() == PTableType.INDEX) { + assertFalse(serverSideMutations); + } + } + } + private void assertIndexMutations(Connection conn) throws SQLException { + if (isServerSideIndex()) { + return; + } Iterator<Pair<byte[], List<Cell>>> iterator = PhoenixRuntime.getUncommittedDataIterator(conn); assertTrue(iterator.hasNext()); iterator.next(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java new file mode 100644 index 0000000000..335c0884e3 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) +public class ClientSideImmutableIndexIT extends BaseImmutableIndexIT { + + public ClientSideImmutableIndexIT(boolean localIndex, boolean transactional, + String transactionProvider, boolean columnEncoded) { + super(localIndex, transactional, transactionProvider, columnEncoded, false); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> serverProps = createServerProps(); + Map<String, String> clientProps = createClientProps(); + clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, "false"); + driver = null; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + // name is used by failsafe as file name in reports + @Parameters( + name = "ClientSideImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { false, false, null, false }, + { false, false, null, true }, + // OMID does not support local indexes or column encoding + { false, true, "OMID", false }, { true, false, null, false }, { true, false, null, true }, }); + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java new file mode 100644 index 0000000000..a8eace65dd --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +@Category(NeedsOwnMiniClusterTest.class) +@RunWith(Parameterized.class) +public class ServerSideImmutableIndexIT extends BaseImmutableIndexIT { + + public ServerSideImmutableIndexIT(boolean localIndex, boolean transactional, + String transactionProvider, boolean columnEncoded) { + super(localIndex, transactional, transactionProvider, columnEncoded, true); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> serverProps = createServerProps(); + Map<String, String> clientProps = createClientProps(); + clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, "true"); + driver = null; + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + } + + // name is used by failsafe as file name in reports + @Parameters( + name = "ServerSideImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { false, false, null, false }, + { false, false, null, true }, + // OMID does not support local indexes or column encoding + { false, true, "OMID", false }, { true, false, null, false }, { true, false, null, true }, }); + } + +}