This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch tmp-ec in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 9d676e6fc4f3d3173a4e30c07c263299ac9cf6f7 Author: Viraj Jasani <[email protected]> AuthorDate: Sun Mar 15 22:57:43 2026 -0700 addendum - multi-tenant --- .../phoenix/hbase/index/IndexCDCConsumer.java | 250 +++++++-- .../MultiTenantEventualIndexGenerateIT.java | 56 ++ .../end2end/MultiTenantEventualIndexIT.java | 624 +++++++++++++++++++++ 3 files changed, 893 insertions(+), 37 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index 5616887c46..2fd66bae54 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; import org.apache.phoenix.coprocessor.generated.IndexMutationsProtos; @@ -45,9 +46,13 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.types.IndexConsistency; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; @@ 
-144,6 +149,45 @@ public class IndexCDCConsumer implements Runnable { private boolean hasParentPartitions = false; private PTable cachedDataTable; + private boolean tenantInit = false; + private boolean isMultiTenant = false; + private String tenantIdColName; + private PDataType<?> tenantIdDataType; + private TenantScanInfo ownRegionScanInfo; + + private final Map<String, TenantScanInfo> ancestorScanInfoCache = new HashMap<>(); + + private static class TenantScanInfo { + + private static final TenantScanInfo EMPTY = new TenantScanInfo("", "", null, null, null); + + private final String filter; + private final String orderBy; + private final Object startValue; + private final Object endValue; + private final PDataType<?> dataType; + + TenantScanInfo(String filter, String orderBy, Object startValue, Object endValue, + PDataType<?> dataType) { + this.filter = filter; + this.orderBy = orderBy; + this.startValue = startValue; + this.endValue = endValue; + this.dataType = dataType; + } + + int bindParams(PreparedStatement ps, int startIndex) throws SQLException { + int idx = startIndex; + if (startValue != null) { + ps.setObject(idx++, startValue, dataType.getSqlType()); + } + if (endValue != null) { + ps.setObject(idx++, endValue, dataType.getSqlType()); + } + return idx; + } + } + /** * Creates a new IndexCDCConsumer for the given region with configurable serialization mode. * @param env region coprocessor environment. @@ -224,6 +268,125 @@ public class IndexCDCConsumer implements Runnable { cachedDataTable = conn.getTable(dataTableName); } + private void initTenantInfo(PhoenixConnection conn) throws SQLException { + if (tenantInit) { + return; + } + PTable dataTable = getDataTable(conn); + isMultiTenant = dataTable.isMultiTenant(); + if (!isMultiTenant) { + ownRegionScanInfo = TenantScanInfo.EMPTY; + tenantInit = true; + return; + } + int tenantColIndex = dataTable.getBucketNum() != null ? 
1 : 0; + PColumn tenantCol = dataTable.getPKColumns().get(tenantColIndex); + tenantIdColName = tenantCol.getName().getString(); + tenantIdDataType = tenantCol.getDataType(); + + byte[] regionStartKey = env.getRegion().getRegionInfo().getStartKey(); + byte[] regionEndKey = env.getRegion().getRegionInfo().getEndKey(); + ownRegionScanInfo = buildTenantScanInfo(regionStartKey, regionEndKey, dataTable); + LOG.debug( + "Initialized multi-tenant scan for table {} region {}:" + + " tenantCol {}, startTenant {}, endTenant {}", + dataTableName, encodedRegionName, tenantIdColName, ownRegionScanInfo.startValue, + ownRegionScanInfo.endValue); + tenantInit = true; + } + + private TenantScanInfo buildTenantScanInfo(byte[] startKey, byte[] endKey, PTable dataTable) { + Object startVal = extractTenantIdFromRegionKey(startKey, dataTable); + Object endVal = extractTenantIdFromRegionKey(endKey, dataTable); + StringBuilder sb = new StringBuilder(); + if (startVal != null) { + sb.append("\"").append(tenantIdColName).append("\" >= ? AND "); + } + if (endVal != null) { + sb.append("\"").append(tenantIdColName).append("\" <= ? AND "); + } + String filter = sb.toString(); + String orderBy = filter.isEmpty() ? "" : "\"" + tenantIdColName + "\" ASC,"; + return new TenantScanInfo(filter, orderBy, startVal, endVal, tenantIdDataType); + } + + private Object extractTenantIdFromRegionKey(byte[] regionKey, PTable dataTable) { + if (regionKey == null || regionKey.length == 0) { + return null; + } + final RowKeySchema schema = dataTable.getRowKeySchema(); + int pkPos = dataTable.getBucketNum() != null ? 
1 : 0; + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + int maxOffset = schema.iterator(regionKey, 0, regionKey.length, ptr); + for (int i = 0; i <= pkPos; i++) { + Boolean hasValue = schema.next(ptr, i, maxOffset); + if (!Boolean.TRUE.equals(hasValue)) { + return null; + } + } + byte[] tenantBytes = ByteUtil.copyKeyBytesIfNecessary(ptr); + PColumn tenantCol = dataTable.getPKColumns().get(pkPos); + return tenantCol.getDataType().toObject(tenantBytes, 0, tenantBytes.length, + tenantCol.getDataType(), tenantCol.getSortOrder(), tenantCol.getMaxLength(), + tenantCol.getScale()); + } + + private byte[][] lookupPartitionKeys(String partitionId) throws InterruptedException { + int retryCount = 0; + final String query = "SELECT PARTITION_START_KEY, PARTITION_END_KEY FROM " + + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + + " WHERE TABLE_NAME = ? AND PARTITION_ID = ? LIMIT 1"; + while (!stopped) { + try ( + PhoenixConnection conn = + QueryUtil.getConnectionOnServer(env.getConfiguration()).unwrap(PhoenixConnection.class); + PreparedStatement ps = conn.prepareStatement(query)) { + ps.setString(1, dataTableName); + ps.setString(2, partitionId); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + byte[] startKey = rs.getBytes(1); + byte[] endKey = rs.getBytes(2); + return new byte[][] { startKey == null ? new byte[0] : startKey, + endKey == null ? new byte[0] : endKey }; + } + } + LOG.error("No CDC_STREAM entry found for partition {} table {}. This should not happen.", + partitionId, dataTableName); + return new byte[][] { new byte[0], new byte[0] }; + } catch (SQLException e) { + long sleepTime = ConnectionUtils.getPauseTime(pause, ++retryCount); + LOG.warn( + "Error while retrieving partition keys from CDC_STREAM for partition {} table {}. 
" + + "Retry #{}, sleeping {} ms before retrying...", + partitionId, dataTableName, retryCount, sleepTime, e); + sleepIfNotStopped(sleepTime); + } + } + return null; + } + + private TenantScanInfo getPartitionTenantScanInfo(String partitionId) + throws InterruptedException { + if (!isMultiTenant) { + return TenantScanInfo.EMPTY; + } + if (partitionId.equals(encodedRegionName)) { + return ownRegionScanInfo; + } + TenantScanInfo cached = ancestorScanInfoCache.get(partitionId); + if (cached != null) { + return cached; + } + byte[][] keys = lookupPartitionKeys(partitionId); + if (keys == null) { + return TenantScanInfo.EMPTY; + } + TenantScanInfo info = buildTenantScanInfo(keys[0], keys[1], cachedDataTable); + ancestorScanInfoCache.put(partitionId, info); + return info; + } + @Override public void run() { try { @@ -730,19 +893,23 @@ public class IndexCDCConsumer implements Runnable { dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp); try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) { + initTenantInfo(conn); String cdcObjectName = getCdcObjectName(conn); + TenantScanInfo scanInfo = getPartitionTenantScanInfo(partitionId); String cdcQuery; if (isParentReplay) { - cdcQuery = String - .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + cdcQuery = String.format( + "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); } else { - cdcQuery = String - .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? 
AND PHOENIX_ROW_TIMESTAMP() > ? " + cdcQuery = String.format( + "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + "AND PHOENIX_ROW_TIMESTAMP() < ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); } List<Pair<Long, IndexMutationsProtos.IndexMutations>> batchMutations = new ArrayList<>(); long newLastTimestamp = lastProcessedTimestamp; @@ -750,7 +917,7 @@ public class IndexCDCConsumer implements Runnable { int retryCount = 0; while (hasMoreRows && batchMutations.isEmpty()) { try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { - setStatementParams(partitionId, isParentReplay, newLastTimestamp, ps); + setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); Pair<Long, Boolean> result = getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations); hasMoreRows = result.getSecond(); @@ -765,15 +932,17 @@ public class IndexCDCConsumer implements Runnable { // With predefined LIMIT, there might be more rows with the same timestamp that were not // included in this batch. if (newLastTimestamp > lastProcessedTimestamp) { - String sameTimestampQuery = String - .format("SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", cdcObjectName); + String sameTimestampQuery = String.format( + "SELECT /*+ CDC_INCLUDE(IDX_MUTATIONS) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = ? 
" + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); final long timestampToRefetch = newLastTimestamp; batchMutations.removeIf(pair -> pair.getFirst() == timestampToRefetch); try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) { - ps.setString(1, partitionId); - ps.setDate(2, new Date(newLastTimestamp)); + int idx = scanInfo.bindParams(ps, 1); + ps.setString(idx++, partitionId); + ps.setDate(idx, new Date(newLastTimestamp)); Pair<Long, Boolean> result = getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations); newLastTimestamp = result.getFirst(); @@ -832,19 +1001,23 @@ public class IndexCDCConsumer implements Runnable { dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp); try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) { + initTenantInfo(conn); String cdcObjectName = getCdcObjectName(conn); + TenantScanInfo scanInfo = getPartitionTenantScanInfo(partitionId); String cdcQuery; if (isParentReplay) { - cdcQuery = String - .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + cdcQuery = String.format( + "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); } else { - cdcQuery = String - .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? 
" + cdcQuery = String.format( + "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + "AND PHOENIX_ROW_TIMESTAMP() < ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); } List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new ArrayList<>(); @@ -854,7 +1027,7 @@ public class IndexCDCConsumer implements Runnable { int retryCount = 0; while (hasMoreRows && batchStates.isEmpty()) { try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { - setStatementParams(partitionId, isParentReplay, newLastTimestamp, ps); + setStatementParams(scanInfo, partitionId, isParentReplay, newLastTimestamp, ps); Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); hasMoreRows = result.getSecond(); @@ -878,15 +1051,17 @@ public class IndexCDCConsumer implements Runnable { } } if (newLastTimestamp > lastProcessedTimestamp) { - String sameTimestampQuery = String - .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " - + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = ? " - + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", cdcObjectName); + String sameTimestampQuery = String.format( + "SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE %s PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = ? 
" + + "ORDER BY %s PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", + cdcObjectName, scanInfo.filter, scanInfo.orderBy); final long timestampToRefetch = newLastTimestamp; batchStates.removeIf(pair -> pair.getFirst() == timestampToRefetch); try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) { - ps.setString(1, partitionId); - ps.setDate(2, new Date(newLastTimestamp)); + int idx = scanInfo.bindParams(ps, 1); + ps.setString(idx++, partitionId); + ps.setDate(idx, new Date(newLastTimestamp)); Pair<Long, Boolean> result = getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); newLastTimestamp = result.getFirst(); @@ -908,16 +1083,17 @@ public class IndexCDCConsumer implements Runnable { } } - private void setStatementParams(String partitionId, boolean isParentReplay, long newLastTimestamp, - PreparedStatement ps) throws SQLException { - ps.setString(1, partitionId); - ps.setDate(2, new Date(newLastTimestamp)); + private void setStatementParams(TenantScanInfo scanInfo, String partitionId, + boolean isParentReplay, long newLastTimestamp, PreparedStatement ps) throws SQLException { + int idx = scanInfo.bindParams(ps, 1); + ps.setString(idx++, partitionId); + ps.setDate(idx++, new Date(newLastTimestamp)); if (isParentReplay) { - ps.setInt(3, batchSize); + ps.setInt(idx, batchSize); } else { long currentTime = EnvironmentEdgeManager.currentTimeMillis() - timestampBufferMs; - ps.setDate(3, new Date(currentTime)); - ps.setInt(4, batchSize); + ps.setDate(idx++, new Date(currentTime)); + ps.setInt(idx, batchSize); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java new file mode 100644 index 0000000000..857abf5e81 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexGenerateIT.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Map; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class MultiTenantEventualIndexGenerateIT extends MultiTenantEventualIndexIT { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(12); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0)); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Integer.toString(MAX_LOOKBACK_AGE)); + props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(3500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(4000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Long.toString(10)); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, Boolean.TRUE.toString()); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java new file mode 100644 index 0000000000..33b7bd7465 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MultiTenantEventualIndexIT.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class MultiTenantEventualIndexIT extends ParallelStatsDisabledIT { + + private static final Logger LOG = LoggerFactory.getLogger(MultiTenantEventualIndexIT.class); + protected static 
final int MAX_LOOKBACK_AGE = 1000000; + private static final long WAIT_MS = 25000; + private static final int ROWS_PER_TENANT_PER_PHASE = 10; + private static final String[] TENANT_PREFIXES = { "AA_", "BB_", "CC_", "DD_", "EE_", "FF_" }; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(12); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Integer.toString(MAX_LOOKBACK_AGE)); + props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(3500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(4000)); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, Boolean.TRUE.toString()); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + private Connection getTenantConnection(String tenantId) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); + return DriverManager.getConnection(getUrl(), props); + } + + private Connection getGlobalConnection() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES); + return DriverManager.getConnection(getUrl(), props); + } + + private void waitForEventualConsistency() throws InterruptedException { + Thread.sleep(WAIT_MS); + } + + private String[] createTenants() { + String[] tenants = new String[TENANT_PREFIXES.length]; + for (int i = 0; i < TENANT_PREFIXES.length; i++) { + tenants[i] = TENANT_PREFIXES[i] + generateUniqueName(); + } + return tenants; + } + + private void 
insertRows(String[] tenants, String tableName, String phase, int startRow, + int endRow) throws Exception { + for (String tenant : tenants) { + try (Connection conn = getTenantConnection(tenant)) { + for (int j = startRow; j <= endRow; j++) { + conn.createStatement().execute( + String.format("UPSERT INTO %s(PK2, V1, V2) VALUES ('%s_r%d', '%s_v%d', '%s_d%d')", + tableName, phase, j, phase, j, phase, j)); + } + conn.commit(); + } + } + } + + private void updateRows(String[] tenants, String tableName, String phase, int startRow, + int endRow, String suffix) throws Exception { + for (String tenant : tenants) { + try (Connection conn = getTenantConnection(tenant)) { + for (int j = startRow; j <= endRow; j++) { + conn.createStatement() + .execute(String.format( + "UPSERT INTO %s(PK2, V1, V2) VALUES ('%s_r%d', '%s_v%d_%s', '%s_d%d_%s')", tableName, + phase, j, phase, j, suffix, phase, j, suffix)); + } + conn.commit(); + } + } + } + + private void deleteRows(String[] tenants, String tableName, String phase, int startRow, + int endRow) throws Exception { + for (String tenant : tenants) { + try (Connection conn = getTenantConnection(tenant)) { + for (int j = startRow; j <= endRow; j++) { + conn.createStatement() + .execute(String.format("DELETE FROM %s WHERE PK2 = '%s_r%d'", tableName, phase, j)); + } + conn.commit(); + } + } + } + + private void verifyRowCount(String[] tenants, String tableName, int expectedCount, String phase) + throws Exception { + for (String tenant : tenants) { + try (Connection conn = getTenantConnection(tenant)) { + ResultSet rs = conn.createStatement() + .executeQuery("SELECT COUNT(*) FROM " + tableName + " WHERE V1 IS NOT NULL"); + assertTrue(rs.next()); + assertEquals(phase + ": tenant " + tenant, expectedCount, rs.getInt(1)); + } + } + } + + private void verifyIndexLookup(String[] tenants, String tableName, String v1Value, + String expectedPk2, String expectedV2) throws Exception { + for (String tenant : tenants) { + try (Connection conn = 
getTenantConnection(tenant)) { + ResultSet rs = conn.createStatement() + .executeQuery("SELECT PK2, V2 FROM " + tableName + " WHERE V1 = '" + v1Value + "'"); + assertTrue("Row with V1=" + v1Value + " not found for " + tenant, rs.next()); + assertEquals(expectedPk2, rs.getString(1)); + if (expectedV2 != null) { + assertEquals(expectedV2, rs.getString(2)); + } + assertFalse(rs.next()); + } + } + } + + private void verifyNoResult(String[] tenants, String tableName, String v1Value, String message) + throws Exception { + for (String tenant : tenants) { + try (Connection conn = getTenantConnection(tenant)) { + ResultSet rs = conn.createStatement() + .executeQuery("SELECT PK2 FROM " + tableName + " WHERE V1 = '" + v1Value + "'"); + assertFalse(message + " for " + tenant, rs.next()); + } + } + } + + private int getRegionCount(Connection conn, String tableName) throws Exception { + List<HRegionLocation> regions = TestUtil.getAllTableRegions(conn, tableName); + return regions.size(); + } + + @Test + public void testBasicMultiTenantEventualIndex() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String tenantA = "TENANT_A_" + generateUniqueName(); + String tenantB = "TENANT_B_" + generateUniqueName(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR" + + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL"); + } + + try (Connection connA = getTenantConnection(tenantA)) { + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')"); + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')"); + 
connA.commit(); + } + + try (Connection connB = getTenantConnection(tenantB)) { + connB.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')"); + connB.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'b2', 'y2')"); + connB.commit(); + } + + waitForEventualConsistency(); + + try (Connection connA = getTenantConnection(tenantA)) { + ResultSet rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'"); + assertTrue(rs.next()); + assertEquals("r1", rs.getString(1)); + assertEquals("a1", rs.getString(2)); + assertEquals("x1", rs.getString(3)); + assertFalse(rs.next()); + + rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a2'"); + assertTrue(rs.next()); + assertEquals("r2", rs.getString(1)); + assertEquals("a2", rs.getString(2)); + assertEquals("x2", rs.getString(3)); + assertFalse(rs.next()); + } + + try (Connection connB = getTenantConnection(tenantB)) { + ResultSet rs = connB.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'"); + assertTrue(rs.next()); + assertEquals("r1", rs.getString(1)); + assertEquals("b1", rs.getString(2)); + assertEquals("y1", rs.getString(3)); + assertFalse(rs.next()); + } + + try (Connection globalConn = getGlobalConnection()) { + /* Expect 4: tenant B upserted r1 and r2 above, and tenant A is queried above for r1 and r2 (its upserts precede this excerpt). NOTE(review): what verifyIndexTable counts is not visible here; presumably index rows across all tenants. */ long rowCount = verifyIndexTable(tableName, indexName, globalConn); + assertEquals(4, rowCount); + } + } + + /** Deletes a row for one tenant while another tenant rewrites the row with the same PK2, on a multi-tenant table with a covered CONSISTENCY=EVENTUAL index on V1. Tenant A upserts r1 and r2 and tenant B upserts r1; after waitForEventualConsistency() tenant A deletes r1 and tenant B re-upserts r1 with new V1 and V2 values. The test then asserts that tenant A no longer finds a1 but still finds a2, that tenant B finds exactly one row for b1_updated, and that verifyIndexTable returns 2. */ @Test + public void testMultiTenantDeleteAndUpsert() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String tenantA = "TENANT_A_" + generateUniqueName(); + String tenantB = "TENANT_B_" + generateUniqueName(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR" + + " CONSTRAINT 
pk PRIMARY KEY (TENANT_ID, PK2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL"); + } + + try (Connection connA = getTenantConnection(tenantA)) { + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')"); + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')"); + connA.commit(); + } + + try (Connection connB = getTenantConnection(tenantB)) { + connB.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')"); + connB.commit(); + } + + waitForEventualConsistency(); + + try (Connection connA = getTenantConnection(tenantA)) { + connA.createStatement().execute("DELETE FROM " + tableName + " WHERE PK2 = 'r1'"); + connA.commit(); + } + + try (Connection connB = getTenantConnection(tenantB)) { + connB.createStatement().execute( + "UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1_updated', 'y1_updated')"); + connB.commit(); + } + + waitForEventualConsistency(); + + try (Connection connA = getTenantConnection(tenantA)) { + ResultSet rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'"); + assertFalse("Deleted row should not be visible", rs.next()); + + rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a2'"); + assertTrue(rs.next()); + assertEquals("r2", rs.getString(1)); + assertFalse(rs.next()); + } + + try (Connection connB = getTenantConnection(tenantB)) { + ResultSet rs = connB.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1_updated'"); + assertTrue(rs.next()); + assertEquals("r1", rs.getString(1)); + assertEquals("y1_updated", rs.getString(3)); + assertFalse(rs.next()); + } + + try (Connection globalConn = getGlobalConnection()) { + 
long rowCount = verifyIndexTable(tableName, indexName, globalConn); + /* Expect 2: r2 of tenant A and r1 of tenant B are the only rows left after the delete. */ assertEquals(2, rowCount); + } + } + + /** Two-tenant scenario with an UNCOVERED CONSISTENCY=EVENTUAL index on V1 (no INCLUDE columns); the queries select V2 although the index does not include it. Checks lookups by V1 for both tenants, then has tenant A delete r1 and asserts that tenant A no longer finds a1, that tenant B still finds its own r1 via b1, and that verifyIndexTable returns 2. */ @Test + public void testMultiTenantUncoveredEventualIndex() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String tenantA = "TENANT_A_" + generateUniqueName(); + String tenantB = "TENANT_B_" + generateUniqueName(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR" + + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute( + "CREATE UNCOVERED INDEX " + indexName + " ON " + tableName + "(V1) CONSISTENCY=EVENTUAL"); + } + + try (Connection connA = getTenantConnection(tenantA)) { + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'a1', 'x1')"); + connA.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r2', 'a2', 'x2')"); + connA.commit(); + } + + try (Connection connB = getTenantConnection(tenantB)) { + connB.createStatement() + .execute("UPSERT INTO " + tableName + "(PK2, V1, V2) VALUES ('r1', 'b1', 'y1')"); + connB.commit(); + } + + waitForEventualConsistency(); + + try (Connection connA = getTenantConnection(tenantA)) { + ResultSet rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'"); + assertTrue(rs.next()); + assertEquals("r1", rs.getString(1)); + assertEquals("a1", rs.getString(2)); + assertEquals("x1", rs.getString(3)); + assertFalse(rs.next()); + } + + try (Connection connB = getTenantConnection(tenantB)) { + ResultSet rs = connB.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'"); + assertTrue(rs.next()); + assertEquals("r1", rs.getString(1)); + assertEquals("b1", rs.getString(2)); + 
assertEquals("y1", rs.getString(3)); + assertFalse(rs.next()); + } + + /* Tenant A deletes r1; tenant B has a row with the same PK2 value, which the assertions below require to stay visible. */ try (Connection connA = getTenantConnection(tenantA)) { + connA.createStatement().execute("DELETE FROM " + tableName + " WHERE PK2 = 'r1'"); + connA.commit(); + } + + waitForEventualConsistency(); + + try (Connection connA = getTenantConnection(tenantA)) { + ResultSet rs = connA.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'a1'"); + assertFalse("Deleted row should not appear for tenant A", rs.next()); + } + + try (Connection connB = getTenantConnection(tenantB)) { + ResultSet rs = connB.createStatement() + .executeQuery("SELECT PK2, V1, V2 FROM " + tableName + " WHERE V1 = 'b1'"); + assertTrue("Tenant B row should still be visible", rs.next()); + assertEquals("r1", rs.getString(1)); + assertFalse(rs.next()); + } + + try (Connection globalConn = getGlobalConnection()) { + /* Expect 2: r2 of tenant A and r1 of tenant B. */ long rowCount = verifyIndexTable(tableName, indexName, globalConn); + assertEquals(2, rowCount); + } + } + + /** Exercises a covered CONSISTENCY=EVENTUAL index on V1 across two region splits of a multi-tenant table. Phases: insert p1 rows, split at CC, insert p2 rows, split at EE, insert five p3 rows and update p1 rows 1 to 5, then delete p2 rows 1 and 2. After each phase it calls waitForEventualConsistency() and checks per-tenant row counts and lookups by V1; finally verifyIndexTable must return 23 rows per tenant. NOTE(review): createTenants, insertRows, updateRows, deleteRows and the verify helpers are defined outside this excerpt, so their argument semantics are inferred from the assertions. */ @Test + public void testMultiTenantCoveredIndexWithSplits() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String[] tenants = createTenants(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR" + + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + + "(V1) INCLUDE (V2) CONSISTENCY=EVENTUAL"); + } + + // Phase 1: Insert 10 rows per tenant (60 total), single region + insertRows(tenants, tableName, "p1", 1, ROWS_PER_TENANT_PER_PHASE); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, ROWS_PER_TENANT_PER_PHASE, "Phase 1"); + verifyIndexLookup(tenants, tableName, "p1_v5", "p1_r5", "p1_d5"); + + try (Connection 
globalConn = getGlobalConnection()) { + /* First split, at key CC; exactly 2 regions are expected afterwards. */ TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("CC")); + assertEquals("Region count after split 1", 2, getRegionCount(globalConn, tableName)); + } + + // Phase 2: Insert 10 more rows per tenant after first split + insertRows(tenants, tableName, "p2", 1, ROWS_PER_TENANT_PER_PHASE); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE, "Phase 2"); + verifyIndexLookup(tenants, tableName, "p1_v3", "p1_r3", "p1_d3"); + verifyIndexLookup(tenants, tableName, "p2_v7", "p2_r7", "p2_d7"); + + try (Connection globalConn = getGlobalConnection()) { + TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("EE")); + assertEquals("Region count after split 2", 3, getRegionCount(globalConn, tableName)); + } + + /* Phase 3: insert five p3 rows and update p1 rows 1 to 5 with the upd suffix; the old V1 values must disappear while p1 row 6 keeps its original value. */ insertRows(tenants, tableName, "p3", 1, 5); + updateRows(tenants, tableName, "p1", 1, 5, "upd"); + waitForEventualConsistency(); + // 10 (p1, 5 updated) + 10 (p2) + 5 (p3) = 25 unique PKs + verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 5, "Phase 3"); + + verifyIndexLookup(tenants, tableName, "p1_v1_upd", "p1_r1", "p1_d1_upd"); + verifyIndexLookup(tenants, tableName, "p1_v5_upd", "p1_r5", "p1_d5_upd"); + verifyNoResult(tenants, tableName, "p1_v1", "Old p1_v1 should not exist"); + verifyNoResult(tenants, tableName, "p1_v5", "Old p1_v5 should not exist"); + verifyIndexLookup(tenants, tableName, "p1_v6", "p1_r6", "p1_d6"); + + /* Phase 4: delete p2 rows 1 and 2 for every tenant; p2 row 3 must remain visible. */ deleteRows(tenants, tableName, "p2", 1, 2); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 3, "Phase 4"); + verifyNoResult(tenants, tableName, "p2_v1", "Deleted p2_r1 should not be visible via index"); + verifyNoResult(tenants, tableName, "p2_v2", "Deleted p2_r2 should not be visible via index"); + verifyIndexLookup(tenants, tableName, "p2_v3", "p2_r3", "p2_d3"); + + try (Connection globalConn = getGlobalConnection()) { + long rowCount = verifyIndexTable(tableName, indexName, 
globalConn); + /* 23 per tenant: the 25 rows present after phase 3 minus the 2 deleted in phase 4. */ assertEquals(23 * tenants.length, rowCount); + } + } + + /** Region-split scenario for an UNCOVERED CONSISTENCY=EVENTUAL index on V1 of a multi-tenant table. Sequence: insert p1 rows, split at CC, insert p2 rows, split at EE, insert five p3 rows and update p1 rows 1 to 5, then delete p2 rows 1 and 2, calling waitForEventualConsistency() after each write phase. Spot-checks row counts and lookups by V1 along the way and finally expects verifyIndexTable to return 23 rows per tenant. */ @Test + public void testMultiTenantUncoveredIndexWithSplits() throws Exception { + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + String[] tenants = createTenants(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + " (TENANT_ID VARCHAR NOT NULL, PK2 VARCHAR NOT NULL, V1 VARCHAR, V2 VARCHAR" + + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, PK2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute( + "CREATE UNCOVERED INDEX " + indexName + " ON " + tableName + "(V1) CONSISTENCY=EVENTUAL"); + } + + insertRows(tenants, tableName, "p1", 1, ROWS_PER_TENANT_PER_PHASE); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, ROWS_PER_TENANT_PER_PHASE, "Phase 1"); + verifyIndexLookup(tenants, tableName, "p1_v5", "p1_r5", "p1_d5"); + + try (Connection globalConn = getGlobalConnection()) { + TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("CC")); + assertEquals(2, getRegionCount(globalConn, tableName)); + } + + insertRows(tenants, tableName, "p2", 1, ROWS_PER_TENANT_PER_PHASE); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE, "Phase 2"); + + try (Connection globalConn = getGlobalConnection()) { + TestUtil.splitTable(globalConn, tableName, Bytes.toBytes("EE")); + assertEquals(3, getRegionCount(globalConn, tableName)); + } + + insertRows(tenants, tableName, "p3", 1, 5); + updateRows(tenants, tableName, "p1", 1, 5, "upd"); + waitForEventualConsistency(); + verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 5, "Phase 3"); + verifyIndexLookup(tenants, tableName, "p1_v3_upd", "p1_r3", "p1_d3_upd"); + verifyNoResult(tenants, tableName, "p1_v3", "Old value should not exist after update"); + + deleteRows(tenants, tableName, "p2", 1, 2); + waitForEventualConsistency(); + 
verifyRowCount(tenants, tableName, 2 * ROWS_PER_TENANT_PER_PHASE + 3, "Phase 4"); + verifyNoResult(tenants, tableName, "p2_v1", "Deleted row should not be visible"); + verifyIndexLookup(tenants, tableName, "p2_v5", "p2_r5", "p2_d5"); + + try (Connection globalConn = getGlobalConnection()) { + /* 23 per tenant corresponds to 2 * ROWS_PER_TENANT_PER_PHASE + 3, assuming ROWS_PER_TENANT_PER_PHASE is 10 (the constant is defined outside this excerpt). */ long rowCount = verifyIndexTable(tableName, indexName, globalConn); + assertEquals(23 * tenants.length, rowCount); + } + } + + /** Stress test, disabled via @Ignore: 8 worker threads upsert random values through tenant connections while a separate thread splits the table, with five CONSISTENCY=EVENTUAL indexes (covered and uncovered) defined on it. After the workers finish and a fixed sleep, two randomly chosen indexes are verified and each must report tenants.length * nRows rows. The JUnit timeout is 1800000 ms (30 minutes). */ @Test(timeout = 1800000) + @Ignore("too aggressive for jenkins builds") + public void testConcurrentUpsertsWithTableSplits() throws Exception { + int nThreads = 8; + final int batchSize = 100; + final int nRows = 777; + final int nIndexValues = 23; + final int nSplits = 3; + final int totalUpserts = 10000; + final String tableName = generateUniqueName(); + final String indexName1 = generateUniqueName(); + final String indexName2 = generateUniqueName(); + final String indexName3 = generateUniqueName(); + final String indexName4 = generateUniqueName(); + final String indexName5 = generateUniqueName(); + final String[] tenants = createTenants(); + + try (Connection globalConn = getGlobalConnection()) { + globalConn.createStatement() + .execute("CREATE TABLE " + tableName + + "(TENANT_ID VARCHAR NOT NULL, K2 INTEGER NOT NULL, V1 INTEGER, V2 INTEGER," + + " V3 INTEGER, V4 INTEGER," + " CONSTRAINT pk PRIMARY KEY (TENANT_ID, K2))" + + " MULTI_TENANT=true, COLUMN_ENCODED_BYTES=0"); + globalConn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + tableName + + "(V1) INCLUDE (V2, V3) CONSISTENCY=EVENTUAL"); + globalConn.createStatement().execute( + "CREATE UNCOVERED INDEX " + indexName2 + " ON " + tableName + "(V2) CONSISTENCY=EVENTUAL"); + globalConn.createStatement().execute("CREATE INDEX " + indexName3 + " ON " + tableName + + "(V3) INCLUDE (V1, V2) CONSISTENCY=EVENTUAL"); + globalConn.createStatement().execute( + "CREATE UNCOVERED INDEX " + indexName4 + " ON " + tableName + "(V4) CONSISTENCY=EVENTUAL"); + 
globalConn.createStatement().execute("CREATE INDEX " + indexName5 + " ON " + tableName + + "(V1, V2) INCLUDE (V3, V4) CONSISTENCY=EVENTUAL"); + } + + final CountDownLatch doneSignal = new CountDownLatch(nThreads); + Runnable[] runnables = new Runnable[nThreads]; + /* NOTE(review): the reason for this 3 s pause is not evident from the code; presumably it lets the newly created indexes settle. */ Thread.sleep(3000); + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + + for (int i = 0; i < nThreads; i++) { + /* Workers are assigned to tenants round-robin. */ final String tenant = tenants[i % tenants.length]; + runnables[i] = () -> { + try (Connection conn = getTenantConnection(tenant)) { + ThreadLocalRandom rand = ThreadLocalRandom.current(); + for (int j = 0; j < totalUpserts; j++) { + /* K2 cycles through 0..nRows-1; each of the four value columns is the literal null with probability one half, otherwise a random int (the first one reduced modulo nIndexValues, which can be negative). */ conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES (" + (j % nRows) + ", " + + (rand.nextBoolean() ? null : (rand.nextInt() % nIndexValues)) + ", " + + (rand.nextBoolean() ? null : rand.nextInt()) + ", " + + (rand.nextBoolean() ? null : rand.nextInt()) + ", " + + (rand.nextBoolean() ? null : rand.nextInt()) + ")"); + /* Commit every batchSize statements (this also fires at j == 0). */ if ((j % batchSize) == 0) { + conn.commit(); + } + } + conn.commit(); + } catch (SQLException e) { + /* NOTE(review): the SQLException is only logged, so a failed worker still counts down the latch and surfaces, if at all, through the row-count assertions. */ LOG.warn("Exception during concurrent upsert for tenant {}", tenant, e); + } finally { + doneSignal.countDown(); + } + }; + } + + /* Splits run on their own thread while the workers write. NOTE(review): the meaning of the arguments nSplits and 8000 is defined by TestUtil.splitTable, which is outside this excerpt. */ Thread splitThread = new Thread(() -> TestUtil.splitTable(getUrl(), tableName, nSplits, 8000)); + splitThread.start(); + for (int i = 0; i < nThreads; i++) { + /* Each of the last four workers is started after an additional 12 s delay. */ if (i >= (nThreads - 4)) { + Thread.sleep(12000); + } + Thread t = new Thread(runnables[i]); + t.start(); + } + assertTrue("Ran out of time", doneSignal.await(350, TimeUnit.SECONDS)); + splitThread.join(10000); + LOG.info("Total upsert time: {} ms", EnvironmentEdgeManager.currentTimeMillis() - startTime); + + /* Each worker touches every K2 in 0..nRows-1 because totalUpserts exceeds nRows. NOTE(review): the expectation assumes every tenant gets at least one worker, i.e. tenants.length is at most nThreads, and that no worker failed. */ int expectedTotal = tenants.length * nRows; + List<String> allIndexes = + new ArrayList<>(Arrays.asList(indexName1, indexName2, indexName3, indexName4, indexName5)); + Collections.shuffle(allIndexes, ThreadLocalRandom.current()); + LOG.info("Randomly selected indexes to verify: {}, {}", allIndexes.get(0), allIndexes.get(1)); + try 
(Connection globalConn = getGlobalConnection()) { + /* Fixed 500 s wait before verifying. NOTE(review): presumably gives the eventually consistent indexes time to converge; the other tests in this excerpt call waitForEventualConsistency() instead. The meaning of the trailing false argument to verifyIndexTable is not visible in this excerpt. */ Thread.sleep(500000); + long rowCount = verifyIndexTable(tableName, allIndexes.get(0), globalConn, false); + assertEquals("Index " + allIndexes.get(0), expectedTotal, rowCount); + rowCount = verifyIndexTable(tableName, allIndexes.get(1), globalConn, false); + assertEquals("Index " + allIndexes.get(1), expectedTotal, rowCount); + } + } + +}
