This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch tmp-ec in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit a6f8d158b70e6249157f5669ff57ceac3e3de720 Author: Viraj Jasani <[email protected]> AuthorDate: Fri Mar 13 16:30:43 2026 -0700 addendum --- Jenkinsfile | 2 +- Jenkinsfile.yetus | 2 +- phoenix-core-client/src/main/antlr3/PhoenixSQL.g | 3 +- .../java/org/apache/phoenix/schema/PTable.java | 5 + .../org/apache/phoenix/util/CDCChangeBuilder.java | 71 ++++- .../src/main/protobuf/IndexMutations.proto | 8 + .../coprocessor/CDCGlobalIndexRegionScanner.java | 193 +++++++++---- .../phoenix/hbase/index/IndexCDCConsumer.java | 316 ++++++++++++++++++--- .../phoenix/hbase/index/IndexRegionObserver.java | 211 +++++++++----- .../java/org/apache/phoenix/end2end/Bson5IT.java | 20 +- ...currentMutationsCoveredEventualGenerateIT.java} | 38 ++- ... => ConcurrentMutationsExtendedGenerateIT.java} | 34 ++- .../end2end/ConcurrentMutationsExtendedIT.java | 10 +- .../ConcurrentMutationsExtendedIndexIT.java | 6 +- .../ConcurrentMutationsLazyPostBatchWriteIT.java | 6 +- ...rrentMutationsUncoveredEventualGenerateIT.java} | 38 ++- ...xToolForNonTxGlobalIndexEventualGenerateIT.java | 93 ++++++ .../IndexToolForNonTxGlobalIndexEventualIT.java | 91 ++++++ .../end2end/IndexToolForNonTxGlobalIndexIT.java | 29 +- .../phoenix/end2end/VarBinaryEncoded2IT.java | 8 +- .../GlobalIndexCheckerEventualGenerateIT.java | 89 ++++++ .../index/GlobalIndexCheckerEventualIT.java | 87 ++++++ .../end2end/index/GlobalIndexCheckerIT.java | 49 +++- .../java/org/apache/phoenix/query/BaseTest.java | 2 +- 24 files changed, 1200 insertions(+), 211 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 0308d61c2f..49a44d3766 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -66,7 +66,7 @@ pipeline { stage('BuildAndTest') { options { - timeout(time: 7, unit: 'HOURS') + timeout(time: 9, unit: 'HOURS') } steps { dir("HBASE_${HBASE_PROFILE}") { diff --git a/Jenkinsfile.yetus b/Jenkinsfile.yetus index cdd97f283e..7e99f45b05 100644 --- a/Jenkinsfile.yetus +++ b/Jenkinsfile.yetus @@ -37,7 +37,7 @@ pipeline { } options { - timeout(time: 7, unit: 'HOURS') + timeout(time: 9, unit: 'HOURS') } steps { diff --git a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g index 6eaefd5813..bb7d3d8a89 100644 --- a/phoenix-core-client/src/main/antlr3/PhoenixSQL.g +++ b/phoenix-core-client/src/main/antlr3/PhoenixSQL.g @@ -86,6 +86,7 @@ tokens POST='post'; CHANGE='change'; IDX_MUTATIONS='idx_mutations'; + DATA_ROW_STATE='data_row_state'; LATEST='latest'; ALL='all'; INDEX='index'; @@ -608,7 +609,7 @@ cdc_change_scopes returns [Set<CDCChangeScope> ret] ; cdc_change_scope returns [CDCChangeScope ret] - : v=(PRE | POST | CHANGE | IDX_MUTATIONS) + : v=(PRE | POST | CHANGE | IDX_MUTATIONS | DATA_ROW_STATE) { ret = CDCChangeScope.valueOf(v.getText().toUpperCase()); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java index bd314e00c9..effd7773ec 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java @@ -1162,5 +1162,10 @@ public interface PTable extends PMetaDataEntity { * Include index mutations for eventually consistent indexes. */ IDX_MUTATIONS, + + /** + * Include raw before/after data row states as serialized Puts for index mutation generation. + */ + DATA_ROW_STATE, } } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java index 34408cfbf1..880a0a327b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/CDCChangeBuilder.java @@ -26,9 +26,12 @@ import static org.apache.phoenix.query.QueryConstants.CDC_TTL_DELETE_EVENT_TYPE; import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE; import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import org.apache.hadoop.hbase.Cell; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.CDCTableInfo; import org.apache.phoenix.schema.PTable; @@ -37,6 +40,7 @@ public class CDCChangeBuilder { private final boolean isPreImageInScope; private final boolean isPostImageInScope; private final boolean isIdxMutationsInScope; + private final boolean isDataRowStateInScope; private final CDCTableInfo cdcDataTableInfo; private String changeType; private long lastDeletedTimestamp; @@ -44,6 +48,12 @@ public class CDCChangeBuilder { private Map<String, Object> preImage = null; private Map<String, Object> changeImage = null; + private boolean isFullRowDelete; + private Map<ImmutableBytesPtr, Cell> rawLatestBeforeChange; + private Map<ImmutableBytesPtr, Cell> rawAtChange; + private Set<ImmutableBytesPtr> rawDeletedColumnsAtChange; + private Map<ImmutableBytesPtr, Long> rawDeletedColumnsBeforeChange; + public CDCChangeBuilder(CDCTableInfo cdcDataTableInfo) { this.cdcDataTableInfo = cdcDataTableInfo; Set<PTable.CDCChangeScope> changeScopes = cdcDataTableInfo.getIncludeScopes(); @@ -51,6 +61,7 @@ public class CDCChangeBuilder { isPreImageInScope = changeScopes.contains(PTable.CDCChangeScope.PRE); isPostImageInScope = changeScopes.contains(PTable.CDCChangeScope.POST); isIdxMutationsInScope = changeScopes.contains(PTable.CDCChangeScope.IDX_MUTATIONS); + isDataRowStateInScope = changeScopes.contains(PTable.CDCChangeScope.DATA_ROW_STATE); } public void initChange(long ts) { @@ -63,6 +74,13 @@ public class CDCChangeBuilder { if (isChangeImageInScope || isPostImageInScope) { changeImage = new HashMap<>(); } + if (isDataRowStateInScope) { + isFullRowDelete = false; + rawLatestBeforeChange = new LinkedHashMap<>(); + rawAtChange = new LinkedHashMap<>(); + rawDeletedColumnsAtChange = new HashSet<>(); + rawDeletedColumnsBeforeChange = new HashMap<>(); + } } public long getChangeTimestamp() { @@ -79,6 +97,9 @@ public class CDCChangeBuilder { public void markAsDeletionEvent() { changeType = CDC_DELETE_EVENT_TYPE; + if (isDataRowStateInScope) { + isFullRowDelete = true; + } } public long getLastDeletedTimestamp() { @@ -143,9 +164,49 @@ public class CDCChangeBuilder { } public boolean isOlderThanChange(Cell cell) { - return (cell.getTimestamp() < changeTimestamp && cell.getTimestamp() > lastDeletedTimestamp) - ? true - : false; + return cell.getTimestamp() < changeTimestamp && cell.getTimestamp() > lastDeletedTimestamp; + } + + public void registerRawPut(Cell cell, ImmutableBytesPtr colKey) { + if (cell.getTimestamp() == changeTimestamp) { + rawAtChange.putIfAbsent(colKey, cell); + } else if (isOlderThanChange(cell)) { + Long colDeleteTs = rawDeletedColumnsBeforeChange.get(colKey); + if ( + (colDeleteTs == null || cell.getTimestamp() > colDeleteTs) + && !rawLatestBeforeChange.containsKey(colKey) + ) { + rawLatestBeforeChange.put(colKey, cell); + } + } + } + + public void registerRawDeleteColumn(Cell cell, ImmutableBytesPtr colKey) { + if (cell.getTimestamp() == changeTimestamp) { + rawDeletedColumnsAtChange.add(colKey); + } else if (isOlderThanChange(cell)) { + rawDeletedColumnsBeforeChange.putIfAbsent(colKey, cell.getTimestamp()); + } + } + + public boolean hasValidDataRowStateChanges() { + return isFullRowDelete || !rawAtChange.isEmpty() || !rawDeletedColumnsAtChange.isEmpty(); + } + + public boolean isFullRowDelete() { + return isFullRowDelete; + } + + public Map<ImmutableBytesPtr, Cell> getRawLatestBeforeChange() { + return rawLatestBeforeChange; + } + + public Map<ImmutableBytesPtr, Cell> getRawAtChange() { + return rawAtChange; + } + + public Set<ImmutableBytesPtr> getRawDeletedColumnsAtChange() { + return rawDeletedColumnsAtChange; } public boolean isPreImageInScope() { @@ -164,4 +225,8 @@ public class CDCChangeBuilder { return isIdxMutationsInScope; } + public boolean isDataRowStateInScope() { + return isDataRowStateInScope; + } + } diff --git a/phoenix-core-client/src/main/protobuf/IndexMutations.proto b/phoenix-core-client/src/main/protobuf/IndexMutations.proto index eb5dc2bbce..cba4360dd7 100644 --- a/phoenix-core-client/src/main/protobuf/IndexMutations.proto +++ b/phoenix-core-client/src/main/protobuf/IndexMutations.proto @@ -29,3 +29,11 @@ message IndexMutations { repeated bytes tables = 1; repeated bytes mutations = 2; } + +// Raw data row states for generating index mutations. +// Contains the data row key and serialized MutationProto for the before and after states. +message DataRowStates { + optional bytes dataRowKey = 1; + optional bytes currentDataRowState = 2; + optional bytes nextDataRowState = 3; +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java index c096c443ac..da196c83c8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Arrays; import java.util.Base64; @@ -29,11 +30,13 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilder; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; @@ -62,6 +65,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * CDC (Change Data Capture) enabled region scanner for global indexes that processes uncovered CDC * index queries by reconstructing CDC events from index and data table rows. @@ -90,6 +96,7 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann private static final Logger LOGGER = LoggerFactory.getLogger(CDCGlobalIndexRegionScanner.class); private CDCTableInfo cdcDataTableInfo; private CDCChangeBuilder changeBuilder; + private static final byte[] SEPARATOR = { 0 }; private static final byte[] EMPTY_IDX_MUTATIONS = PVarchar.INSTANCE.toBytes(Base64.getEncoder() .encodeToString(IndexMutationsProtos.IndexMutations.getDefaultInstance().toByteArray())); @@ -110,8 +117,9 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { if ( - changeBuilder.isIdxMutationsInScope() && !changeBuilder.isChangeImageInScope() - && !changeBuilder.isPreImageInScope() && !changeBuilder.isPostImageInScope() + changeBuilder.isIdxMutationsInScope() && !changeBuilder.isDataRowStateInScope() + && !changeBuilder.isChangeImageInScope() && !changeBuilder.isPreImageInScope() + && !changeBuilder.isPostImageInScope() ) { return null; } @@ -125,7 +133,11 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( // scan.getStopRow(), 0, SortOrder.getDefault()); // } - return CDCUtil.setupScanForCDC(prepareDataTableScan(dataRowKeys, true)); + Scan dataScan = prepareDataTableScan(dataRowKeys, true); + if (dataScan == null) { + return null; + } + return CDCUtil.setupScanForCDC(dataScan); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { @@ -173,59 +185,74 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann // marker after that. changeBuilder.setLastDeletedTimestamp(cell.getTimestamp()); } - } else if ( - (cell.getType() == Cell.Type.DeleteColumn || cell.getType() == Cell.Type.Put) - && !Arrays.equals(cellQual, emptyCQ) - ) { - if (!changeBuilder.isChangeRelevant(cell)) { - // We don't need to build the change image, just skip it. - continue; - } - // In this case, cell is the row, meaning we loop over rows.. - if (isSingleCell) { - while (curColumnNum < cdcColumnInfoList.size()) { - boolean hasValue = dataTableProjector.getSchema().extractValue(cell, - (SingleCellColumnExpression) expressions[curColumnNum], ptr); - if (hasValue) { - Object cellValue = getColumnValue(ptr.get(), ptr.getOffset(), ptr.getLength(), - cdcColumnInfoList.get(curColumnNum).getColumnType()); - changeBuilder.registerChange(cell, curColumnNum, cellValue); + } else + if (cell.getType() == Cell.Type.DeleteColumn || cell.getType() == Cell.Type.Put) { + boolean isEmptyCQ = Arrays.equals(cellQual, emptyCQ); + if (changeBuilder.isDataRowStateInScope()) { + ImmutableBytesPtr colKey = + new ImmutableBytesPtr(Bytes.add(cellFam, SEPARATOR, cellQual)); + if (cell.getType() == Cell.Type.DeleteColumn) { + changeBuilder.registerRawDeleteColumn(cell, colKey); + } else { + changeBuilder.registerRawPut(cell, colKey); } - ++curColumnNum; } - break cellLoop; - } - while (true) { - CDCTableInfo.CDCColumnInfo currentColumnInfo = cdcColumnInfoList.get(curColumnNum); - int columnComparisonResult = - CDCUtil.compareCellFamilyAndQualifier(cellFam, cellQual, - currentColumnInfo.getColumnFamily(), currentColumnInfo.getColumnQualifier()); - if (columnComparisonResult > 0) { - if (++curColumnNum >= cdcColumnInfoList.size()) { - // Have no more column definitions, so the rest of the cells - // must be for dropped columns and so can be ignored. - break cellLoop; - } - // Continue looking for the right column definition - // for this cell. + if ( + isEmptyCQ || (changeBuilder.isDataRowStateInScope() + && !changeBuilder.isChangeImageInScope() && !changeBuilder.isPreImageInScope() + && !changeBuilder.isPostImageInScope()) + ) { continue; - } else if (columnComparisonResult < 0) { - // We didn't find a column definition for this cell, ignore the - // current cell but continue working on the rest of the cells. - continue cellLoop; } + // In this case, cell is the row, meaning we loop over rows.. + if (isSingleCell) { + while (curColumnNum < cdcColumnInfoList.size()) { + boolean hasValue = dataTableProjector.getSchema().extractValue(cell, + (SingleCellColumnExpression) expressions[curColumnNum], ptr); + if (hasValue) { + Object cellValue = getColumnValue(ptr.get(), ptr.getOffset(), ptr.getLength(), + cdcColumnInfoList.get(curColumnNum).getColumnType()); + changeBuilder.registerChange(cell, curColumnNum, cellValue); + } + ++curColumnNum; + } + break cellLoop; + } + while (true) { + CDCTableInfo.CDCColumnInfo currentColumnInfo = + cdcColumnInfoList.get(curColumnNum); + int columnComparisonResult = + CDCUtil.compareCellFamilyAndQualifier(cellFam, cellQual, + currentColumnInfo.getColumnFamily(), currentColumnInfo.getColumnQualifier()); + if (columnComparisonResult > 0) { + if (++curColumnNum >= cdcColumnInfoList.size()) { + // Have no more column definitions, so the rest of the cells + // must be for dropped columns and so can be ignored. + break cellLoop; + } + // Continue looking for the right column definition + // for this cell. + continue; + } else if (columnComparisonResult < 0) { + // We didn't find a column definition for this cell, ignore the + // current cell but continue working on the rest of the cells. + continue cellLoop; + } - // else, found the column definition. - Object cellValue = cell.getType() == Cell.Type.DeleteColumn - ? null - : getColumnValue(cell, cdcColumnInfoList.get(curColumnNum).getColumnType()); - changeBuilder.registerChange(cell, curColumnNum, cellValue); - // Done processing the current cell, check the next cell. - break; + // else, found the column definition. + Object cellValue = cell.getType() == Cell.Type.DeleteColumn + ? null + : getColumnValue(cell, cdcColumnInfoList.get(curColumnNum).getColumnType()); + changeBuilder.registerChange(cell, curColumnNum, cellValue); + // Done processing the current cell, check the next cell. + break; + } } - } } - if (changeBuilder.isNonEmptyEvent()) { + if (changeBuilder.isDataRowStateInScope()) { + buildDataRowStateResult(dataRowKey.copyBytesIfNecessary(), indexRowKey, indexCell, + result); + } else if (changeBuilder.isNonEmptyEvent()) { Result cdcRow = getCDCImage(indexRowKey, indexCell); if (cdcRow != null && tupleProjector != null) { if (indexCell.getType() == Cell.Type.DeleteFamily) { @@ -250,7 +277,12 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann result.clear(); } } else { - result.clear(); + if (changeBuilder.isDataRowStateInScope()) { + buildDataRowStateResult(dataRowKey.copyBytesIfNecessary(), indexRowKey, indexCell, + result); + } else { + result.clear(); + } } return true; @@ -336,6 +368,67 @@ public class CDCGlobalIndexRegionScanner extends UncoveredGlobalIndexRegionScann return true; } + /** + * Builds a DataRowStates protobuf result from the raw cell maps collected by CDCChangeBuilder + * during the raw cell iteration. Constructs before/after HBase Put objects representing the row + * state before and after the change, serializes them, and populates the result. If no valid + * changes were found at the change timestamp (data not yet visible), only the dataRowKey is + * included, the consumer needs to retry. + * @param dataRowKey The data table row key bytes. + * @param indexRowKey The CDC index row key. + * @param indexCell The index cell. + * @param result The result list to populate. + * @throws IOException if serialization fails. + */ + private void buildDataRowStateResult(byte[] dataRowKey, byte[] indexRowKey, Cell indexCell, + List<Cell> result) throws IOException { + Put currentDataRowState = null; + Put nextDataRowState = null; + Map<ImmutableBytesPtr, Cell> latestBeforeChange = changeBuilder.getRawLatestBeforeChange(); + Map<ImmutableBytesPtr, Cell> atChange = changeBuilder.getRawAtChange(); + Set<ImmutableBytesPtr> deletedColumnsAtChange = changeBuilder.getRawDeletedColumnsAtChange(); + if (changeBuilder.hasValidDataRowStateChanges()) { + if (!latestBeforeChange.isEmpty()) { + currentDataRowState = new Put(dataRowKey); + for (Cell cell : latestBeforeChange.values()) { + currentDataRowState.add(cell); + } + } + if (!changeBuilder.isFullRowDelete()) { + Put nextState = new Put(dataRowKey); + for (Map.Entry<ImmutableBytesPtr, Cell> entry : latestBeforeChange.entrySet()) { + if ( + !atChange.containsKey(entry.getKey()) + && !deletedColumnsAtChange.contains(entry.getKey()) + ) { + nextState.add(entry.getValue()); + } + } + for (Cell cell : atChange.values()) { + nextState.add(cell); + } + if (!nextState.isEmpty()) { + nextDataRowState = nextState; + } + } + } + IndexMutationsProtos.DataRowStates.Builder builder = + IndexMutationsProtos.DataRowStates.newBuilder(); + builder.setDataRowKey(ByteString.copyFrom(dataRowKey)); + if (currentDataRowState != null) { + builder.setCurrentDataRowState(ByteString.copyFrom( + ProtobufUtil.toMutation(ClientProtos.MutationProto.MutationType.PUT, currentDataRowState) + .toByteArray())); + } + if (nextDataRowState != null) { + builder.setNextDataRowState(ByteString.copyFrom(ProtobufUtil + .toMutation(ClientProtos.MutationProto.MutationType.PUT, nextDataRowState).toByteArray())); + } + String base64String = Base64.getEncoder().encodeToString(builder.build().toByteArray()); + byte[] cdcEventBytes = PVarchar.INSTANCE.toBytes(base64String); + addResult(indexRowKey, indexCell, result, indexCell, cdcEventBytes); + } + /** * Handles CDC event when IDX_MUTATIONS scope is enabled. Returns the index mutations as a * serialized IndexMutations, or an empty proto if no mutations are present. Skips the data table diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java index c940765499..5616887c46 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexCDCConsumer.java @@ -30,9 +30,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment; @@ -41,6 +41,7 @@ import org.apache.phoenix.coprocessorclient.MetaDataProtocol; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.query.QueryConstants; @@ -105,12 +106,26 @@ public class IndexCDCConsumer implements Runnable { private static final long DEFAULT_POLL_INTERVAL_MS = 1000; /** - * The time buffer in milliseconds subtracted from current time when querying CDC mutations. This - * buffer helps avoid reading mutations that are too recent. + * The time buffer in milliseconds subtracted from current time when querying CDC mutations to + * help avoid reading mutations that are too recent. */ public static final String INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS = "phoenix.index.cdc.consumer.timestamp.buffer.ms"; - private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 1000; + private static final long DEFAULT_TIMESTAMP_BUFFER_MS = 25000; + + /** + * Maximum number of retries when CDC events exist but the corresponding data table mutations are + * not yet visible (or permanently failed). After exceeding this limit, the consumer advances past + * the unprocessable events to avoid blocking indefinitely. This is only used for index mutation + * generation approach (serializeCDCMutations = false). + */ + public static final String INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES = + "phoenix.index.cdc.consumer.max.data.visibility.retries"; + private static final int DEFAULT_MAX_DATA_VISIBILITY_RETRIES = 15; + + public static final String INDEX_CDC_CONSUMER_RETRY_PAUSE_MS = + "phoenix.index.cdc.consumer.retry.pause.ms"; + private static final long DEFAULT_RETRY_PAUSE_MS = 2000; private final RegionCoprocessorEnvironment env; private final String dataTableName; @@ -121,26 +136,31 @@ public class IndexCDCConsumer implements Runnable { private final int batchSize; private final long pollIntervalMs; private final long timestampBufferMs; + private final int maxDataVisibilityRetries; private final Configuration config; + private final boolean serializeCDCMutations; private volatile boolean stopped = false; private Thread consumerThread; private boolean hasParentPartitions = false; private PTable cachedDataTable; /** - * Creates a new IndexCDCConsumer for the given region. - * @param env region coprocessor environment. - * @param dataTableName name of the data table. - * @param serverName server name. + * Creates a new IndexCDCConsumer for the given region with configurable serialization mode. + * @param env region coprocessor environment. + * @param dataTableName name of the data table. + * @param serverName server name. + * @param serializeCDCMutations when true, consumes pre-serialized index mutations; when false, + * generates index mutations from data row states. * @throws IOException if the IndexWriter cannot be created. */ - public IndexCDCConsumer(RegionCoprocessorEnvironment env, String dataTableName, String serverName) - throws IOException { + public IndexCDCConsumer(RegionCoprocessorEnvironment env, String dataTableName, String serverName, + boolean serializeCDCMutations) throws IOException { this.env = env; this.dataTableName = dataTableName; this.encodedRegionName = env.getRegion().getRegionInfo().getEncodedName(); this.config = env.getConfiguration(); - this.pause = config.getLong(HConstants.HBASE_CLIENT_PAUSE, 300); + this.serializeCDCMutations = serializeCDCMutations; + this.pause = config.getLong(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, DEFAULT_RETRY_PAUSE_MS); this.startupDelayMs = config.getLong(INDEX_CDC_CONSUMER_STARTUP_DELAY_MS, DEFAULT_STARTUP_DELAY_MS); this.batchSize = config.getInt(INDEX_CDC_CONSUMER_BATCH_SIZE, DEFAULT_CDC_BATCH_SIZE); @@ -148,6 +168,8 @@ public class IndexCDCConsumer implements Runnable { config.getLong(INDEX_CDC_CONSUMER_POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS); this.timestampBufferMs = config.getLong(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, DEFAULT_TIMESTAMP_BUFFER_MS); + this.maxDataVisibilityRetries = config.getInt(INDEX_CDC_CONSUMER_MAX_DATA_VISIBILITY_RETRIES, + DEFAULT_MAX_DATA_VISIBILITY_RETRIES); DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION); this.indexWriter = @@ -249,8 +271,13 @@ public class IndexCDCConsumer implements Runnable { while (!stopped) { try { long previousTimestamp = lastProcessedTimestamp; - lastProcessedTimestamp = - processCDCBatch(encodedRegionName, encodedRegionName, lastProcessedTimestamp, false); + if (serializeCDCMutations) { + lastProcessedTimestamp = + processCDCBatch(encodedRegionName, encodedRegionName, lastProcessedTimestamp, false); + } else { + lastProcessedTimestamp = processCDCBatchGenerated(encodedRegionName, encodedRegionName, + lastProcessedTimestamp, false); + } if (lastProcessedTimestamp == previousTimestamp) { sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount)); } else { @@ -605,8 +632,14 @@ public class IndexCDCConsumer implements Runnable { currentLastProcessedTimestamp = getParentProgress(partitionId); } } - long newTimestamp = - processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); + long newTimestamp; + if (serializeCDCMutations) { + newTimestamp = + processCDCBatch(partitionId, ownerPartitionId, currentLastProcessedTimestamp, true); + } else { + newTimestamp = processCDCBatchGenerated(partitionId, ownerPartitionId, + currentLastProcessedTimestamp, true); + } batchCount++; retryCount = 0; if (newTimestamp == currentLastProcessedTimestamp) { @@ -697,18 +730,7 @@ public class IndexCDCConsumer implements Runnable { dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp); try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) { - PTable dataTable = getDataTable(conn); - String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false); - if (cdcObjectName == null) { - throw new SQLException("No CDC object found for table " + dataTableName); - } - String schemaName = dataTable.getSchemaName().getString(); - if (schemaName == null || schemaName.isEmpty()) { - cdcObjectName = "\"" + cdcObjectName + "\""; - } else { - cdcObjectName = - "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + cdcObjectName + "\""; - } + String cdcObjectName = getCdcObjectName(conn); String cdcQuery; if (isParentReplay) { cdcQuery = String @@ -728,15 +750,7 @@ public class IndexCDCConsumer implements Runnable { int retryCount = 0; while (hasMoreRows && batchMutations.isEmpty()) { try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { - ps.setString(1, partitionId); - ps.setDate(2, new Date(newLastTimestamp)); - if (isParentReplay) { - ps.setInt(3, batchSize); - } else { - long currentTime = EnvironmentEdgeManager.currentTimeMillis() - timestampBufferMs; - ps.setDate(3, new Date(currentTime)); - ps.setInt(4, batchSize); - } + setStatementParams(partitionId, isParentReplay, newLastTimestamp, ps); Pair<Long, Boolean> result = getMutationsAndTimestamp(ps, newLastTimestamp, batchMutations); hasMoreRows = result.getSecond(); @@ -778,6 +792,236 @@ public class IndexCDCConsumer implements Runnable { } } + private String getCdcObjectName(PhoenixConnection conn) throws SQLException { + PTable dataTable = getDataTable(conn); + String cdcObjectName = CDCUtil.getCDCObjectName(dataTable, false); + if (cdcObjectName == null) { + throw new SQLException("No CDC object found for table " + dataTableName); + } + String schemaName = dataTable.getSchemaName().getString(); + if (schemaName == null || schemaName.isEmpty()) { + cdcObjectName = "\"" + cdcObjectName + "\""; + } else { + cdcObjectName = + "\"" + schemaName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + cdcObjectName + "\""; + } + return cdcObjectName; + } + + /** + * Processes a batch of CDC events for the given partition starting from the specified timestamp + * by generating index mutations from data row states. This method queries the CDC index with the + * DATA_ROW_STATE scope, which triggers a server-side data table scan to reconstruct the + * before-image ({@code currentDataRowState}) and after-image ({@code nextDataRowState}) for each + * change. + * @param partitionId the partition (region) ID to process CDC events for. + * @param ownerPartitionId the owner partition ID. + * @param lastProcessedTimestamp the timestamp to start processing CDC events from. + * @param isParentReplay true if replaying a closed parent partition. + * @return the new last processed timestamp after this batch, or the same timestamp if no new + * records were found. + * @throws SQLException if a SQL error occurs. + * @throws IOException if an I/O error occurs. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + private long processCDCBatchGenerated(String partitionId, String ownerPartitionId, + long lastProcessedTimestamp, boolean isParentReplay) + throws SQLException, IOException, InterruptedException { + LOG.debug( + "Processing CDC batch (generated mode) for table {} partition {} owner {} from timestamp {}", + dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp); + try (PhoenixConnection conn = + QueryUtil.getConnectionOnServer(config).unwrap(PhoenixConnection.class)) { + String cdcObjectName = getCdcObjectName(conn); + String cdcQuery; + if (isParentReplay) { + cdcQuery = String + .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + } else { + cdcQuery = String + .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() > ? " + + "AND PHOENIX_ROW_TIMESTAMP() < ? " + + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC LIMIT ?", cdcObjectName); + } + + List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates = new ArrayList<>(); + long newLastTimestamp = lastProcessedTimestamp; + long[] lastScannedTimestamp = { lastProcessedTimestamp }; + boolean hasMoreRows = true; + int retryCount = 0; + while (hasMoreRows && batchStates.isEmpty()) { + try (PreparedStatement ps = conn.prepareStatement(cdcQuery)) { + setStatementParams(partitionId, isParentReplay, newLastTimestamp, ps); + Pair<Long, Boolean> result = + getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + hasMoreRows = result.getSecond(); + if (hasMoreRows) { + if (!batchStates.isEmpty()) { + newLastTimestamp = result.getFirst(); + } else if (retryCount >= maxDataVisibilityRetries) { + LOG.warn( + "Skipping CDC events for table {} partition {} from timestamp {}" + + " to {} after {} retries — data table mutations may have failed", + dataTableName, partitionId, newLastTimestamp, lastScannedTimestamp[0], retryCount); + newLastTimestamp = lastScannedTimestamp[0]; + break; + } else { + // CDC index entries are written but the data is not yet visible. + // Don't advance newLastTimestamp so the same events are re-fetched + // once the data becomes visible. + sleepIfNotStopped(ConnectionUtils.getPauseTime(pause, ++retryCount)); + } + } + } + } + if (newLastTimestamp > lastProcessedTimestamp) { + String sameTimestampQuery = String + .format("SELECT /*+ CDC_INCLUDE(DATA_ROW_STATE) */ PHOENIX_ROW_TIMESTAMP(), \"CDC JSON\" " + + "FROM %s WHERE PARTITION_ID() = ? AND PHOENIX_ROW_TIMESTAMP() = ? " + + "ORDER BY PARTITION_ID() ASC, PHOENIX_ROW_TIMESTAMP() ASC", cdcObjectName); + final long timestampToRefetch = newLastTimestamp; + batchStates.removeIf(pair -> pair.getFirst() == timestampToRefetch); + try (PreparedStatement ps = conn.prepareStatement(sameTimestampQuery)) { + ps.setString(1, partitionId); + ps.setDate(2, new Date(newLastTimestamp)); + Pair<Long, Boolean> result = + getDataRowStatesAndTimestamp(ps, newLastTimestamp, batchStates, lastScannedTimestamp); + newLastTimestamp = result.getFirst(); + if (batchStates.isEmpty()) { + newLastTimestamp = timestampToRefetch; + } else if (newLastTimestamp != timestampToRefetch) { + throw new IOException("Unexpected timestamp mismatch: expected " + timestampToRefetch + + " but got " + newLastTimestamp); + } + } + } + generateAndApplyIndexMutations(conn, batchStates, partitionId, ownerPartitionId, + newLastTimestamp); + if (newLastTimestamp > lastProcessedTimestamp) { + updateTrackerProgress(conn, partitionId, ownerPartitionId, newLastTimestamp, + PhoenixDatabaseMetaData.TRACKER_STATUS_IN_PROGRESS); + } + return newLastTimestamp; + } + } + + private void setStatementParams(String partitionId, boolean isParentReplay, long newLastTimestamp, + PreparedStatement ps) throws SQLException { + ps.setString(1, partitionId); + ps.setDate(2, new Date(newLastTimestamp)); + if (isParentReplay) { + ps.setInt(3, batchSize); + } else { + long currentTime = EnvironmentEdgeManager.currentTimeMillis() - timestampBufferMs; + ps.setDate(3, new Date(currentTime)); + ps.setInt(4, batchSize); + } + } + + private static Pair<Long, Boolean> getDataRowStatesAndTimestamp(PreparedStatement ps, + long initialLastTimestamp, List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates, + long[] lastScannedTimestamp) throws SQLException, IOException { + boolean hasRows = false; + long lastTimestamp = initialLastTimestamp; + lastScannedTimestamp[0] = initialLastTimestamp; + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + hasRows = true; + long rowTimestamp = rs.getDate(1).getTime(); + lastScannedTimestamp[0] = rowTimestamp; + String cdcValue = rs.getString(2); + if (cdcValue != null && !cdcValue.isEmpty()) { + byte[] protoBytes = Base64.getDecoder().decode(cdcValue); + IndexMutationsProtos.DataRowStates dataRowStates = + IndexMutationsProtos.DataRowStates.parseFrom(protoBytes); + if ( + dataRowStates.hasDataRowKey() + && (dataRowStates.hasCurrentDataRowState() || dataRowStates.hasNextDataRowState()) + ) { + batchStates.add(Pair.newPair(rowTimestamp, dataRowStates)); + lastTimestamp = rowTimestamp; + } + } + } + } + return Pair.newPair(lastTimestamp, hasRows); + } + + private void generateAndApplyIndexMutations(PhoenixConnection conn, + List<Pair<Long, IndexMutationsProtos.DataRowStates>> batchStates, String partitionId, + String ownerPartitionId, long lastProcessedTimestamp) throws SQLException, IOException { + if (batchStates.isEmpty()) { + return; + } + refreshDataTableCache(conn); + PTable dataTable = getDataTable(conn); + byte[] encodedRegionNameBytes = env.getRegion().getRegionInfo().getEncodedNameAsBytes(); + List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables = new ArrayList<>(); + for (PTable index : dataTable.getIndexes()) { + IndexConsistency consistency = index.getIndexConsistency(); + if (consistency != null && consistency.isAsynchronous()) { + IndexMaintainer maintainer = index.getIndexMaintainer(dataTable, conn); + HTableInterfaceReference tableRef = + new HTableInterfaceReference(new ImmutableBytesPtr(maintainer.getIndexTableName())); + indexTables.add(new Pair<>(maintainer, tableRef)); + } + } + if (indexTables.isEmpty()) { + return; + } + ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = ArrayListMultimap.create(); + int totalMutations = 0; + for (Pair<Long, IndexMutationsProtos.DataRowStates> entry : batchStates) { + long ts = entry.getFirst(); + IndexMutationsProtos.DataRowStates dataRowStates = entry.getSecond(); + byte[] dataRowKey = dataRowStates.getDataRowKey().toByteArray(); + ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(dataRowKey); + + Put currentDataRowState = null; + if (dataRowStates.hasCurrentDataRowState()) { + ClientProtos.MutationProto currentProto = ClientProtos.MutationProto + .parseFrom(dataRowStates.getCurrentDataRowState().toByteArray()); + Mutation currentMutation = ProtobufUtil.toMutation(currentProto); + if (currentMutation instanceof Put) { + currentDataRowState = (Put) currentMutation; + } + } + Put nextDataRowState = null; + if (dataRowStates.hasNextDataRowState()) { + ClientProtos.MutationProto nextProto = + ClientProtos.MutationProto.parseFrom(dataRowStates.getNextDataRowState().toByteArray()); + Mutation nextMutation = ProtobufUtil.toMutation(nextProto); + if (nextMutation instanceof Put) { + nextDataRowState = (Put) nextMutation; + } + } + if (currentDataRowState == null && nextDataRowState == null) { + continue; + } + IndexRegionObserver.generateIndexMutationsForRow(rowKeyPtr, currentDataRowState, + nextDataRowState, ts, encodedRegionNameBytes, QueryConstants.VERIFIED_BYTES, indexTables, + indexUpdates); + if (indexUpdates.size() >= batchSize) { + indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION); + totalMutations += indexUpdates.size(); + indexUpdates.clear(); + } + } + if (!indexUpdates.isEmpty()) { + indexWriter.write(indexUpdates, false, MetaDataProtocol.PHOENIX_VERSION); + totalMutations += indexUpdates.size(); + } + if (totalMutations > 0) { + LOG.debug( + "Applied total {} index mutations for table {} partition {} owner {} " + + ", last processed timestamp {}", + totalMutations, dataTableName, partitionId, ownerPartitionId, lastProcessedTimestamp); + } + } + private void executeIndexMutations(String partitionId, List<Pair<Long, IndexMutationsProtos.IndexMutations>> batchMutations, String ownerPartitionId, long lastProcessedTimestamp) throws SQLException, IOException { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index a3934f6b0f..48c823d258 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -178,6 +178,56 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { public static final String PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED = "phoenix.index.cdc.mutations.compress.enabled"; public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED = false; + /** + * Controls which approach is used for implementing eventually consistent global secondary indexes + * via the {@link IndexCDCConsumer}. + * <p> + * <b>Approach 1: Serialized mutations (default, value = true)</b> + * </p> + * <p> + * During {@code preBatchMutate}, {@link IndexRegionObserver} generates index mutations for each + * data table mutation and serializes them into a Protobuf {@code IndexMutations} message. This + * serialized payload is written as a column value in the CDC index table row alongside the CDC + * event. The {@link IndexCDCConsumer} later reads these pre-computed mutations from the CDC + * index, deserializes them, and applies them directly to the index table(s). In this approach, + * the consumer does not need to understand index structure or re-derive mutations — it simply + * replays what was already computed on the write path. The trade-off is increased CDC index row + * size due to the serialized mutation payload. + * </p> + * <p> + * <b>Approach 2: Generated mutations from data row states (value = false)</b> + * </p> + * <p> + * During {@code preBatchMutate}, {@link IndexRegionObserver} writes only a lightweight CDC index + * entry without serialized index mutations. Instead, the CDC event is created with the + * {@code DATA_ROW_STATE} scope. When the {@link IndexCDCConsumer} processes these events, it + * reads the CDC index rows which trigger a server-side scan of the data table (via + * {@code CDCGlobalIndexRegionScanner}) to reconstruct the before-image + * ({@code currentDataRowState}) and after-image ({@code nextDataRowState}) of the data row at the + * change timestamp. These raw row states are returned as a Protobuf {@code DataRowStates} + * message. The consumer then feeds these states into {@code generateIndexMutationsForRow()} — the + * same core utility used by {@link IndexRegionObserver#prepareIndexMutations} on the write path — + * to derive index mutations at consume time. This approach keeps CDC index rows small and + * generates mutations based on the current index definition, but requires an additional data + * table read per CDC event and is sensitive to data visibility timing. Make sure max lookback age + * is long enough to retain before and after images of the row. + * </p> + * <p> + * <b>When to use which approach:</b> + * </p> + * <ul> + * <li>Use <b>Approach 1</b> (serialize = true) when scanning each data table row at consume time + * could be an IO bottleneck, and slightly higher write-path latency due to index mutation + * serialization is acceptable.</li> + * <li>Use <b>Approach 2</b> (serialize = false) when uniform and predictable write latency is a + * strict requirement regardless of the number and type (covered or uncovered) of the eventually + * consistent global secondary indexes, and the additional data table point-lookup with raw scan + * per CDC event at consume time is not a big IO concern.</li> + * </ul> + */ + public static final String PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = + "phoenix.index.cdc.mutation.serialize"; + public static final boolean DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE = true; /** * Class to represent pending data table rows @@ -433,6 +483,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL; private boolean indexCDCConsumerEnabled = DEFAULT_PHOENIX_INDEX_CDC_CONSUMER_ENABLED; private boolean compressCDCMutations = DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED; + private boolean serializeCDCMutations = DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; private boolean isNamespaceEnabled = false; private boolean useBloomFilter = false; private long lastTimestamp = 0; @@ -494,6 +545,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { this.compressCDCMutations = env.getConfiguration().getBoolean(PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED, DEFAULT_PHOENIX_INDEX_CDC_MUTATIONS_COMPRESS_ENABLED); + this.serializeCDCMutations = env.getConfiguration().getBoolean( + PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, DEFAULT_PHOENIX_INDEX_CDC_MUTATION_SERIALIZE); this.isNamespaceEnabled = SchemaUtil.isNamespaceMappingEnabled(PTableType.INDEX, env.getConfiguration()); TableDescriptor tableDescriptor = env.getRegion().getTableDescriptor(); @@ -504,7 +557,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { this.indexCDCConsumerEnabled && !this.dataTableName.startsWith("SYSTEM.") && !this.dataTableName.startsWith("SYSTEM:") ) { - this.indexCDCConsumer = new IndexCDCConsumer(env, this.dataTableName, serverName); + this.indexCDCConsumer = + new IndexCDCConsumer(env, this.dataTableName, serverName, this.serializeCDCMutations); this.indexCDCConsumer.start(); } } catch (NoSuchMethodError ex) { @@ -1208,6 +1262,76 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { IndexMaintainer.DeleteType.ALL_VERSIONS, ts); } + public static void generateIndexMutationsForRow(ImmutableBytesPtr rowKeyPtr, + Put currentDataRowState, Put nextDataRowState, long ts, byte[] encodedRegionName, + byte[] emptyColumnValue, List<Pair<IndexMaintainer, HTableInterfaceReference>> indexTables, + ListMultimap<HTableInterfaceReference, Mutation> indexUpdates) throws IOException { + for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) { + IndexMaintainer indexMaintainer = pair.getFirst(); + HTableInterfaceReference hTableInterfaceReference = pair.getSecond(); + if ( + nextDataRowState != null && indexMaintainer.shouldPrepareIndexMutations(nextDataRowState) + ) { + ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState); + Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, + nextDataRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName); + if (indexPut == null) { + // No covered column. Just prepare an index row with the empty column + byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr, null, null, ts, + encodedRegionName); + indexPut = new Put(indexRowKey); + } else { + IndexUtil.removeEmptyColumn(indexPut, + indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier()); + } + byte[] finalEmptyColumnValue = + indexMaintainer.isUncovered() ? QueryConstants.UNVERIFIED_BYTES : emptyColumnValue; + indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), + indexMaintainer.getEmptyKeyValueQualifier(), ts, finalEmptyColumnValue); + indexUpdates.put(hTableInterfaceReference, indexPut); + if (!ignoreWritingDeleteColumnsToIndex) { + Delete deleteColumn = indexMaintainer.buildDeleteColumnMutation(indexPut, ts); + if (deleteColumn != null) { + indexUpdates.put(hTableInterfaceReference, deleteColumn); + } + } + // Delete the current index row if the new index key is different from the + // current one and the index is not a CDC index + if (currentDataRowState != null) { + ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState); + byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, + rowKeyPtr, null, null, ts, encodedRegionName); + if ( + !indexMaintainer.isCDCIndex() + && Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0 + ) { + Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, + IndexMaintainer.DeleteType.ALL_VERSIONS, ts); + indexUpdates.put(hTableInterfaceReference, del); + } + } + } else if ( + currentDataRowState != null + && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState) + ) { + if (indexMaintainer.isCDCIndex()) { + // CDC Index needs two a delete marker for referencing the data table + // delete mutation with the right index row key, that is, the index row key + // starting with ts + Put cdcDataRowState = new Put(currentDataRowState.getRow()); + cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), + indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, ByteUtil.EMPTY_BYTE_ARRAY); + indexUpdates.put(hTableInterfaceReference, getDeleteIndexMutation(cdcDataRowState, + indexMaintainer, ts, rowKeyPtr, encodedRegionName)); + } else { + indexUpdates.put(hTableInterfaceReference, getDeleteIndexMutation(currentDataRowState, + indexMaintainer, ts, rowKeyPtr, encodedRegionName)); + } + } + } + } + /** * Generate the index update for a data row from the mutation that are obtained by merging the * previous data row state with the pending row mutation. @@ -1220,6 +1344,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { if (indexMaintainer.isLocalIndex()) { continue; } + if ( + !serializeCDCMutations && indexMaintainer.getIndexConsistency() != null + && indexMaintainer.getIndexConsistency().isAsynchronous() + ) { + continue; + } HTableInterfaceReference hTableInterfaceReference = new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName())); indexTables.add(new Pair<>(indexMaintainer, hTableInterfaceReference)); @@ -1232,73 +1362,12 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { if (currentDataRowState == null && nextDataRowState == null) { continue; } - for (Pair<IndexMaintainer, HTableInterfaceReference> pair : indexTables) { - IndexMaintainer indexMaintainer = pair.getFirst(); - HTableInterfaceReference hTableInterfaceReference = pair.getSecond(); - if ( - nextDataRowState != null && indexMaintainer.shouldPrepareIndexMutations(nextDataRowState) - ) { - ValueGetter nextDataRowVG = new IndexUtil.SimpleValueGetter(nextDataRowState); - Put indexPut = indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, - nextDataRowVG, rowKeyPtr, ts, null, null, false, encodedRegionName); - if (indexPut == null) { - // No covered column. Just prepare an index row with the empty column - byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr, null, null, - ts, encodedRegionName); - indexPut = new Put(indexRowKey); - } else { - IndexUtil.removeEmptyColumn(indexPut, - indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), - indexMaintainer.getEmptyKeyValueQualifier()); - } - indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), - indexMaintainer.getEmptyKeyValueQualifier(), ts, QueryConstants.UNVERIFIED_BYTES); - context.indexUpdates.put(hTableInterfaceReference, - new Pair<Mutation, byte[]>(indexPut, rowKeyPtr.get())); - if (!ignoreWritingDeleteColumnsToIndex) { - Delete deleteColumn = indexMaintainer.buildDeleteColumnMutation(indexPut, ts); - if (deleteColumn != null) { - context.indexUpdates.put(hTableInterfaceReference, - new Pair<Mutation, byte[]>(deleteColumn, rowKeyPtr.get())); - } - } - // Delete the current index row if the new index key is different from the - // current one and the index is not a CDC index - if (currentDataRowState != null) { - ValueGetter currentDataRowVG = new IndexUtil.SimpleValueGetter(currentDataRowState); - byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, - rowKeyPtr, null, null, ts, encodedRegionName); - if ( - !indexMaintainer.isCDCIndex() - && Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0 - ) { - Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, - IndexMaintainer.DeleteType.ALL_VERSIONS, ts); - context.indexUpdates.put(hTableInterfaceReference, - new Pair<Mutation, byte[]>(del, rowKeyPtr.get())); - } - } - } else if ( - currentDataRowState != null - && indexMaintainer.shouldPrepareIndexMutations(currentDataRowState) - ) { - if (indexMaintainer.isCDCIndex()) { - // CDC Index needs two a delete marker for referencing the data table - // delete mutation with the right index row key, that is, the index row key - // starting with ts - Put cdcDataRowState = new Put(currentDataRowState.getRow()); - cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), - indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, - ByteUtil.EMPTY_BYTE_ARRAY); - context.indexUpdates.put(hTableInterfaceReference, - new Pair<Mutation, byte[]>(getDeleteIndexMutation(cdcDataRowState, indexMaintainer, - ts, rowKeyPtr, encodedRegionName), rowKeyPtr.get())); - } else { - context.indexUpdates.put(hTableInterfaceReference, - new Pair<Mutation, byte[]>(getDeleteIndexMutation(currentDataRowState, - indexMaintainer, ts, rowKeyPtr, encodedRegionName), rowKeyPtr.get())); - } - } + ListMultimap<HTableInterfaceReference, Mutation> idxUpdates = ArrayListMultimap.create(); + generateIndexMutationsForRow(rowKeyPtr, currentDataRowState, nextDataRowState, ts, + encodedRegionName, QueryConstants.UNVERIFIED_BYTES, indexTables, idxUpdates); + for (Map.Entry<HTableInterfaceReference, Mutation> idxUpdate : idxUpdates.entries()) { + context.indexUpdates.put(idxUpdate.getKey(), + new Pair<>(idxUpdate.getValue(), rowKeyPtr.get())); } } } @@ -1326,8 +1395,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>> create(); prepareIndexMutations(context, maintainers, batchTimestamp); - prepareEventuallyConsistentIndexMutations(context, batchTimestamp, maintainers, - compressCDCMutations); + if (serializeCDCMutations) { + prepareEventuallyConsistentIndexMutations(context, batchTimestamp, maintainers, + compressCDCMutations); + } context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation> create(); int updateCount = 0; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java index 5ca60cbd9c..3b68adb884 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson5IT.java @@ -17,6 +17,8 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -42,10 +44,13 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.ExplainPlanAttributes; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; import org.bson.BsonArray; import org.bson.BsonBinary; import org.bson.BsonDocument; @@ -54,19 +59,32 @@ import org.bson.BsonNull; import org.bson.BsonString; import org.bson.RawBsonDocument; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; /** * Tests for BSON with expression field key alias. */ -@Category(ParallelStatsDisabledTest.class) +@Category(NeedsOwnMiniClusterTest.class) public class Bson5IT extends ParallelStatsDisabledIT { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Integer.toString(60 * 60)); + props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + private static String getJsonString(String jsonFilePath) throws IOException { URL fileUrl = Bson5IT.class.getClassLoader().getResource(jsonFilePath); Preconditions.checkArgument(fileUrl != null, "File path " + jsonFilePath + " seems invalid"); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java similarity index 52% copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java index fd54b57ffb..1bf63c7579 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsCoveredEventualGenerateIT.java @@ -17,37 +17,55 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Arrays; +import java.util.Collection; import java.util.Map; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; -import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -/** - * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled. - */ @Category(NeedsOwnMiniClusterTest.class) -public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT { +public class ConcurrentMutationsCoveredEventualGenerateIT + extends ConcurrentMutationsExtendedIndexIT { + + private static final int MAX_LOOKBACK_AGE = 1000000; - public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean eventual) { + public ConcurrentMutationsCoveredEventualGenerateIT(boolean uncovered, boolean eventual) { super(uncovered, eventual); - Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered); } @BeforeClass public static synchronized void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(4); + Map<String, String> props = Maps.newHashMapWithExpectedSize(10); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); - props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true"); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put("hbase.rowlock.wait.duration", "100"); props.put("phoenix.index.concurrent.wait.duration.ms", "10"); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200)); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + + @Parameterized.Parameters(name = "uncovered={0}, eventual={1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { false, true } }); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java similarity index 55% copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java index fd54b57ffb..7a5cd7fb67 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedGenerateIT.java @@ -17,37 +17,51 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Arrays; +import java.util.Collection; import java.util.Map; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; -import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -/** - * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled. - */ @Category(NeedsOwnMiniClusterTest.class) -public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT { +public class ConcurrentMutationsExtendedGenerateIT extends ConcurrentMutationsExtendedIT { - public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean eventual) { + public ConcurrentMutationsExtendedGenerateIT(boolean uncovered, boolean eventual) { super(uncovered, eventual); - Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered); } @BeforeClass public static synchronized void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(4); + Map<String, String> props = Maps.newHashMapWithExpectedSize(10); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); - props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true"); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put("hbase.rowlock.wait.duration", "100"); props.put("phoenix.index.concurrent.wait.duration.ms", "10"); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2500)); + // props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200)); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + + @Parameterized.Parameters(name = "uncovered={0}, eventual={1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { false, true }, { true, true } }); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index ecbabb2473..671d130501 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -19,6 +19,7 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.end2end.IndexToolIT.verifyIndexTable; import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -95,6 +96,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(1000)); props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -177,7 +179,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { doneSignal.await(60, TimeUnit.SECONDS); if (eventual) { - Thread.sleep(15000); + Thread.sleep(35000); } verifyIndexTable(tableName, indexName, conn); } @@ -249,7 +251,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { doneSignal.await(60, TimeUnit.SECONDS); if (eventual) { - Thread.sleep(15000); + Thread.sleep(35000); } verifyIndexTable(tableName, indexName, conn); verifyIndexTable(tableName, singleCellindexName, conn); @@ -374,7 +376,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); assertNull(failedMsg[0], failedMsg[0]); if (eventual) { - Thread.sleep(15000); + Thread.sleep(35000); } long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertEquals(1, actualRowCount); @@ -435,7 +437,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS); if (eventual) { - Thread.sleep(15000); + Thread.sleep(35000); } long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName); assertEquals(1, actualRowCount); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java index 61ba4f02b5..531e036a9c 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIndexIT.java @@ -62,7 +62,8 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi ConcurrentMutationsExtendedIT.doSetup(); } - @Test(timeout = 2000000) + // This test is heavy and it might exhaust jenkins resources + @Test(timeout = 1800000) public void testConcurrentUpsertsWithTableSplits() throws Exception { int nThreads = 12; final int batchSize = 100; @@ -169,7 +170,8 @@ public abstract class ConcurrentMutationsExtendedIndexIT extends ParallelStatsDi assertEquals(nRows, actualRowCount); } - @Test(timeout = 5000000) + // This test is heavy and it might exhaust jenkins resources + @Test(timeout = 1800000) public void testConcurrentUpsertsWithTableSplitsMerges() throws Exception { Assume.assumeFalse(uncovered); int nThreads = 13; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java index fd54b57ffb..a6ecfa7e60 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.end2end; import java.util.Map; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.query.QueryServices; @@ -41,13 +42,16 @@ public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutations @BeforeClass public static synchronized void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(4); + Map<String, String> props = Maps.newHashMapWithExpectedSize(8); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true"); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put("hbase.rowlock.wait.duration", "100"); props.put("phoenix.index.concurrent.wait.duration.ms", "10"); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java similarity index 52% copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java index fd54b57ffb..5ecfc1c036 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsUncoveredEventualGenerateIT.java @@ -17,37 +17,55 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_BATCH_SIZE; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Arrays; +import java.util.Collection; import java.util.Map; +import org.apache.phoenix.coprocessor.PhoenixMasterObserver; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; -import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.util.ReadOnlyProps; -import org.junit.Assume; import org.junit.BeforeClass; import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -/** - * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled. - */ @Category(NeedsOwnMiniClusterTest.class) -public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT { +public class ConcurrentMutationsUncoveredEventualGenerateIT + extends ConcurrentMutationsExtendedIndexIT { + + private static final int MAX_LOOKBACK_AGE = 1000000; - public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered, boolean eventual) { + public ConcurrentMutationsUncoveredEventualGenerateIT(boolean uncovered, boolean eventual) { super(uncovered, eventual); - Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered); } @BeforeClass public static synchronized void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(4); + Map<String, String> props = Maps.newHashMapWithExpectedSize(10); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); - props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true"); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); props.put("hbase.rowlock.wait.duration", "100"); props.put("phoenix.index.concurrent.wait.duration.ms", "10"); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + props.put(INDEX_CDC_CONSUMER_BATCH_SIZE, Integer.toString(4500)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(5000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(200)); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + props.put("hbase.coprocessor.master.classes", PhoenixMasterObserver.class.getName()); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + + @Parameterized.Parameters(name = "uncovered={0}, eventual={1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList(new Object[][] { { true, true } }); + } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java new file mode 100644 index 0000000000..533427174d --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualGenerateIT.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class IndexToolForNonTxGlobalIndexEventualGenerateIT extends IndexToolForNonTxGlobalIndexIT { + + public IndexToolForNonTxGlobalIndexEventualGenerateIT(boolean mutable, boolean singleCell) { + super(mutable, singleCell); + if (indexDDLOptions.trim().isEmpty()) { + indexDDLOptions = " CONSISTENCY=EVENTUAL"; + } else { + indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions; + } + } + + @Override + protected void waitForEventualConsistency() throws Exception { + Thread.sleep(18000); + } + + @BeforeClass + public static synchronized void setup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(13); + serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, + Long.toString(5)); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); + serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Long.toString(MAX_LOOKBACK_AGE)); + serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, + Long.toString(0)); + serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0)); + serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", Integer.toString(0)); + serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + serverProps.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); + serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5); + clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); + clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); + clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString()); + clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + Boolean.TRUE.toString()); + destroyDriver(); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1"); + } + + @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList( + new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } }); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java new file mode 100644 index 0000000000..287f95c57a --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexEventualIT.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class IndexToolForNonTxGlobalIndexEventualIT extends IndexToolForNonTxGlobalIndexIT { + + public IndexToolForNonTxGlobalIndexEventualIT(boolean mutable, boolean singleCell) { + super(mutable, singleCell); + if (indexDDLOptions.trim().isEmpty()) { + indexDDLOptions = " CONSISTENCY=EVENTUAL"; + } else { + indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions; + } + } + + @Override + protected void waitForEventualConsistency() throws Exception { + Thread.sleep(15000); + } + + @BeforeClass + public static synchronized void setup() throws Exception { + Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(12); + serverProps.put(QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB, Long.toString(20)); + serverProps.put(QueryServices.MAX_SERVER_METADATA_CACHE_TIME_TO_LIVE_MS_ATTRIB, + Long.toString(5)); + serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, + QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS); + serverProps.put(QueryServices.INDEX_REBUILD_PAGE_SIZE_IN_ROWS, Long.toString(8)); + serverProps.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, + Long.toString(MAX_LOOKBACK_AGE)); + serverProps.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + serverProps.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + serverProps.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, + Long.toString(0)); + serverProps.put("hbase.regionserver.rpc.retry.interval", Long.toString(0)); + serverProps.put("hbase.procedure.remote.dispatcher.delay.msec", Integer.toString(0)); + serverProps.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + serverProps.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + serverProps.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(5); + clientProps.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(true)); + clientProps.put(QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB, Long.toString(5)); + clientProps.put(QueryServices.TRANSACTIONS_ENABLED, Boolean.TRUE.toString()); + clientProps.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.TRUE.toString()); + clientProps.put(QueryServices.SERVER_SIDE_IMMUTABLE_INDEXES_ENABLED_ATTRIB, + Boolean.TRUE.toString()); + destroyDriver(); + setUpTestDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), + new ReadOnlyProps(clientProps.entrySet().iterator())); + getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1"); + } + + @Parameterized.Parameters(name = "mutable={0}, singleCellIndex={1}") + public static synchronized Collection<Object[]> data() { + return Arrays.asList( + new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } }); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java index d6061a8380..5bf5c905ce 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForNonTxGlobalIndexIT.java @@ -128,11 +128,11 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; public class IndexToolForNonTxGlobalIndexIT extends BaseTest { public static final int MAX_LOOKBACK_AGE = 3600; - private final String tableDDLOptions; + protected final String tableDDLOptions; private final boolean useSnapshot = false; - private final boolean mutable; - private final String indexDDLOptions; + protected final boolean mutable; + protected String indexDDLOptions; private boolean singleCell; @Rule @@ -194,6 +194,9 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { getUtility().getConfiguration().set(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, "1"); } + protected void waitForEventualConsistency() throws Exception { + } + @After public void cleanup() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); @@ -239,15 +242,18 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { assertEquals(NROWS, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); assertTrue("Index rebuild failed!", indexTool.getJob().isSuccessful()); TestUtil.assertIndexState(conn, indexTableFullName, PIndexState.ACTIVE, null); + waitForEventualConsistency(); long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); IndexToolIT.setEveryNthRowWithNull(NROWS, 5, stmt); conn.commit(); + waitForEventualConsistency(); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); IndexToolIT.setEveryNthRowWithNull(NROWS, 7, stmt); conn.commit(); + waitForEventualConsistency(); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); @@ -444,6 +450,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { } assertEquals(0, indexTool.getJob().getCounters() .findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + waitForEventualConsistency(); long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(NROWS, actualRowCount); @@ -453,6 +460,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { IndexToolIT.upsertRow(stmt1, i); } conn.commit(); + waitForEventualConsistency(); indexTool = IndexToolIT.runIndexTool(useSnapshot, schemaName, dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.BOTH, new String[0]); assertEquals(2 * NROWS, @@ -491,6 +499,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { .findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters() .findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); + waitForEventualConsistency(); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(2 * NROWS, actualRowCount); } @@ -553,6 +562,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters() .findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue()); + waitForEventualConsistency(); long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(N_ROWS, actualRowCount); @@ -579,6 +589,7 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { null, 0, IndexTool.IndexVerifyType.BEFORE, new String[0]); assertEquals(0, indexTool.getJob().getCounters() .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); + waitForEventualConsistency(); actualRowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName); assertEquals(N_ROWS, actualRowCount); } @@ -586,6 +597,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testIndexToolVerifyBeforeAndBothOptions() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String schemaName = generateUniqueName(); @@ -629,6 +642,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testIndexToolVerifyAfterOption() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String schemaName = generateUniqueName(); @@ -842,6 +857,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testIndexToolForIncrementalVerify() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); ManualEnvironmentEdge customEdge = new ManualEnvironmentEdge(); String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); @@ -977,6 +994,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testIndexToolForIncrementalVerify_viewIndex() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); ManualEnvironmentEdge customeEdge = new ManualEnvironmentEdge(); String schemaName = generateUniqueName(); String dataTableName = generateUniqueName(); @@ -1387,6 +1406,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testUpdatablePKFilterViewIndexRebuild() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); if (!mutable) { return; } @@ -1456,6 +1477,8 @@ public class IndexToolForNonTxGlobalIndexIT extends BaseTest { @Test public void testUpdatableNonPkFilterViewIndexRebuild() throws Exception { + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); if (!mutable) { return; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java index bddd8d384a..6b05b7dd0a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/VarBinaryEncoded2IT.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.end2end; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import java.sql.Connection; @@ -47,7 +50,7 @@ import org.junit.runners.Parameterized; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; -@Category(ParallelStatsDisabledTest.class) +@Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class VarBinaryEncoded2IT extends ParallelStatsDisabledIT { @@ -72,6 +75,9 @@ public class VarBinaryEncoded2IT extends ParallelStatsDisabledIT { props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(60 * 60)); props.put(QueryServices.USE_STATS_FOR_PARALLELIZATION, Boolean.toString(false)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java new file mode 100644 index 0000000000..a862df07ad --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualGenerateIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; +import static org.apache.phoenix.hbase.index.IndexRegionObserver.PHOENIX_INDEX_CDC_MUTATION_SERIALIZE; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class GlobalIndexCheckerEventualGenerateIT extends GlobalIndexCheckerIT { + + public GlobalIndexCheckerEventualGenerateIT(boolean async, boolean encoded) { + super(async, encoded); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(10); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + props.put(PHOENIX_INDEX_CDC_MUTATION_SERIALIZE, Boolean.FALSE.toString()); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void appendEventualConsistency() { + if (indexDDLOptions.trim().isEmpty()) { + indexDDLOptions = " CONSISTENCY=EVENTUAL"; + } else { + indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions; + } + } + + @After + public void after() throws Exception { + } + + @Override + protected void waitForEventualConsistency() throws Exception { + Thread.sleep(18000); + } + + @Parameterized.Parameters(name = "async={0},encoded={1}") + public static synchronized Collection<Object[]> data() { + List<Object[]> list = Lists.newArrayListWithExpectedSize(4); + boolean[] Booleans = new boolean[] { true, false }; + for (boolean async : Booleans) { + for (boolean encoded : Booleans) { + list.add(new Object[] { async, encoded }); + } + } + return list; + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java new file mode 100644 index 0000000000..aee8564ab5 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerEventualIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end.index; + +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_RETRY_PAUSE_MS; +import static org.apache.phoenix.hbase.index.IndexCDCConsumer.INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; +import org.junit.runners.Parameterized; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +@Category(NeedsOwnMiniClusterTest.class) +public class GlobalIndexCheckerEventualIT extends GlobalIndexCheckerIT { + + public GlobalIndexCheckerEventualIT(boolean async, boolean encoded) { + super(async, encoded); + } + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(8); + props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); + props.put(INDEX_CDC_CONSUMER_TIMESTAMP_BUFFER_MS, Integer.toString(2000)); + props.put(INDEX_CDC_CONSUMER_RETRY_PAUSE_MS, Integer.toString(5)); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Integer.toString(-1)); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(2)); + props.put(QueryServices.TASK_HANDLING_INITIAL_DELAY_MS_ATTRIB, Long.toString(1)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void appendEventualConsistency() { + if (indexDDLOptions.trim().isEmpty()) { + indexDDLOptions = " CONSISTENCY=EVENTUAL"; + } else { + indexDDLOptions = " CONSISTENCY=EVENTUAL," + indexDDLOptions; + } + } + + @After + public void after() throws Exception { + } + + @Override + protected void waitForEventualConsistency() throws Exception { + Thread.sleep(15000); + } + + @Parameterized.Parameters(name = "async={0},encoded={1}") + public static synchronized Collection<Object[]> data() { + List<Object[]> list = Lists.newArrayListWithExpectedSize(4); + boolean[] Booleans = new boolean[] { true, false }; + for (boolean async : Booleans) { + for (boolean encoded : Booleans) { + list.add(new Object[] { async, encoded }); + } + } + return list; + } + +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 67ccf665e2..8d7d4c546a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -85,8 +85,8 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; public class GlobalIndexCheckerIT extends BaseTest { private final boolean async; - private String indexDDLOptions; - private String tableDDLOptions; + protected String indexDDLOptions; + protected String tableDDLOptions; private StringBuilder optionBuilder; private StringBuilder indexOptionBuilder; private final boolean encoded; @@ -266,6 +266,7 @@ public class GlobalIndexCheckerIT extends BaseTest { + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; // Verify that we will read from the index table assertExplainPlan(conn, query, dataTableName, indexTableName); + waitForEventualConsistency(); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("bc", rs.getString(1)); @@ -300,6 +301,7 @@ public class GlobalIndexCheckerIT extends BaseTest { "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'de'"; // Verify that we will read from the index table assertExplainPlan(conn, query, dataTableName, indexTableName); + waitForEventualConsistency(); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("de", rs.getString(1)); @@ -322,6 +324,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg', 'efgh')"); conn.commit(); + waitForEventualConsistency(); // Write a query to get all the rows in the order of their timestamps query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString() @@ -365,6 +368,7 @@ public class GlobalIndexCheckerIT extends BaseTest { + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; assertExplainPlan(conn, query, dataTableName, indexTableName); + waitForEventualConsistency(); rs = conn.createStatement().executeQuery(query); assertTrue(rs.next()); assertEquals("ab", rs.getString(1)); @@ -565,6 +569,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.commit(); // Now the expected state of the index table is {('ab', 'a', 'abcc' , null), ('ab', 'b', null, // 'bcde')} + waitForEventualConsistency(); ResultSet rs = conn.createStatement().executeQuery("SELECT * from " + indexTableName); assertTrue(rs.next()); assertEquals("ab", rs.getString(1)); @@ -606,6 +611,7 @@ public class GlobalIndexCheckerIT extends BaseTest { String dml = "DELETE from " + dataTableName + " WHERE id = 'a'"; assertEquals(1, conn.createStatement().executeUpdate(dml)); conn.commit(); + waitForEventualConsistency(); // The index rows are actually not deleted yet because IndexRegionObserver failed delete // operation. However, they are @@ -653,6 +659,7 @@ public class GlobalIndexCheckerIT extends BaseTest { String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'"; // Verify that we will read from the index table assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + waitForEventualConsistency(); ResultSet rs = conn.createStatement().executeQuery(selectSql); assertTrue(rs.next()); assertEquals("a", rs.getString(1)); @@ -666,6 +673,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.commit(); // Verify that we will read from the index table assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + waitForEventualConsistency(); rs = conn.createStatement().executeQuery(selectSql); assertTrue(rs.next()); assertEquals("a", rs.getString(1)); @@ -795,6 +803,7 @@ public class GlobalIndexCheckerIT extends BaseTest { // run the index MR job. IndexToolIT.runIndexTool(false, null, dataTableName, indexTableName); } + waitForEventualConsistency(); // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update // phase) where the verify flag is set // to true and/or index rows are deleted and check that this does not impact the correctness @@ -805,6 +814,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd','cde')"); conn.commit(); + waitForEventualConsistency(); IndexTool indexTool = IndexToolIT.runIndexTool(false, "", dataTableName, indexTableName, null, 0, IndexTool.IndexVerifyType.ONLY); assertEquals(3, indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue()); @@ -824,8 +834,10 @@ public class GlobalIndexCheckerIT extends BaseTest { .findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters() .findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue()); - assertEquals(2, indexTool.getJob().getCounters() - .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); + if (!indexDDLOptions.contains("CONSISTENCY=EVENTUAL")) { + assertEquals(2, indexTool.getJob().getCounters() + .findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue()); + } assertEquals(0, indexTool.getJob().getCounters() .findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters() @@ -872,6 +884,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val1, val2) values ('a', 'ab','abc')"); conn.commit(); + waitForEventualConsistency(); // At this moment val3 in the data table row has null value String selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'"; // Verify that we will read from the index table @@ -919,6 +932,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " values ('a', 'ab','abc', 'abcd')"); conn.commit(); + waitForEventualConsistency(); // At this moment val3 in the data table row should not have null value selectSql = "SELECT val3 from " + dataTableName + " WHERE val1 = 'ab'"; // Verify that we will read from the index table @@ -932,6 +946,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab','abcde')"); commitWithException(conn); + waitForEventualConsistency(); // The above upsert will create an unverified index row // Configure IndexRegionObserver to allow the data write phase IndexRegionObserver.setFailDataTableUpdatesForTesting(false); @@ -983,6 +998,7 @@ public class GlobalIndexCheckerIT extends BaseTest { String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'cd'"; // Verify that we will read from the first index table assertExplainPlan(conn, selectSql, dataTableName, indexTableName + "1"); + waitForEventualConsistency(); // Verify the first write is visible but the second one is not ResultSet rs = conn.createStatement().executeQuery(selectSql); assertTrue(rs.next()); @@ -1120,6 +1136,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val3) values ('a', 'abcdd')"); conn.commit(); + waitForEventualConsistency(); String selectSql = "SELECT val2, val3 from " + dataTableName + " WHERE val1 = 'ab'"; // Verify that we will read from the first index table assertExplainPlan(conn, selectSql, dataTableName, indexName + "1"); @@ -1326,6 +1343,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " " + "values ('g', 'val1', 'val2g', null)"); conn.commit(); + waitForEventualConsistency(); // Fail phase 3 IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); String selectSql = "SELECT id from " + dataTableName @@ -1390,6 +1408,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a2', 'a1', 'val1', 'val2a', 'val31', 'val4')"); conn.commit(); + waitForEventualConsistency(); // create an unverified update to the index row pointing to an existing data row IndexRegionObserver.setFailDataTableUpdatesForTesting(true); @@ -1400,6 +1419,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a2', 'a2', 'val1', 'val1b', 'val3', 'val4')"); conn.commit(); + waitForEventualConsistency(); ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2"); String selectSql = @@ -1415,6 +1435,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')"); conn.commit(); + waitForEventualConsistency(); IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); expectedValues = Lists.newArrayList("a1", "a2", "a3"); selectSql = @@ -1431,6 +1452,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a5', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')"); conn.commit(); + waitForEventualConsistency(); IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); expectedValues = Lists.newArrayList("a4", "a5"); selectSql = @@ -1475,6 +1497,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a2', 'a1', 'val1a', 'val2a', 'val31', 'val4')"); conn.commit(); + waitForEventualConsistency(); ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2"); // condition on val1 in WHERE clause so that query will use the uncovered index @@ -1499,6 +1522,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement().execute("upsert into " + dataTableName + " " + "values ('a4', 'a1', 'val1b', 'val2a', 'val31', 'val4')"); conn.commit(); + waitForEventualConsistency(); expectedValues = Lists.newArrayList("a3", "a4"); selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1b'"; verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, expectedValues); @@ -1589,6 +1613,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val3) values ('a', null)"); conn.commit(); + waitForEventualConsistency(); String dql = String.format("select id, val2 from %s where val1='ab' and val3='abcd'", dataTableName); @@ -1613,6 +1638,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " values ('a', 'ac', null, null)"); conn.commit(); + waitForEventualConsistency(); dql = String.format("select id, val2 from %s where val1='ac' and val3 is null", dataTableName); @@ -1651,6 +1677,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val1, val2) values ('c', 'cd', 'cde')"); conn.commit(); + waitForEventualConsistency(); IndexRegionObserver.setIgnoreWritingDeleteColumnsToIndex(false); IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, indexName, null, 0, IndexTool.IndexVerifyType.BEFORE); @@ -1689,6 +1716,7 @@ public class GlobalIndexCheckerIT extends BaseTest { String delete = String.format("DELETE FROM %s where id = 'a'", dataTableName); conn.createStatement().execute(delete); conn.commit(); + waitForEventualConsistency(); // skip phase2, inserts an unverified row in index IndexRegionObserver.setFailDataTableUpdatesForTesting(true); String dml = "upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab', ?)"; @@ -1715,6 +1743,7 @@ public class GlobalIndexCheckerIT extends BaseTest { conn.createStatement() .execute("upsert into " + dataTableName + " (id, val1, val3) values ('a', 'ab', null)"); conn.commit(); + waitForEventualConsistency(); IndexTool it = IndexToolIT.runIndexTool(false, null, dataTableName, indexName, null, 0, IndexTool.IndexVerifyType.ONLY); CounterGroup mrJobCounters = IndexToolIT.getMRJobCounters(it); @@ -1737,6 +1766,8 @@ public class GlobalIndexCheckerIT extends BaseTest { // No need to run the same test twice one for async = true and the other for async = false return; } + Assume.assumeFalse("View indexes do not support CONSISTENCY=EVENTUAL", + indexDDLOptions.contains("CONSISTENCY=EVENTUAL")); try (Connection conn = DriverManager.getConnection(getUrl())) { // Create a base table String dataTableName = generateUniqueName(); @@ -1813,6 +1844,7 @@ public class GlobalIndexCheckerIT extends BaseTest { + "val1 = val1 || val1, val2 = val2 || val2"; conn.createStatement().execute(upsertSql); conn.commit(); + waitForEventualConsistency(); String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'abab'"; assertExplainPlan(conn, selectSql, dataTableName, indexTableName); ResultSet rs = conn.createStatement().executeQuery(selectSql); @@ -1849,6 +1881,7 @@ public class GlobalIndexCheckerIT extends BaseTest { ps.executeUpdate(); } conn.commit(); + waitForEventualConsistency(); String distinctQuery = "SELECT DISTINCT val1 FROM " + dataTableName; try (ResultSet rs = conn.createStatement().executeQuery(distinctQuery)) { PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class); @@ -1876,14 +1909,18 @@ public class GlobalIndexCheckerIT extends BaseTest { } } - static private void verifyTableHealth(Connection conn, String dataTableName, - String indexTableName) throws Exception { + protected void waitForEventualConsistency() throws Exception { + } + + protected void verifyTableHealth(Connection conn, String dataTableName, String indexTableName) + throws Exception { // Add two rows and check everything is still okay conn.createStatement() .execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc', 'abcd')"); conn.createStatement() .execute("upsert into " + dataTableName + " values ('z', 'za', 'zab', 'zabc')"); conn.commit(); + waitForEventualConsistency(); String selectSql = "SELECT * from " + dataTableName + " WHERE val1 = 'ab'"; /// Verify that we will read from the index table assertExplainPlan(conn, selectSql, dataTableName, indexTableName); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 7b14996fba..e2e3afedef 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -2070,7 +2070,7 @@ public abstract class BaseTest { * @throws IOException if something went wrong while connecting to Admin */ public synchronized static boolean isAnyStoreRefCountLeaked(Admin admin) throws IOException { - int retries = 5; + int retries = 15; while (retries > 0) { boolean isStoreRefCountLeaked = isStoreRefCountLeaked(admin); if (!isStoreRefCountLeaked) {
