This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push: new 337ce8cf27 PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2192) 337ce8cf27 is described below commit 337ce8cf27bb9e2c9e8f3a5c2d192c6475e391e4 Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Mon Jul 7 15:54:35 2025 -0700 PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2192) PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path --------- Co-authored-by: Tanuj Khurana <tkhur...@apache.org> --- .../hbase/index/metrics/MetricsIndexerSource.java | 10 + .../index/metrics/MetricsIndexerSourceImpl.java | 10 +- .../org/apache/phoenix/query/QueryServices.java | 3 + .../apache/phoenix/query/QueryServicesOptions.java | 1 + .../java/org/apache/phoenix/util/SchemaUtil.java | 29 ++ .../phoenix/hbase/index/IndexRegionObserver.java | 385 +++++++++++++++++---- .../phoenix/hbase/index/wal/IndexedKeyValue.java | 9 +- .../replication/ReplicationLogGroupWriter.java | 13 + .../replication/SystemCatalogWALEntryFilter.java | 69 ++-- .../phoenix/replication/tool/LogFileAnalyzer.java | 140 ++++++-- .../phoenix/replication/ReplicationLogGroupIT.java | 351 +++++++++++++++++++ .../java/org/apache/phoenix/query/BaseTest.java | 9 +- .../replication/ReplicationLogGroupTest.java | 2 +- 13 files changed, 906 insertions(+), 125 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java index 9707244148..df938c1a8b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java @@ -78,6 +78,9 @@ public interface MetricsIndexerSource extends BaseSource { String POST_INDEX_UPDATE_FAILURE = "postIndexUpdateFailure"; String POST_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates post data updates"; + String REPLICATION_SYNC_TIME = "replicationSyncTime"; + String REPLICATION_SYNC_TIME_DESC = "Histogram for the time in milliseconds to synchronously replicate a batch of mutations"; + /** * Updates the index preparation time histogram (preBatchMutate). * @param dataTableName Physical data table name @@ -209,4 +212,11 @@ public interface MetricsIndexerSource extends BaseSource { * @param dataTableName Physical data table name */ void incrementPostIndexUpdateFailures(String dataTableName); + + /** + * Updates the replication sync time histogram. 
+ * @param dataTableName Physical data table name + * @param t time taken in milliseconds + */ + void updateReplicationSyncTime(String dataTableName, long t); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java index 79060fa986..cf258aa30e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java @@ -39,6 +39,7 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI private final MutableFastCounter slowPostOpenCalls; private final MetricHistogram duplicateKeyTimeHisto; private final MutableFastCounter slowDuplicateKeyCalls; + private final MetricHistogram replicationSyncTimeHisto; private final MetricHistogram preIndexUpdateTimeHisto; private final MetricHistogram postIndexUpdateTimeHisto; @@ -69,7 +70,8 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L); duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC); slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L); - + replicationSyncTimeHisto = getMetricsRegistry().newHistogram( + REPLICATION_SYNC_TIME, REPLICATION_SYNC_TIME_DESC); postIndexUpdateTimeHisto = getMetricsRegistry().newHistogram( POST_INDEX_UPDATE_TIME, POST_INDEX_UPDATE_TIME_DESC); preIndexUpdateTimeHisto = getMetricsRegistry().newHistogram( @@ -219,4 +221,10 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI private String getCounterName(String baseCounterName, String tableName) { return baseCounterName + "." 
+ tableName; } + + @Override + public void updateReplicationSyncTime(String dataTableName, long t) { + incrementTableSpecificHistogram(REPLICATION_SYNC_TIME, dataTableName, t); + replicationSyncTimeHisto.add(t); + } } \ No newline at end of file diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java index e16e716958..4f6926144a 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -557,6 +557,9 @@ public interface QueryServices extends SQLCloseable { String CQSI_THREAD_POOL_METRICS_ENABLED = "phoenix.cqsi.thread.pool.metrics.enabled"; + public static final String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled"; + + /** * Get executor service used for parallel scans */ diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index bb5ef67e49..5a88e45270 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -470,6 +470,7 @@ public class QueryServicesOptions { public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true; public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false; + public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false; private final Configuration config; diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java index fb0bdc0c82..eb0e2a4ad4 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java @@ -1445,4 +1445,33 @@ public class SchemaUtil { } return false; } + + /** + * Data of non system tables is always replicated. + * Data of the system tables in the list below is not replicated because this data is very + * specific to the cluster. + * SYSTEM.SEQUENCE + * SYSTEM.STATS + * SYSTEM.LOG + * SYSTEM.TASK + * SYSTEM.FUNCTION + * SYSTEM.MUTEX + * SYSTEM.TRANSFORM + * SYSTEM.CDC_STREAM_STATUS + * SYSTEM.CDC_STREAM + * For SYSTEM.CATALOG and SYSTEM.CHILD_LINK we only replicate rows with tenant information. + * Non tenant (Global) rows are assumed to be executed by an admin or an admin process in each + * cluster separately and thus not replicated. 
+ * @param tableName full name of the table + * @return true if the table data should be replicated, else false + */ + public static boolean shouldReplicateTable(byte[] tableName) { + if (!isSystemTable(tableName)) { + return true; + } + if (isMetaTable(tableName) || isChildLinkTable(tableName)) { + return true; + } + return false; + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index ab1c7ad95e..7ade7ca48e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -77,6 +78,7 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory; import org.apache.phoenix.hbase.index.table.HTableInterfaceReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.hbase.index.wal.IndexedKeyValue; import org.apache.phoenix.hbase.index.write.IndexWriter; import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter; import org.apache.phoenix.index.IndexMaintainer; @@ -86,6 +88,8 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.ReplicationLogGroup; +import org.apache.phoenix.replication.SystemCatalogWALEntryFilter; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PRow; @@ -134,9 +138,11 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS; import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew; @@ -146,6 +152,8 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException; import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB; import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT; +import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; /** @@ -163,10 +171,15 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS); public static final String PHOENIX_APPEND_METADATA_TO_WAL = 
"phoenix.append.metadata.to.wal"; public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false; + // Mutation attribute to ignore the mutation for replication + public static final String IGNORE_REPLICATION_ATTRIB = "_IGNORE_REPLICATION"; + private static final byte[] IGNORE_REPLICATION_ATTRIB_VAL = new byte[]{0}; + // TODO hardcoded for now, will fix later + public static final String DEFAULT_HA_GROUP = "DEFAULT_HA_GROUP"; /** * Class to represent pending data table rows - * */ + */ private class PendingRow { private int count; private boolean usable; @@ -202,33 +215,45 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } public int getCount() { - return count; - } + return count; + } public BatchMutateContext getLastContext() { - return lastContext; - } + return lastContext; + } } - private static boolean ignoreIndexRebuildForTesting = false; + + private static boolean ignoreIndexRebuildForTesting = false; private static boolean failPreIndexUpdatesForTesting = false; private static boolean failPostIndexUpdatesForTesting = false; private static boolean failDataTableUpdatesForTesting = false; private static boolean ignoreWritingDeleteColumnsToIndex = false; + private static boolean ignoreSyncReplicationForTesting = false; + public static void setIgnoreIndexRebuildForTesting(boolean ignore) { ignoreIndexRebuildForTesting = ignore; } + public static void setFailPreIndexUpdatesForTesting(boolean fail) { failPreIndexUpdatesForTesting = fail; } + public static void setFailPostIndexUpdatesForTesting(boolean fail) { failPostIndexUpdatesForTesting = fail; } + public static void setFailDataTableUpdatesForTesting(boolean fail) { failDataTableUpdatesForTesting = fail; } + public static void setIgnoreWritingDeleteColumnsToIndex(boolean ignore) { ignoreWritingDeleteColumnsToIndex = ignore; } + + public static void setIgnoreSyncReplicationForTesting(boolean ignore) { + ignoreSyncReplicationForTesting = ignore; + } + public enum BatchMutatePhase { INIT, PRE, POST, FAILED } @@ -237,13 +262,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { // coprocessor calls. TODO: remove after HBASE-18127 when available /* - * The concurrent batch of mutations is a set such that every pair of batches in this set has at - * least one common row. Since a BatchMutateContext object of a batch is modified only after the - * row locks for all the rows that are mutated by this batch are acquired, there can be only one - * thread can acquire the locks for its batch and safely access all the batch contexts in the - * set of concurrent batches. Because of this, we do not read atomic variables or additional - * locks to serialize the access to the BatchMutateContext objects. - */ + * The concurrent batch of mutations is a set such that every pair of batches in this set has at + * least one common row. Since a BatchMutateContext object of a batch is modified only after the + * row locks for all the rows that are mutated by this batch are acquired, there can be only one + * thread can acquire the locks for its batch and safely access all the batch contexts in the + * set of concurrent batches. Because of this, we do not read atomic variables or additional + * locks to serialize the access to the BatchMutateContext objects. 
+ */ public static class BatchMutateContext { private volatile BatchMutatePhase currentPhase = BatchMutatePhase.INIT; // The max of reference counts on the pending rows of this batch at the time this @@ -301,7 +326,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } public BatchMutateContext(int clientVersion) { - this.clientVersion = clientVersion; + this.clientVersion = clientVersion; } public void populateOriginalMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp) { @@ -355,50 +380,92 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return maxPendingRowCount; } } + private ThreadLocal<BatchMutateContext> batchMutateContext = new ThreadLocal<BatchMutateContext>(); - /** - * Configuration key for if the indexer should check the version of HBase is running. Generally, - * you only want to ignore this for testing or for custom versions of HBase. - */ - public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion"; - - public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write"; - private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false; - - private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold"; - private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000; - private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment"; - private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000; - - // Index writers get invoked before and after data table updates - protected IndexWriter preWriter; - protected IndexWriter postWriter; - - protected IndexBuildManager builder; - private LockManager lockManager; - - // The collection of pending data table rows - private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>(); - - private MetricsIndexerSource metricSource; - - private boolean stopped; - private boolean disabled; - private long slowIndexPrepareThreshold; - private long slowPreIncrementThreshold; - private int rowLockWaitDuration; - private int concurrentMutationWaitDuration; - private String dataTableName; - private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL; - private boolean isNamespaceEnabled = false; - private boolean useBloomFilter = false; - private long lastTimestamp = 0; - private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new ArrayList<>(); - private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; - private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; - private byte[] encodedRegionName; + /** + * Configuration key for if the indexer should check the version of HBase is running. Generally, + * you only want to ignore this for testing or for custom versions of HBase. 
+ */ + public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion"; + public static final String INDEX_LAZY_POST_BATCH_WRITE = + "org.apache.hadoop.hbase.index.lazy.post_batch.write"; + private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false; + private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = + "phoenix.indexer.slow.post.batch.mutate.threshold"; + private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000; + private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = + "phoenix.indexer.slow.pre.increment"; + private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000; + + // Index writers get invoked before and after data table updates + protected IndexWriter preWriter; + protected IndexWriter postWriter; + + protected IndexBuildManager builder; + private LockManager lockManager; + + // The collection of pending data table rows + private Map<ImmutableBytesPtr, PendingRow> pendingRows = new ConcurrentHashMap<>(); + + private MetricsIndexerSource metricSource; + + private boolean stopped; + private boolean disabled; + private long slowIndexPrepareThreshold; + private long slowPreIncrementThreshold; + private int rowLockWaitDuration; + private int concurrentMutationWaitDuration; + private String dataTableName; + private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL; + private boolean isNamespaceEnabled = false; + private boolean useBloomFilter = false; + private long lastTimestamp = 0; + private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new ArrayList<>(); + private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; + private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; + private byte[] encodedRegionName; + private boolean shouldReplicate; + private ReplicationLogGroup replicationLog; + + // Don't replicate the mutation if this attribute is set + private static final Predicate<Mutation> IGNORE_REPLICATION = mutation -> + mutation.getAttribute(IGNORE_REPLICATION_ATTRIB) != null; + + // Don't replicate the mutation for syscat/child link if the tenantid is not + // leading in the row key + private static final Predicate<Mutation> NOT_TENANT_ID_ROW_KEY_PREFIX = mutation -> + !SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(mutation.getRow(), 0); + + // Don't replicate the mutation for child link if child is not a tenant view + private static final Predicate<Mutation> NOT_CHILD_LINK_TENANT_VIEW = mutation -> { + boolean isChildLinkToTenantView = false; + for (List<Cell> cells : mutation.getFamilyCellMap().values()) { + for (Cell cell : cells) { + if (SystemCatalogWALEntryFilter.isCellChildLinkToTenantView(cell)) { + isChildLinkToTenantView = true; + break; + } + } + } + return !isChildLinkToTenantView; + }; + + /** + * If the replication filter evaluates to true, the mutation is ignored from replication + */ + private static Predicate<Mutation> getSynchronousReplicationFilter(byte[] tableName) { + Predicate<Mutation> filter = IGNORE_REPLICATION; + if (SchemaUtil.isMetaTable(tableName)) { + filter = IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX); + } else if (SchemaUtil.isChildLinkTable(tableName)) { + filter = IGNORE_REPLICATION.or + (NOT_TENANT_ID_ROW_KEY_PREFIX.and(NOT_CHILD_LINK_TENANT_VIEW)); + } + return filter; + } + private Predicate<Mutation> ignoreReplicationFilter; @Override public Optional<RegionObserver> getRegionObserver() { @@ -448,6 +515,18 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver { BloomType bloomFilterType = tableDescriptor.getColumnFamilies()[0].getBloomFilterType(); // when the table descriptor changes, the coproc is reloaded this.useBloomFilter = bloomFilterType == BloomType.ROW; + byte[] tableName = env.getRegionInfo().getTable().getName(); + this.shouldReplicate = env.getConfiguration().getBoolean( + SYNCHRONOUS_REPLICATION_ENABLED, DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED); + if (this.shouldReplicate) { + // replication feature is enabled, check if it is enabled for the table + this.shouldReplicate = SchemaUtil.shouldReplicateTable(tableName); + } + if (this.shouldReplicate) { + this.replicationLog = ReplicationLogGroup.get(env.getConfiguration(), + env.getServerName(), DEFAULT_HA_GROUP); + this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName); + } } catch (NoSuchMethodError ex) { disabled = true; LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex); @@ -480,7 +559,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return; } this.stopped = true; - String msg = "Indexer is being stopped"; + String msg = "IndexRegionObserver is being stopped"; this.builder.stop(msg); this.preWriter.stop(msg); this.postWriter.stop(msg); @@ -556,6 +635,89 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { "Somehow didn't return an index update but also didn't propagate the failure to the client!"); } + @Override + public void preWALRestore( + org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx, + org.apache.hadoop.hbase.client.RegionInfo info, + org.apache.hadoop.hbase.wal.WALKey logKey, + WALEdit logEdit) throws IOException { + if (this.disabled) { + return; + } + if (!shouldReplicate) { + return; + } + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { + replicateEditOnWALRestore(logKey, logEdit); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + metricSource.updatePreWALRestoreTime(dataTableName, duration); + } + } + + /** + * A batch of mutations is recorded in a single WAL edit so a WAL edit can have cells + * belonging to multiple rows. Further, for one mutation the WAL edit contains the individual + * cells that are part of the mutation. + * @param logKey + * @param logEdit + * @throws IOException + */ + private void replicateEditOnWALRestore(org.apache.hadoop.hbase.wal.WALKey logKey, + WALEdit logEdit) throws IOException { + ImmutableBytesPtr prevKey = null, currentKey = null; + Put put = null; + Delete del = null; + for (Cell kv : logEdit.getCells()) { + if (kv instanceof IndexedKeyValue) { + IndexedKeyValue ikv = (IndexedKeyValue) kv; + replicationLog.append(Bytes.toString(ikv.getIndexTable()), -1, ikv.getMutation()); + } else { + // While we can generate a separate mutation for every cell that is part of the + // WAL edit and replicate each such mutation. Doing that will not be very efficient + // since a mutation can have large number of cells. Instead, we first group the + // cells belonging to the same row into a mutation and then replicate that + // mutation. 
+ currentKey = new ImmutableBytesPtr(kv.getRowArray(), + kv.getRowOffset(), kv.getRowLength()); + if (!currentKey.equals(prevKey)) { + if (put != null && !this.ignoreReplicationFilter.test(put)) { + replicationLog.append(logKey.getTableName().getNameAsString(), -1, put); + } + if (del != null && !this.ignoreReplicationFilter.test(del)) { + replicationLog.append(logKey.getTableName().getNameAsString(), -1, del); + } + // reset + put = null; + del = null; + } + if (kv.getType() == Cell.Type.Put) { + if (put == null) { + put = new Put(currentKey.get(), + currentKey.getOffset(), currentKey.getLength()); + } + put.add(kv); + } else { + if (del == null) { + del = new Delete(currentKey.get(), + currentKey.getOffset(), currentKey.getLength()); + } + del.add(kv); + } + prevKey = currentKey; + } + } + // append the last one + if (put != null && !this.ignoreReplicationFilter.test(put)) { + replicationLog.append(logKey.getTableName().getNameAsString(), -1, put); + } + if (del != null && !this.ignoreReplicationFilter.test(del)) { + replicationLog.append(logKey.getTableName().getNameAsString(), -1, del); + } + replicationLog.sync(); + } + private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) { for (int i = 0; i < miniBatchOp.size(); i++) { @@ -629,6 +791,11 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { Result result = Result.create(cells); miniBatchOp.setOperationStatus(i, new OperationStatus(SUCCESS, result)); + // since this mutation is ignored by setting it's status to success in the coproc + // it shouldn't be synchronously replicated + if (this.shouldReplicate) { + m.setAttribute(IGNORE_REPLICATION_ATTRIB, IGNORE_REPLICATION_ATTRIB_VAL); + } } } else if (context.returnResult) { Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new HashMap<>(); @@ -1547,6 +1714,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { waitForPreviousConcurrentBatch(table, context); } preparePostIndexMutations(context, batchTimestamp, indexMetaData); + addGlobalIndexMutationsToWAL(miniBatchOp, context); } if (context.hasLocalIndex) { // Group all the updates for a single row into a single update to be processed (for local indexes) @@ -1558,6 +1726,53 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } + /** + * We need to add the index mutations to the data table's WAL to handle cases where the RS + * crashes before the postBatchMutateIndispensably hook is called where the mutations are + * synchronously replicated. This is needed because during WAL restore we don't have the + * IndexMaintainer object to generate the corresponding index mutations. 
+ * @param miniBatchOp + * @param context + */ + private void addGlobalIndexMutationsToWAL(MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context) { + if (!this.shouldReplicate) { + return; + } + + WALEdit edit = miniBatchOp.getWalEdit(0); + if (edit == null) { + edit = new WALEdit(); + miniBatchOp.setWalEdit(0, edit); + } + + if (context.preIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry + : context.preIndexUpdates.entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + // This creates cells of family type WALEdit.METAFAMILY which are not applied + // on restore + edit.add(IndexedKeyValue.newIndexedKeyValue( + entry.getKey().get(), entry.getValue())); + } + } + + if (context.postIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry + : context.postIndexUpdates.entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + // This creates cells of family type WALEdit.METAFAMILY which are not applied + // on restore + edit.add(IndexedKeyValue.newIndexedKeyValue( + entry.getKey().get(), entry.getValue())); + } + } + } + /** * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be * generated so release the row lock. @@ -1689,7 +1904,16 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { this.builder.batchCompleted(miniBatchOp); if (success) { // The pre-index and data table updates are successful, and now, do post index updates - doPost(c, context); + CompletableFuture<Void> postIndexFuture = + CompletableFuture.runAsync(() -> doPost(c, context)); + long start = EnvironmentEdgeManager.currentTimeMillis(); + try { + replicateMutations(miniBatchOp, context); + } finally { + long duration = EnvironmentEdgeManager.currentTimeMillis() - start; + metricSource.updateReplicationSyncTime(dataTableName, duration); + } + FutureUtils.get(postIndexFuture); } } finally { removeBatchMutateContext(c); @@ -1761,7 +1985,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } - private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException { + private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) { long start = EnvironmentEdgeManager.currentTimeMillis(); try { @@ -2231,4 +2455,49 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { public static boolean isAtomicOperationComplete(OperationStatus status) { return status.getOperationStatusCode() == SUCCESS && status.getResult() != null; } + + private void replicateMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, + BatchMutateContext context) throws IOException { + + if (!this.shouldReplicate) { + return; + } + if (ignoreSyncReplicationForTesting) { + return; + } + assert this.replicationLog != null; + + for (Integer i = 0; i < miniBatchOp.size(); i++) { + Mutation m = miniBatchOp.getOperation(i); + if (this.ignoreReplicationFilter.test(m)) { + continue; + } + this.replicationLog.append(this.dataTableName, -1, m); + Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i); + if (mutationsAddedByCP != null) { + for (Mutation addedMutation : mutationsAddedByCP) { + this.replicationLog.append(this.dataTableName, -1, addedMutation); + } + } + } + if (context.preIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry + : 
context.preIndexUpdates.entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + this.replicationLog.append(entry.getKey().getTableName(), -1, entry.getValue()); + } + } + if (context.postIndexUpdates != null) { + for (Map.Entry<HTableInterfaceReference, Mutation> entry + : context.postIndexUpdates.entries()) { + if (this.ignoreReplicationFilter.test(entry.getValue())) { + continue; + } + this.replicationLog.append(entry.getKey().getTableName(), -1, entry.getValue()); + } + } + this.replicationLog.sync(); + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java index a973f02e1e..6bcbd99b90 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java @@ -53,6 +53,11 @@ public class IndexedKeyValue extends KeyValue { private int hashCode; public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){ + Cell indexWALCell = adaptFirstCellFromMutation(m); + return new IndexedKeyValue(indexWALCell, new ImmutableBytesPtr(bs), m); + } + + public static IndexedKeyValue newIndexedKeyValue(ImmutableBytesPtr bs, Mutation m) { Cell indexWALCell = adaptFirstCellFromMutation(m); return new IndexedKeyValue(indexWALCell, bs, m); } @@ -83,9 +88,9 @@ public class IndexedKeyValue extends KeyValue { //used for deserialization public IndexedKeyValue() {} - private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){ + private IndexedKeyValue(Cell c, ImmutableBytesPtr bs, Mutation mutation) { super(c); - this.indexTableName = new ImmutableBytesPtr(bs); + this.indexTableName = bs; this.mutation = mutation; this.hashCode = calcHashCode(indexTableName, mutation); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java index 6262e75a10..a3810e0fb7 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java @@ -462,6 +462,19 @@ public abstract class ReplicationLogGroupWriter { } } + /** Close the currentWriter. + * Needed by tests so that we can close the log file and then read it + */ + protected void closeCurrentWriter() { + lock.lock(); + try { + closeWriter(currentWriter); + currentWriter = null; + } finally { + lock.unlock(); + } + } + /** * Check if this ReplicationLogGroup is closed. * diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java index 39c8b13cb4..ec9445c24c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java @@ -76,20 +76,39 @@ public class SystemCatalogWALEntryFilter implements } /** - * Does the cell key have leading tenant Id. + * Does the cell row key have leading tenant Id. 
* @param cell hbase cell * @return true if the cell has leading tenant Id in key */ private boolean isTenantIdLeadingInKey(final Cell cell) { // rows in system.catalog or system child that aren't tenant-owned // will have a leading separator byte - return cell.getRowArray()[cell.getRowOffset()] - != QueryConstants.SEPARATOR_BYTE; + return isTenantIdLeadingInKey(cell.getRowArray(), cell.getRowOffset()); + + } + + /** + * Checks if the row key of SYSTEM.CATALOG or SYSTEM.CHILD_LINK row has leading tenant ID + * @param rowKey + * @param rowOffset + * @return true if the row key has leading tenant ID + */ + public static boolean isTenantIdLeadingInKey(byte[] rowKey, int rowOffset) { + return rowKey[rowOffset] != QueryConstants.SEPARATOR_BYTE; } /** - * is the cell for system child link a tenant owned. Besides the non empty - * tenant id, system.child_link table have tenant owned data for parent child + * Is the cell for system child link a tenant owned. This happens if the tenant id is + * leading in the row key or the cell has tenant owned data for parent child links. + * @param cell hbase cell + * @return true if the cell is tenant owned + */ + private boolean isTenantRowCellSystemChildLink(final Cell cell) { + return isTenantIdLeadingInKey(cell) || isCellChildLinkToTenantView(cell); + } + + /** + * SYSTEM.CHILD_LINK table have tenant owned data for parent child * links. In this case, the column qualifier is * {@code PhoenixDatabaseMetaData#LINK_TYPE_BYTES} and value is * {@code PTable.LinkType.CHILD_TABLE}. For corresponding delete markers the @@ -97,29 +116,27 @@ public class SystemCatalogWALEntryFilter implements * @param cell hbase cell * @return true if the cell is tenant owned */ - private boolean isTenantRowCellSystemChildLink(final Cell cell) { - boolean isTenantRowCell = isTenantIdLeadingInKey(cell); - + public static boolean isCellChildLinkToTenantView(final Cell cell) { ImmutableBytesWritable key = new ImmutableBytesWritable( - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); boolean isChildLinkToTenantView = false; - if (!isTenantRowCell) { - boolean isChildLink = CellUtil.matchingQualifier( - cell, PhoenixDatabaseMetaData.LINK_TYPE_BYTES); - if ((isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES)) || - cell.getType() == Cell.Type.DeleteFamily) { - byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][]; - SchemaUtil.getVarChars(key.get(), key.getOffset(), - key.getLength(), 0, rowViewKeyMetadata); - /** if the child link is to a tenant-owned view, the COLUMN_NAME field will be - * the byte[] of the tenant otherwise, it will be an empty byte array - * (NOT QueryConstants.SEPARATOR_BYTE, but a byte[0]). 
This assumption is also - * true for child link's delete markers in SYSTEM.CHILD_LINK as it only contains link - * rows and does not deal with other type of rows like column rows that also has - * COLUMN_NAME populated with actual column name.**/ - isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length != 0; - } + boolean isChildLink = CellUtil.matchingQualifier(cell, + PhoenixDatabaseMetaData.LINK_TYPE_BYTES); + if (isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES) + || cell.getType() == Cell.Type.DeleteFamily) { + byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][]; + SchemaUtil.getVarChars(key.get(), key.getOffset(), + key.getLength(), 0, rowViewKeyMetadata); + /** if the child link is to a tenant-owned view, the COLUMN_NAME field will be + * the byte[] of the tenant otherwise, it will be an empty byte array + * (NOT QueryConstants.SEPARATOR_BYTE, but a byte[0]). This assumption is also + * true for child link's delete markers in SYSTEM.CHILD_LINK as it only contains link + * rows and does not deal with other type of rows like column rows that also has + * COLUMN_NAME populated with actual column name. + **/ + isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length != 0; } - return isTenantRowCell || isChildLinkToTenantView; + return isChildLinkToTenantView; } + } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java index 042f04fef0..4b7b071c35 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java @@ -18,20 +18,25 @@ package org.apache.phoenix.replication.tool; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathNotFoundException; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.replication.log.LogFile; import org.apache.phoenix.replication.log.LogFile.Record; import org.apache.phoenix.replication.log.LogFileReader; import org.apache.phoenix.replication.log.LogFileReaderContext; +import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,47 +61,37 @@ public class LogFileAnalyzer extends Configured implements Tool { private boolean verbose = false; private boolean decode = false; private boolean check = false; + FileSystem fs; - @Override - public int run(String[] args) throws Exception { - if (!parseArgs(args)) { - System.err.println(USAGE); - return 1; - } - + private void init() throws IOException { Configuration conf = getConf(); if (conf == null) { conf = HBaseConfiguration.create(); setConf(conf); } + if (fs == null) { + fs = FileSystem.get(getConf()); + } + } + @Override + public int run(String[] args) throws Exception { + if (!parseArgs(args)) { + System.err.println(USAGE); + return 1; + } try { - FileSystem fs = FileSystem.get(conf); + init(); Path path = new 
Path(args[args.length - 1]); - - if (!fs.exists(path)) { - System.err.println("Path does not exist: " + path); - return 1; - } - - List<Path> filesToAnalyze = new ArrayList<>(); - if (fs.getFileStatus(path).isDirectory()) { - // Recursively find all .plog files - findLogFiles(fs, path, filesToAnalyze); - } else { - filesToAnalyze.add(path); - } - + List<Path> filesToAnalyze = getFilesToAnalyze(path); if (filesToAnalyze.isEmpty()) { System.err.println("No log files found in: " + path); return 1; } - // Analyze each file for (Path file : filesToAnalyze) { - analyzeFile(fs, file); + analyzeFile(file); } - return 0; } catch (Exception e) { LOG.error("Error analyzing log files", e); @@ -104,19 +99,62 @@ public class LogFileAnalyzer extends Configured implements Tool { } } - private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws IOException { + /** + * Returns all the mutations grouped by the table name under a source path + * @param source Path which can be a file or directory + * @return Mutations grouped by the table name + * @throws IOException + */ + public Map<String, List<Mutation>> groupLogsByTable(String source) throws IOException { + Map<String, List<Mutation>> allFiles = Maps.newHashMap(); + init(); + Path path = new Path(source); + List<Path> filesToAnalyze = getFilesToAnalyze(path); + if (filesToAnalyze.isEmpty()) { + return allFiles; + } + // Analyze each file + for (Path file : filesToAnalyze) { + Map<String, List<Mutation>> perFile = groupLogsByTable(file); + for (Map.Entry<String, List<Mutation>> entry : perFile.entrySet()) { + List<Mutation> mutations = allFiles.get(entry.getKey()); + if (mutations == null) { + allFiles.put(entry.getKey(), entry.getValue()); + } else { + mutations.addAll(entry.getValue()); + } + } + } + return allFiles; + } + + private List<Path> getFilesToAnalyze(Path path) throws IOException { + if (!fs.exists(path)) { + throw new PathNotFoundException(path.toString()); + } + List<Path> filesToAnalyze = Lists.newArrayList(); + if (fs.getFileStatus(path).isDirectory()) { + // Recursively find all .plog files + findLogFiles(path, filesToAnalyze); + } else { + filesToAnalyze.add(path); + } + return filesToAnalyze; + } + + private void findLogFiles(Path dir, List<Path> files) throws IOException { FileStatus[] statuses = fs.listStatus(dir); for (FileStatus status : statuses) { Path path = status.getPath(); if (status.isDirectory()) { - findLogFiles(fs, path, files); + findLogFiles(path, files); } else if (path.getName().endsWith(".plog")) { files.add(path); } } } - private void analyzeFile(FileSystem fs, Path file) throws IOException { + private void analyzeFile(Path file) throws IOException { System.out.println("\nAnalyzing file: " + file); LogFileReaderContext context = new LogFileReaderContext(getConf()) @@ -150,13 +188,18 @@ public class LogFileAnalyzer extends Configured implements Tool { } // Print trailer information - System.out.println("\nTrailer:"); - System.out.println(" Record Count: " + reader.getTrailer().getRecordCount()); - System.out.println(" Block Count: " + reader.getTrailer().getBlockCount()); - System.out.println(" Blocks Start Offset: " - + reader.getTrailer().getBlocksStartOffset()); - System.out.println(" Trailer Start Offset: " - + reader.getTrailer().getTrailerStartOffset()); + LogFile.Trailer trailer = reader.getTrailer(); + if (trailer != null) { + System.out.println("\nTrailer:"); + System.out.println(" Record Count: " + reader.getTrailer().getRecordCount()); + System.out.println(" Block Count: " + 
reader.getTrailer().getBlockCount()); + System.out.println(" Blocks Start Offset: " + + reader.getTrailer().getBlocksStartOffset()); + System.out.println(" Trailer Start Offset: " + + reader.getTrailer().getTrailerStartOffset()); + } else { + System.out.println("\nTrailer is null"); + } // Print verification results if checking if (check) { @@ -179,6 +222,31 @@ public class LogFileAnalyzer extends Configured implements Tool { } } + private Map<String, List<Mutation>> groupLogsByTable(Path file) throws IOException { + Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap(); + System.out.println("\nAnalyzing file: " + file); + LogFileReaderContext context = new LogFileReaderContext(getConf()) + .setFileSystem(fs) + .setFilePath(file) + .setSkipCorruptBlocks(check); // Skip corrupt blocks if checking + LogFileReader reader = new LogFileReader(); + try { + reader.init(context); + // Process records + Record record; + while ((record = reader.next()) != null) { + String tableName = record.getHBaseTableName(); + List<Mutation> mutations = mutationsByTable.getOrDefault(tableName, + Lists.newArrayList()); + mutations.add(record.getMutation()); + mutationsByTable.put(tableName, mutations); + } + } finally { + reader.close(); + } + return mutationsByTable; + } + private boolean parseArgs(String[] args) { if (args.length == 0) { return false; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java new file mode 100644 index 0000000000..f111d141f6 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.replication; + +import static org.apache.phoenix.hbase.index.IndexRegionObserver.DEFAULT_HA_GROUP; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.net.URI; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.phoenix.end2end.IndexToolIT; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.query.PhoenixTestBuilder; +import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.replication.tool.LogFileAnalyzer; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category(NeedsOwnMiniClusterTest.class) +public class ReplicationLogGroupIT extends ParallelStatsDisabledIT { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupIT.class); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, Boolean.TRUE.toString()); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + LOG.info("Starting test {}", name.getMethodName()); + } + + @After + public void afterTest() throws Exception { + LOG.info("Starting cleanup for test {}", name.getMethodName()); + cleanupLogsFolder(standbyUri); + LOG.info("Ending cleanup for test {}", name.getMethodName()); + } + + /** + * Delete all the shards under the top level replication log directory + * @throws IOException + */ + private void cleanupLogsFolder(URI source) throws IOException { + FileSystem fs = FileSystem.get(config); + Path dir = new Path(source.getPath()); + FileStatus[] statuses = fs.listStatus(dir); + for (FileStatus status : statuses) { + Path shard = status.getPath(); + if (status.isDirectory()) { + fs.delete(shard, true); + } + } + } + + private ReplicationLogGroup getReplicationLogGroup() throws IOException { + HRegionServer rs = 
getUtility().getHBaseCluster().getRegionServer(0); + return ReplicationLogGroup.get(config, rs.getServerName(), DEFAULT_HA_GROUP); + } + + private Map<String, List<Mutation>> groupLogsByTable() throws Exception { + ReplicationLogGroup log = getReplicationLogGroup(); + log.getActiveWriter().closeCurrentWriter(); + LogFileAnalyzer analyzer = new LogFileAnalyzer(); + analyzer.setConf(config); + String[] args = {"--check", standbyUri.getPath()}; + assertEquals(0, analyzer.run(args)); + return analyzer.groupLogsByTable(standbyUri.getPath()); + } + + private int getCountForTable(Map<String, List<Mutation>> logsByTable, + String tableName) throws Exception { + List<Mutation> mutations = logsByTable.get(tableName); + return mutations != null ? mutations.size() : 0; + } + + private void verifyReplication(Connection conn, + Map<String, Integer> expected) throws Exception { + Map<String, List<Mutation>> mutationsByTable = groupLogsByTable(); + dumpTableLogCount(mutationsByTable); + for (Map.Entry<String, Integer> entry : expected.entrySet()) { + String tableName = entry.getKey(); + int expectedMutationCount = entry.getValue(); + List<Mutation> mutations = mutationsByTable.get(tableName); + int actualMutationCount = mutations != null ? mutations.size() : 0; + try { + if (!tableName.equals(SYSTEM_CATALOG_NAME)) { + assertEquals(String.format("For table %s", tableName), + expectedMutationCount, actualMutationCount); + } else { + // special handling for syscat + assertTrue("For SYSCAT", actualMutationCount >= expectedMutationCount); + } + } catch (AssertionError e) { + TestUtil.dumpTable(conn, TableName.valueOf(tableName)); + throw e; + } + } + } + + private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) { + LOG.info("Dump table log count for test {}", name.getMethodName()); + for (Map.Entry<String, List<Mutation>> table : mutationsByTable.entrySet()) { + LOG.info("#Log entries for {} = {}", table.getKey(), table.getValue().size()); + } + } + + private void moveRegionToServer(TableName tableName, ServerName sn) throws Exception { + HBaseTestingUtility util = getUtility(); + try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) { + String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + while (!sn.equals(locator.getAllRegionLocations().get(0).getServerName())) { + LOG.info("Moving region {} of table {} to server {}", regEN, tableName, sn); + util.getAdmin().move(Bytes.toBytes(regEN), sn); + Thread.sleep(100); + } + LOG.info("Moved region {} of table {} to server {}", regEN, tableName, sn); + } + } + + private PhoenixTestBuilder.SchemaBuilder createViewHierarchy() throws Exception { + // Define the test schema. + // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP) + // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID) + // 3. 
Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID) + final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl()); + PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions = + PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults(); + PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions + globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults(); + PhoenixTestBuilder.SchemaBuilder.TenantViewOptions + tenantViewWithOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults(); + PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions + tenantViewIndexOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults(); + + try (Connection conn = DriverManager.getConnection(getUrl())) { + schemaBuilder.withTableOptions(tableOptions) + .withGlobalViewOptions(globalViewOptions) + .withTenantViewOptions(tenantViewWithOverrideOptions) + .withTenantViewIndexOptions(tenantViewIndexOverrideOptions) + .buildWithNewTenant(); + } + return schemaBuilder; + } + + @Test + public void testAppendAndSync() throws Exception { + final String tableName = "T_" + generateUniqueName(); + final String indexName1 = "I_" + generateUniqueName(); + final String indexName2 = "I_" + generateUniqueName(); + final String indexName3 = "L_" + generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String ddl = String.format("create table %s (id1 integer not null, " + + "id2 integer not null, val1 varchar, val2 varchar " + + "constraint pk primary key (id1, id2))", tableName); + conn.createStatement().execute(ddl); + ddl = String.format("create index %s on %s (val1) include (val2)", + indexName1, tableName); + conn.createStatement().execute(ddl); + ddl = String.format("create index %s on %s (val2) include (val1)", + indexName2, tableName); + conn.createStatement().execute(ddl); + ddl = String.format("create local index %s on %s (id2,val1) include (val2)", + indexName3, tableName); + conn.createStatement().execute(ddl); + conn.commit(); + PreparedStatement stmt = conn.prepareStatement( + "upsert into " + tableName + " VALUES(?, ?, ?, ?)"); + // upsert 50 rows + int rowCount = 50; + for (int i = 0; i < 5; ++i) { + for (int j = 0; j < 10; ++j) { + stmt.setInt(1, i); + stmt.setInt(2, j); + stmt.setString(3, "abcdefghijklmnopqrstuvwxyz"); + stmt.setString(4, null); + stmt.executeUpdate(); + } + conn.commit(); + } + // do some atomic upserts which will be ignored and therefore not replicated + stmt = conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?) 
" + + "ON DUPLICATE KEY IGNORE"); + conn.setAutoCommit(true); + for (int i = 0; i < 5; ++i) { + for (int j = 0; j < 2; ++j) { + stmt.setInt(1, i); + stmt.setInt(2, j); + stmt.setString(3, null); + assertEquals(0, stmt.executeUpdate()); + } + } + // verify the correctness of the index + IndexToolIT.verifyIndexTable(tableName, indexName1, conn); + // verify replication + Map<String, Integer> expected = Maps.newHashMap(); + // mutation count will be equal to row count since the atomic upsert mutations will be + // ignored and therefore not replicated + expected.put(tableName, rowCount * 3); // Put + Delete + local index update + // for index1 unverified + verified + delete (Delete column) + expected.put(indexName1, rowCount * 3); + // for index2 unverified + verified since the null column is part of row key + expected.put(indexName2, rowCount * 2); + // we didn't create any tenant views so no change in the syscat entries + expected.put(SYSTEM_CATALOG_NAME, 0); + expected.put(SYSTEM_CHILD_LINK_NAME, 0); + verifyReplication(conn, expected); + } + } + + /** + * This test simulates RS crashes in the middle of write transactions after the edits + * have been written to the WAL but before they have been replicated to the standby + * cluster. Those edits will be replicated when the WAL is replayed. + */ + @Test + public void testWALRestore() throws Exception { + HBaseTestingUtility util = getUtility(); + MiniHBaseCluster cluster = util.getHBaseCluster(); + final String tableName = "T_" + generateUniqueName(); + final String indexName = "I_" + generateUniqueName(); + TableName table = TableName.valueOf(tableName); + try (Connection conn = DriverManager.getConnection(getUrl())) { + String ddl = String.format("create table %s (id1 integer not null, " + + "id2 integer not null, val1 varchar, val2 varchar " + + "constraint pk primary key (id1, id2))", tableName); + conn.createStatement().execute(ddl); + ddl = String.format("create index %s on %s (val1) include (val2)", + indexName, tableName); + conn.createStatement().execute(ddl); + conn.commit(); + } + // Mini cluster by default comes with only 1 RS. Starting a second RS so that + // we can kill the RS + JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer(); + ServerName sn2 = rs2.getRegionServer().getServerName(); + // Assign some table regions to the new RS we started above + moveRegionToServer(table, sn2); + moveRegionToServer(TableName.valueOf(SYSTEM_CATALOG_NAME), sn2); + moveRegionToServer(TableName.valueOf(SYSTEM_CHILD_LINK_NAME), sn2); + int rowCount = 50; + try (Connection conn = DriverManager.getConnection(getUrl())) { + PreparedStatement stmt = conn.prepareStatement( + "upsert into " + tableName + " VALUES(?, ?, ?, ?)"); + // upsert 50 rows + for (int i = 0; i < 5; ++i) { + for (int j = 0; j < 10; ++j) { + stmt.setInt(1, i); + stmt.setInt(2, j); + stmt.setString(3, "abcdefghijklmnopqrstuvwxyz"); + stmt.setString(4, null); // Generate a DeleteColumn cell + stmt.executeUpdate(); + } + // we want to simulate RS crash after updating memstore and WAL + IndexRegionObserver.setIgnoreSyncReplicationForTesting(true); + conn.commit(); + } + // Create tenant views for syscat and child link replication + createViewHierarchy(); + } finally { + IndexRegionObserver.setIgnoreSyncReplicationForTesting(false); + } + // Kill the RS + cluster.killRegionServer(rs2.getRegionServer().getServerName()); + Threads.sleep(20000); // just to be sure that the kill has fully started. 
+ // Regions will be re-opened and the WAL will be replayed + util.waitUntilAllRegionsAssigned(table); + try (Connection conn = DriverManager.getConnection(getUrl())) { + Map<String, Integer> expected = Maps.newHashMap(); + // For each row 1 Put + 1 Delete (DeleteColumn) + expected.put(tableName, rowCount * 2); + // unverified + verified + delete (Delete column) + expected.put(indexName, rowCount * 3); + // 1 tenant view was created + expected.put(SYSTEM_CHILD_LINK_NAME, 1); + // atleast 1 log entry for syscat + expected.put(SYSTEM_CATALOG_NAME, 1); + verifyReplication(conn, expected); + } + } + + @Test + public void testSystemTables() throws Exception { + createViewHierarchy(); + Map<String, List<Mutation>> logsByTable = groupLogsByTable(); + dumpTableLogCount(logsByTable); + // find all the log entries for system tables + Map<String, List<Mutation>> systemTables = logsByTable.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(QueryConstants.SYSTEM_SCHEMA_NAME)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + // there should be only 2 entries CATALOG, CHILD_LINK + assertEquals(2, systemTables.size()); + assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME)); + assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java index 3e9775e216..bf2f65f592 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java @@ -85,6 +85,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.math.BigDecimal; +import java.net.URI; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.Date; @@ -119,6 +120,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -154,6 +156,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; import org.apache.phoenix.jdbc.PhoenixTestDriver; +import org.apache.phoenix.replication.ReplicationLogGroup; import org.apache.phoenix.schema.NewerTableAlreadyExistsException; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTableType; @@ -432,6 +435,8 @@ public abstract class BaseTest { protected static boolean clusterInitialized = false; protected static HBaseTestingUtility utility; protected static final Configuration config = HBaseConfiguration.create(); + protected static final String logDir = "/PHOENIX_REPLICATION_IN"; + protected static URI standbyUri = new Path(logDir).toUri(); protected static String getUrl() { if (!clusterInitialized) { @@ -669,6 +674,8 @@ public abstract class BaseTest { conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0); } setPhoenixRegionServerEndpoint(conf); + // setup up synchronous replication + conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString()); return conf; } @@ -1985,7 +1992,7 @@ public abstract class BaseTest { } /** - * Returns true if the region contains atleast one of the metadata rows we are interested in + * Returns true if the 
region contains at least one of the metadata rows we are interested in */ protected static boolean regionContainsMetadataRows(RegionInfo regionInfo, List<byte[]> metadataRowKeys) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java index 437f946647..d3176490d1 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java @@ -93,7 +93,7 @@ public class ReplicationLogGroupTest { public void setUp() throws IOException { conf = HBaseConfiguration.create(); localFs = FileSystem.getLocal(conf); - standbyUri = new Path(testFolder.toString()).toUri(); + standbyUri = new Path(testFolder.getRoot().toString()).toUri(); serverName = ServerName.valueOf("test", 60010, EnvironmentEdgeManager.currentTimeMillis()); conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString()); // Small ring buffer size for testing
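For anyone trying this feature out against the branch, a minimal configuration sketch follows, mirroring how ReplicationLogGroupIT and BaseTest in this patch wire it up. The standby HDFS URL below is an illustrative placeholder, not a value taken from the commit; the class name is likewise hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class SyncReplicationConfigSketch {
        public static Configuration configure() {
            Configuration conf = HBaseConfiguration.create();
            // Feature switch read by IndexRegionObserver when the coprocessor starts;
            // defaults to false (DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED).
            conf.setBoolean(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, true);
            // Standby cluster location used by the replication log group
            // (placeholder URL, set to your own standby cluster).
            conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY,
                "hdfs://standby-namenode:8020/PHOENIX_REPLICATION_IN");
            return conf;
        }
    }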