This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 0744a7c73a PHOENIX-7426 Generating index mutations for immutable
tables on the s… (#2245) (#2290)
0744a7c73a is described below
commit 0744a7c73a38fd8a19721fa80e5b3d0781aee308
Author: Kadir Ozdemir <[email protected]>
AuthorDate: Mon Sep 22 14:34:36 2025 -0700
PHOENIX-7426 Generating index mutations for immutable tables on the s…
(#2245) (#2290)
---
.../org/apache/phoenix/compile/DeleteCompiler.java | 23 ++++-
.../org/apache/phoenix/compile/UpsertCompiler.java | 11 ++-
.../org/apache/phoenix/execute/MutationState.java | 10 +-
.../org/apache/phoenix/index/IndexMaintainer.java | 13 ++-
.../org/apache/phoenix/query/QueryServices.java | 6 +-
.../apache/phoenix/query/QueryServicesOptions.java | 1 +
.../java/org/apache/phoenix/util/IndexUtil.java | 15 +--
.../phoenix/hbase/index/IndexRegionObserver.java | 25 ++++-
...ationBatchFailedStateMetricWithAllDeleteIT.java | 4 +-
...ationBatchFailedStateMetricWithAllUpsertIT.java | 4 +-
...atchFailedStateMetricWithDeleteAndUpsertIT.java | 4 +-
...tableIndexIT.java => BaseImmutableIndexIT.java} | 106 ++++++++++++++++-----
.../end2end/index/ClientSideImmutableIndexIT.java | 61 ++++++++++++
.../end2end/index/ServerSideImmutableIndexIT.java | 61 ++++++++++++
14 files changed, 286 insertions(+), 58 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index c76043aca7..3d1e9ffdcd 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -18,6 +18,8 @@
package org.apache.phoenix.compile;
import static
org.apache.phoenix.execute.MutationState.RowTimestampColInfo.NULL_ROWTIMESTAMP_INFO;
+import static
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static org.apache.phoenix.util.NumberUtil.add;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -212,7 +214,7 @@ public class DeleteCompiler {
// The data table is always the last one in the list if it's
// not chosen as the best of the possible plans.
dataTable = otherTableRefs.get(otherTableRefs.size() - 1).getTable();
- if (!isMaintainedOnClient(table)) {
+ if (!isMaintainedOnClient(table, connection)) {
// dataTable is a projected table and may not include all the
indexed columns and so we
// need to get
// the actual data table
@@ -257,7 +259,7 @@ public class DeleteCompiler {
// row timestamp column, then the
// row key will already have its value.
// Check for otherTableRefs being empty required when deleting
directly from the index
- if (otherTableRefs.isEmpty() || isMaintainedOnClient(table)) {
+ if (otherTableRefs.isEmpty() || isMaintainedOnClient(table,
connection)) {
mutations.put(rowKeyPtr,
new RowMutationState(PRow.DELETE_MARKER, 0,
statement.getConnection().getStatementExecutionCounter(),
NULL_ROWTIMESTAMP_INFO,
@@ -381,7 +383,10 @@ public class DeleteCompiler {
List<PTable> nonDisabledIndexes =
Lists.newArrayListWithExpectedSize(table.getIndexes().size());
for (PTable index : table.getIndexes()) {
- if (!index.getIndexState().isDisabled() &&
isMaintainedOnClient(index)) {
+ if (
+ !index.getIndexState().isDisabled()
+ && isMaintainedOnClient(index, statement.getConnection())
+ ) {
nonDisabledIndexes.add(index);
}
}
@@ -564,7 +569,7 @@ public class DeleteCompiler {
// mutations are generated on the client side. Indexed columns are needed
to identify index rows
// to be deleted
for (PTable index : table.getIndexes()) {
- if (isMaintainedOnClient(index)) {
+ if (isMaintainedOnClient(index, connection)) {
IndexMaintainer maintainer = index.getIndexMaintainer(table,
connection);
// Go through maintainer as it handles functional indexes correctly
for (Pair<String, String> columnInfo :
maintainer.getIndexedColumnInfo()) {
@@ -1130,10 +1135,18 @@ public class DeleteCompiler {
}
}
- private static boolean isMaintainedOnClient(PTable table) {
+ private static boolean isMaintainedOnClient(PTable table, PhoenixConnection
connection) {
if (CDCUtil.isCDCIndex(table)) {
return false;
}
+ if (
+ !table.isTransactional() && table.getIndexType() != IndexType.LOCAL
+ && connection.getQueryServices().getConfiguration().getBoolean(
+ SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED)
+ ) {
+ return false;
+ }
// Test for not being local (rather than being GLOBAL) so that this
doesn't fail
// when tested with our projected table.
return (table.getIndexType() != IndexType.LOCAL
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 4506e71c79..7d42690420 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.compile;
+import static
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static
org.apache.phoenix.thirdparty.com.google.common.collect.Lists.newArrayListWithCapacity;
@@ -611,11 +613,14 @@ public class UpsertCompiler {
// region space managed by region servers. So we bail out on executing
on server side.
// Disable running upsert select on server side if a table has global
mutable secondary
// indexes on it
- boolean hasGlobalMutableIndexes =
- SchemaUtil.hasGlobalIndex(table) && !table.isImmutableRows();
+ boolean serverSideImmutableIndexes =
connection.getQueryServices().getConfiguration()
+ .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
+ boolean hasGlobalServerSideIndexes = SchemaUtil.hasGlobalIndex(table)
+ && (serverSideImmutableIndexes || !table.isImmutableRows());
boolean hasWhereSubquery = select.getWhere() != null &&
select.getWhere().hasSubquery();
runOnServer =
- (sameTable || (serverUpsertSelectEnabled &&
!hasGlobalMutableIndexes)) && isAutoCommit
+ (sameTable || serverUpsertSelectEnabled &&
!hasGlobalServerSideIndexes) && isAutoCommit
// We can run the upsert select for initial index population on
server side for
// transactional
// tables since the writes do not need to be done transactionally,
since we gate the index
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
index 13bdff10a4..fb82692be5 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -31,9 +31,11 @@ import static
org.apache.phoenix.monitoring.MetricType.NUM_METADATA_LOOKUP_FAILU
import static
org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_FAILURE_SQL_COUNTER;
import static
org.apache.phoenix.monitoring.MetricType.UPSERT_AGGREGATE_SUCCESS_SQL_COUNTER;
import static
org.apache.phoenix.query.QueryServices.INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB;
+import static
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
import static
org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static
org.apache.phoenix.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@@ -184,6 +186,7 @@ public class MutationState implements SQLCloseable {
private long mutationQueryParsingTimeMS = 0;
private final boolean indexRegionObserverEnabledAllTables;
+ private final boolean serverSideImmutableIndexes;
/**
* Return result back to client. To be used when client needs to read the
whole row or some
@@ -262,6 +265,9 @@ public class MutationState implements SQLCloseable {
this.indexRegionObserverEnabledAllTables =
Boolean.parseBoolean(this.connection
.getQueryServices().getConfiguration().get(INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB,
DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES));
+ this.serverSideImmutableIndexes =
this.connection.getQueryServices().getConfiguration()
+ .getBoolean(SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED);
}
public MutationState(TableRef table, MultiRowMutationState mutations, long
sizeOffset,
@@ -667,7 +673,7 @@ public class MutationState implements SQLCloseable {
final PTable table = tableRef.getTable();
final List<PTable> indexList = includeAllIndexes
?
Lists.newArrayList(IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()))
- : IndexUtil.getClientMaintainedIndexes(table);
+ : IndexUtil.getClientMaintainedIndexes(table,
serverSideImmutableIndexes);
final Iterator<PTable> indexes = indexList.iterator();
final List<Mutation> mutationList =
Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex =
@@ -1879,7 +1885,7 @@ public class MutationState implements SQLCloseable {
}
PTable logicalTable = tableInfo.getPTable();
if (
- tableInfo.getOrigTableRef().getTable().isImmutableRows()
+ !this.serverSideImmutableIndexes &&
tableInfo.getOrigTableRef().getTable().isImmutableRows()
&& (this.indexRegionObserverEnabledAllTables
|| IndexUtil.isGlobalIndexCheckerEnabled(connection,
tableInfo.getHTableName()))
) {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index 7c0c4439fd..e2b33a17d7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -17,6 +17,8 @@
*/
package org.apache.phoenix.index;
+import static
org.apache.phoenix.query.QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED;
import static
org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -191,12 +193,15 @@ public class IndexMaintainer implements Writable,
Iterable<ColumnReference> {
}
public static Iterator<PTable>
maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(
- final PTable dataTable, Iterator<PTable> indexes) {
+ final PTable dataTable, Iterator<PTable> indexes, PhoenixConnection
connection) {
return Iterators.filter(indexes, new Predicate<PTable>() {
@Override
public boolean apply(PTable index) {
return sendIndexMaintainer(index) && ((index.getIndexType() ==
IndexType.GLOBAL
- && dataTable.getImmutableStorageScheme() !=
index.getImmutableStorageScheme())
+ && (dataTable.getImmutableStorageScheme() !=
index.getImmutableStorageScheme()
+ || connection.getQueryServices().getConfiguration().getBoolean(
+ SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB,
+ DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED)))
|| index.getIndexType() == IndexType.LOCAL ||
CDCUtil.isCDCIndex(index));
}
});
@@ -231,8 +236,8 @@ public class IndexMaintainer implements Writable,
Iterable<ColumnReference> {
!dataTable.isTransactional() ||
!dataTable.getTransactionProvider().getTransactionProvider()
.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER)
) {
- indexesItr =
-
maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(dataTable,
indexes.iterator());
+ indexesItr =
maintainedLocalOrGlobalIndexesWithoutMatchingStorageScheme(dataTable,
+ indexes.iterator(), connection);
}
} else {
indexesItr = maintainedIndexes(indexes.iterator());
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index bcb0b5b463..66b8fe5379 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -412,9 +412,13 @@ public interface QueryServices extends SQLCloseable {
// The minimum age of an unverified global index row to be eligible for
deletion
public static final String
GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB =
"phoenix.global.index.row.age.threshold.to.delete.ms";
- // Enable the IndexRegionObserver Coprocessor
+ // Enable the IndexRegionObserver coprocessor
public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB =
"phoenix.index.region.observer.enabled";
+ // Enable the IndexRegionObserver coprocessor for immutable tables
+ String SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB =
+ "phoenix.server.side.immutable.indexes.enabled";
+
// Whether IndexRegionObserver/GlobalIndexChecker is enabled for all tables
public static final String INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES_ATTRIB =
"phoenix.index.region.observer.enabled.all.tables";
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index a7f282e1f7..d398f4b240 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -424,6 +424,7 @@ public class QueryServicesOptions {
public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS
=
7 * 24 * 60 * 60 * 1000; /* 7 days */
public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+ public static final boolean DEFAULT_SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED =
false;
public static final String DEFAULT_INDEX_REGION_OBSERVER_ENABLED_ALL_TABLES =
Boolean.toString(true);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
index af1cf441f8..5262437af8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -761,18 +761,19 @@ public class IndexUtil {
}
}
- public static List<PTable> getClientMaintainedIndexes(PTable table) {
+ public static List<PTable> getClientMaintainedIndexes(PTable table,
+ boolean serverSideImmutableIndex) {
Iterator<PTable> indexIterator = // Only maintain tables with immutable
rows through this
// client-side mechanism
(table.isTransactional() &&
table.getTransactionProvider().getTransactionProvider()
.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER))
? IndexMaintainer.maintainedIndexes(table.getIndexes().iterator())
- : (table.isImmutableRows() || table.isTransactional()) ?
- // If the data table has a different storage scheme than index table,
don't maintain this on
- // the client
- // For example, if the index is single cell but the data table is
one_cell, if there is a
- // partial update on the data table, index can't be built on the
client.
-
IndexMaintainer.maintainedGlobalIndexesWithMatchingStorageScheme(table,
+ : (table.isImmutableRows() && !serverSideImmutableIndex ||
table.isTransactional())
+ // If the data table has a different storage scheme than index
table, don't maintain
+ // this on the client. For example, if the index is single cell but
the data table is
+ // one_cell, and there is a partial update on the data table, index
can't be built
+ // on the client
+ ?
IndexMaintainer.maintainedGlobalIndexesWithMatchingStorageScheme(table,
table.getIndexes().iterator())
: Collections.<PTable> emptyIterator();
return Lists.newArrayList(indexIterator);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 40a4239804..fc55b2435d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -308,6 +308,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private boolean returnResult;
private boolean returnOldRow;
private boolean hasConditionalTTL; // table has Conditional TTL
+ private boolean immutableRows;
public BatchMutateContext() {
this.clientVersion = 0;
@@ -1361,6 +1362,21 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static void identifyIndexMaintainerTypes(PhoenixIndexMetaData
indexMetaData,
BatchMutateContext context) {
for (IndexMaintainer indexMaintainer :
indexMetaData.getIndexMaintainers()) {
+ if (indexMaintainer.isImmutableRows()) {
+ // Here we care if index is immutable in order to skip reading data
table rows. However, if
+ // the data table storage scheme does not agree with the index table
storage scheme, we
+ // cannot skip reading data table rows, and thus we cannot treat the
index as immutable.
+ // Consider the case where data table uses the single cell per column
format and index
+ // uses the single cell format. If the data table row is updated
partially, we need to
+ // read the data table row on disk to retrieve missing columns in the
partial update to
+ // build the full index row. Please note with the single cell format,
the row has single
+ // cell (and the empty cell)
+ if (
+ indexMaintainer.getDataImmutableStorageScheme() ==
indexMaintainer.getIndexStorageScheme()
+ ) {
+ context.immutableRows = true;
+ }
+ }
if (indexMaintainer instanceof TransformMaintainer) {
context.hasTransform = true;
} else if (indexMaintainer.isLocalIndex()) {
@@ -1557,10 +1573,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
context.dataRowStates =
new HashMap<ImmutableBytesPtr, Pair<Put,
Put>>(context.rowsToLock.size());
if (
- context.hasGlobalIndex || context.hasTransform || context.hasAtomic ||
context.returnResult
- || context.hasRowDelete || context.hasConditionalTTL
- || (context.hasUncoveredIndex
- && isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp))
+ !context.immutableRows && context.hasGlobalIndex ||
context.hasTransform
+ || context.hasAtomic || context.returnResult || context.hasRowDelete
+ || context.hasConditionalTTL
+ || !context.immutableRows && context.hasUncoveredIndex
+ && isPartialUncoveredIndexMutation(indexMetaData, miniBatchOp)
) {
getCurrentRowStates(c, context);
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java
index 096c3e3aa5..d9d78540e0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllDeleteIT.java
@@ -30,7 +30,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
-import org.apache.phoenix.end2end.index.ImmutableIndexIT;
+import org.apache.phoenix.end2end.index.BaseImmutableIndexIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PropertiesUtil;
@@ -70,7 +70,7 @@ public class MutationBatchFailedStateMetricWithAllDeleteIT
// mutations
// Note :- it is called before applying mutation to region
TestUtil.addCoprocessor(conn, deleteTableName,
- ImmutableIndexIT.DeleteFailingRegionObserver.class);
+ BaseImmutableIndexIT.DeleteFailingRegionObserver.class);
// trying to delete 4 rows with this single delete statement
ResultSet rs =
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java
index 9bdb3b0358..9bb8053832 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithAllUpsertIT.java
@@ -29,7 +29,7 @@ import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
-import org.apache.phoenix.end2end.index.ImmutableIndexIT;
+import org.apache.phoenix.end2end.index.BaseImmutableIndexIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PropertiesUtil;
@@ -68,7 +68,7 @@ public class MutationBatchFailedStateMetricWithAllUpsertIT
// mutations
// Note :- it is called before applying mutation to region
TestUtil.addCoprocessor(conn, deleteTableName,
- ImmutableIndexIT.DeleteFailingRegionObserver.class);
+ BaseImmutableIndexIT.DeleteFailingRegionObserver.class);
for (int i = 0; i < numUpsertCount; i++) {
String upsertSQL = String.format(upsertStatement, deleteTableName);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java
index aa30b7722f..a7231a0769 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationBatchFailedStateMetricWithDeleteAndUpsertIT.java
@@ -31,7 +31,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
-import org.apache.phoenix.end2end.index.ImmutableIndexIT;
+import org.apache.phoenix.end2end.index.BaseImmutableIndexIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.util.PropertiesUtil;
@@ -69,7 +69,7 @@ public class
MutationBatchFailedStateMetricWithDeleteAndUpsertIT
// mutations
// Note :- it is called before applying mutation to region
TestUtil.addCoprocessor(conn, deleteTableName,
- ImmutableIndexIT.DeleteFailingRegionObserver.class);
+ BaseImmutableIndexIT.DeleteFailingRegionObserver.class);
// trying to delete 4 rows with this single delete statement
ResultSet rs =
stmt.executeQuery("SELECT COUNT(*) FROM " + deleteTableName + " where
val1 > 1");
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java
similarity index 88%
rename from
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
rename to
phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java
index 9a6a23186d..aaeeea6d07 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseImmutableIndexIT.java
@@ -34,8 +34,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -60,38 +58,32 @@ import
org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableImpl;
+import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.transaction.PhoenixTransactionProvider;
import org.apache.phoenix.transaction.PhoenixTransactionProvider.Feature;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
-import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
-@Category(NeedsOwnMiniClusterTest.class)
-@RunWith(Parameterized.class)
-public class ImmutableIndexIT extends BaseTest {
+public abstract class BaseImmutableIndexIT extends BaseTest {
private final boolean localIndex;
+ private final boolean serverSideIndex;
private final PhoenixTransactionProvider transactionProvider;
private final String tableDDLOptions;
@@ -101,10 +93,11 @@ public class ImmutableIndexIT extends BaseTest {
private static String INDEX_DDL;
public static final AtomicInteger NUM_ROWS = new AtomicInteger(0);
- public ImmutableIndexIT(boolean localIndex, boolean transactional, String
transactionProvider,
- boolean columnEncoded) {
+ public BaseImmutableIndexIT(boolean localIndex, boolean transactional,
String transactionProvider,
+ boolean columnEncoded, boolean serverSideIndex) {
StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
this.localIndex = localIndex;
+ this.serverSideIndex = serverSideIndex;
if (!columnEncoded) {
optionBuilder.append(",COLUMN_ENCODED_BYTES=0,IMMUTABLE_STORAGE_SCHEME="
+ PTableImpl.ImmutableStorageScheme.ONE_CELL_PER_COLUMN);
@@ -121,28 +114,51 @@ public class ImmutableIndexIT extends BaseTest {
}
- @BeforeClass
- public static synchronized void doSetup() throws Exception {
+ protected static Map<String, String> createServerProps() {
Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
serverProps.put("hbase.coprocessor.region.classes",
CreateIndexRegionObserver.class.getName());
+ return serverProps;
+ }
+
+ protected static Map<String, String> createClientProps() {
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
"true");
clientProps.put(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
clientProps.put(HConstants.HBASE_CLIENT_PAUSE, "1");
- setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
- new ReadOnlyProps(clientProps.entrySet().iterator()));
+ return clientProps;
}
- // name is used by failsafe as file name in reports
- @Parameters(
- name =
"ImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}")
- public static synchronized Collection<Object[]> data() {
- return Arrays.asList(new Object[][] { { false, false, null, false },
- { false, false, null, true },
- // OMID does not support local indexes or column encoding
- { false, true, "OMID", false }, { true, false, null, false }, { true,
false, null, true }, });
+ @Test
+ public void testClientVsServerSideIndexMutations() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IND_" + generateUniqueName();
+ String fullTableName =
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+ String fullIndexName =
SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ conn.setAutoCommit(false);
+ String ddl = "CREATE TABLE " + fullTableName +
TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
+ Statement stmt = conn.createStatement();
+ stmt.execute(ddl);
+ ddl = "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexName +
" ON " + fullTableName
+ + " (long_col1)";
+ stmt.execute(ddl);
+ upsertRows(conn, fullTableName, 3);
+ assertClientVsServerSideIndexMutations(conn);
+ conn.commit();
+ ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM
" + fullTableName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " +
fullIndexName);
+ assertTrue(rs.next());
+ assertEquals(3, rs.getInt(1));
+ String dml = "DELETE from " + fullTableName + " WHERE
varchar_pk='varchar1'";
+ assertEquals(1, conn.createStatement().executeUpdate(dml));
+ assertClientVsServerSideIndexMutations(conn);
+ conn.commit();
+ }
}
@Test
@@ -274,7 +290,45 @@ public class ImmutableIndexIT extends BaseTest {
}
}
+ private boolean isServerSideIndex() {
+ // An index is a server-side index if its mutations are generated and
applied on the server
+ // side, otherwise
+ // it is a client-side index
+ // 1. Non-transactional local indexes are server-side indexes
+ // 2. Transactional global indexes are client-side indexes
+ // 3. Transactional local indexes are server-side indexes unless the
transaction provider does
+ // not support it
+ // 4. Non-transactional global mutable indexes are server-side indexes
+ // 5. If configured using
QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB (that is,
+ // when serverSideIndex = true), non-transactional immutable indexes are
also server-side
+ // indexes, otherwise they
+ // are client-side indexes
+ if (
+ (localIndex && transactionProvider != null
+ &&
transactionProvider.isUnsupported(Feature.MAINTAIN_LOCAL_INDEX_ON_SERVER))
+ || (!localIndex && transactionProvider != null) || (!localIndex &&
!serverSideIndex)
+ ) {
+ return false;
+ }
+ return serverSideIndex;
+ }
+
+ private void assertClientVsServerSideIndexMutations(Connection conn) throws
SQLException {
+ boolean serverSideMutations = isServerSideIndex();
+ Iterator<Pair<byte[], List<Cell>>> iterator =
PhoenixRuntime.getUncommittedDataIterator(conn);
+ while (iterator.hasNext()) {
+ byte[] tableName = iterator.next().getFirst();
+ PTable table =
conn.unwrap(PhoenixConnection.class).getTable(Bytes.toString(tableName));
+ if (table.getType() == PTableType.INDEX) {
+ assertFalse(serverSideMutations);
+ }
+ }
+ }
+
private void assertIndexMutations(Connection conn) throws SQLException {
+ if (isServerSideIndex()) {
+ return;
+ }
Iterator<Pair<byte[], List<Cell>>> iterator =
PhoenixRuntime.getUncommittedDataIterator(conn);
assertTrue(iterator.hasNext());
iterator.next();
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java
new file mode 100644
index 0000000000..335c0884e3
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ClientSideImmutableIndexIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class ClientSideImmutableIndexIT extends BaseImmutableIndexIT {
+
+ public ClientSideImmutableIndexIT(boolean localIndex, boolean transactional,
+ String transactionProvider, boolean columnEncoded) {
+ super(localIndex, transactional, transactionProvider, columnEncoded, false);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> serverProps = createServerProps();
+ Map<String, String> clientProps = createClientProps();
+ clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, "false");
+ driver = null;
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ // name is used by failsafe as file name in reports
+ @Parameters(
+ name = "ClientSideImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { false, false, null, false },
+ { false, false, null, true },
+ // OMID does not support local indexes or column encoding
+ { false, true, "OMID", false }, { true, false, null, false }, { true, false, null, true }, });
+ }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java
new file mode 100644
index 0000000000..a8eace65dd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ServerSideImmutableIndexIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
+public class ServerSideImmutableIndexIT extends BaseImmutableIndexIT {
+
+ public ServerSideImmutableIndexIT(boolean localIndex, boolean transactional,
+ String transactionProvider, boolean columnEncoded) {
+ super(localIndex, transactional, transactionProvider, columnEncoded, true);
+ }
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> serverProps = createServerProps();
+ Map<String, String> clientProps = createClientProps();
+ clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, "true");
+ driver = null;
+ setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()),
+ new ReadOnlyProps(clientProps.entrySet().iterator()));
+ }
+
+ // name is used by failsafe as file name in reports
+ @Parameters(
+ name = "ServerSideImmutableIndexIT_localIndex={0},transactional={1},transactionProvider={2},columnEncoded={3}")
+ public static synchronized Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] { { false, false, null, false },
+ { false, false, null, true },
+ // OMID does not support local indexes or column encoding
+ { false, true, "OMID", false }, { true, false, null, false }, { true, false, null, true }, });
+ }
+
+}