This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new 491ecfecf8 PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2334)
491ecfecf8 is described below
commit 491ecfecf8d6bacb3aeb416da52ded86c9905712
Author: tkhurana <[email protected]>
AuthorDate: Thu Dec 18 19:41:01 2025 -0800
    PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2334)
---
.../hbase/index/metrics/MetricsIndexerSource.java | 11 +
.../index/metrics/MetricsIndexerSourceImpl.java | 9 +
.../org/apache/phoenix/query/QueryServices.java | 2 +
.../apache/phoenix/query/QueryServicesOptions.java | 2 +
.../java/org/apache/phoenix/util/SchemaUtil.java | 20 ++
.../phoenix/hbase/index/IndexRegionObserver.java | 256 ++++++++++++++-
.../phoenix/hbase/index/wal/IndexedKeyValue.java | 9 +-
.../replication/ReplicationLogGroupWriter.java | 13 +
.../replication/SystemCatalogWALEntryFilter.java | 64 ++--
.../phoenix/replication/tool/LogFileAnalyzer.java | 131 ++++++--
.../phoenix/replication/ReplicationLogGroupIT.java | 346 +++++++++++++++++++++
.../java/org/apache/phoenix/query/BaseTest.java | 9 +-
.../replication/ReplicationLogGroupTest.java | 2 +-
13 files changed, 809 insertions(+), 65 deletions(-)
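
For anyone trying the change locally, a minimal sketch of wiring up the new flag on a server-side Configuration, together with the standby log location the tests use. The property constants are the ones introduced or referenced by this commit; the standby path is only an example value, not part of the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class EnableSyncReplicationSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Feature flag added by this commit; defaults to false (QueryServicesOptions)
        conf.setBoolean(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, true);
        // Standby log location, same key the BaseTest change sets; the path is illustrative only
        conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, "/PHOENIX_REPLICATION_IN");
      }
    }
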
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
index b0d625e417..c0582b8b99 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -92,6 +92,10 @@ public interface MetricsIndexerSource extends BaseSource {
String POST_INDEX_UPDATE_FAILURE_DESC =
"The number of failures of index updates post data updates";
+  String REPLICATION_SYNC_TIME = "replicationSyncTime";
+  String REPLICATION_SYNC_TIME_DESC =
+    "Histogram for the time in milliseconds to synchronously replicate a batch of mutations";
+
/**
* Updates the index preparation time histogram (preBatchMutate).
* @param dataTableName Physical data table name
@@ -223,4 +227,11 @@ public interface MetricsIndexerSource extends BaseSource {
* @param dataTableName Physical data table name
*/
void incrementPostIndexUpdateFailures(String dataTableName);
+
+ /**
+ * Updates the replication sync time histogram.
+ * @param dataTableName Physical data table name
+ * @param t time taken in milliseconds
+ */
+ void updateReplicationSyncTime(String dataTableName, long t);
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
index 4968ea5a3f..d6fa330310 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -40,6 +40,7 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl
implements MetricsI
private final MutableFastCounter slowPostOpenCalls;
private final MetricHistogram duplicateKeyTimeHisto;
private final MutableFastCounter slowDuplicateKeyCalls;
+ private final MetricHistogram replicationSyncTimeHisto;
private final MetricHistogram preIndexUpdateTimeHisto;
private final MetricHistogram postIndexUpdateTimeHisto;
@@ -80,6 +81,8 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl
implements MetricsI
getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME,
DUPLICATE_KEY_TIME_DESC);
slowDuplicateKeyCalls =
getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY,
SLOW_DUPLICATE_KEY_DESC, 0L);
+    replicationSyncTimeHisto =
+      getMetricsRegistry().newHistogram(REPLICATION_SYNC_TIME, REPLICATION_SYNC_TIME_DESC);
postIndexUpdateTimeHisto =
getMetricsRegistry().newHistogram(POST_INDEX_UPDATE_TIME,
POST_INDEX_UPDATE_TIME_DESC);
@@ -215,6 +218,12 @@ public class MetricsIndexerSourceImpl extends
BaseSourceImpl implements MetricsI
postIndexUpdateFailures.incr();
}
+ @Override
+ public void updateReplicationSyncTime(String dataTableName, long t) {
+ incrementTableSpecificHistogram(REPLICATION_SYNC_TIME, dataTableName, t);
+ replicationSyncTimeHisto.add(t);
+ }
+
private void incrementTableSpecificCounter(String baseCounterName, String
tableName) {
MutableFastCounter indexSpecificCounter =
getMetricsRegistry().getCounter(getCounterName(baseCounterName,
tableName), 0);
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index dc5d042e46..00d2608b9b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -638,6 +638,8 @@ public interface QueryServices extends SQLCloseable {
  String USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP = "phoenix.bloomfilter.multikey.pointlookup";
+  String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled";
+
/**
* Get executor service used for parallel scans
*/
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 2af299aa2c..7b0cb4cce9 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -507,6 +507,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE = 512;
public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC
= 60; // 1min
+ public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false;
+
private final Configuration config;
private QueryServicesOptions(Configuration config) {
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 81057f305f..b42a906ca3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1517,4 +1517,24 @@ public class SchemaUtil {
}
return false;
}
+
+ /**
+   * Data of non-system tables is always replicated. Data of the system tables in the list below is
+   * not replicated because this data is very specific to the cluster: SYSTEM.SEQUENCE, SYSTEM.STATS,
+   * SYSTEM.LOG, SYSTEM.TASK, SYSTEM.FUNCTION, SYSTEM.MUTEX, SYSTEM.TRANSFORM,
+   * SYSTEM.CDC_STREAM_STATUS, SYSTEM.CDC_STREAM. For SYSTEM.CATALOG and SYSTEM.CHILD_LINK we only
+   * replicate rows with tenant information. Non-tenant (global) rows are assumed to be executed by
+   * an admin or an admin process in each cluster separately and thus are not replicated.
+ * @param tableName full name of the table
+ * @return true if the table data should be replicated, else false
+ */
+ public static boolean shouldReplicateTable(byte[] tableName) {
+ if (!isSystemTable(tableName)) {
+ return true;
+ }
+ if (isMetaTable(tableName) || isChildLinkTable(tableName)) {
+ return true;
+ }
+ return false;
+ }
}
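
A quick illustration of how the new helper behaves for a few table names, inferred only from the javadoc above (the expected results assume the non-namespace-mapped physical names; treat this as a sketch, not part of the patch):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.SchemaUtil;

    public class ShouldReplicateSketch {
      public static void main(String[] args) {
        // Ordinary user tables are always replicated
        System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("MY_SCHEMA.MY_TABLE"))); // true
        // SYSTEM.CATALOG and SYSTEM.CHILD_LINK are replicated; tenant filtering happens later
        System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("SYSTEM.CATALOG")));     // true
        // Cluster-local system tables such as SYSTEM.SEQUENCE are not replicated
        System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("SYSTEM.SEQUENCE")));    // false
      }
    }
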
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fc55b2435d..5c12fc719e 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -25,6 +25,8 @@ import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons
import static
org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
import static
org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT;
+import static
org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
+import static
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;
import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
import java.io.ByteArrayInputStream;
@@ -43,9 +45,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -75,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -107,6 +112,7 @@ import
org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
import org.apache.phoenix.index.IndexMaintainer;
@@ -116,6 +122,8 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
import org.apache.phoenix.query.KeyRange;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.ReplicationLogGroup;
+import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PRow;
@@ -166,6 +174,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS);
public static final String PHOENIX_APPEND_METADATA_TO_WAL =
"phoenix.append.metadata.to.wal";
public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+ // Mutation attribute to ignore the mutation for replication
+ public static final String IGNORE_REPLICATION_ATTRIB = "_IGNORE_REPLICATION";
+ private static final byte[] IGNORE_REPLICATION_ATTRIB_VAL = new byte[] { 0 };
+ // TODO hardcoded for now, will fix later
+ public static final String DEFAULT_HA_GROUP = "DEFAULT_HA_GROUP";
/**
* Class to represent pending data table rows
@@ -218,6 +231,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static boolean failPostIndexUpdatesForTesting = false;
private static boolean failDataTableUpdatesForTesting = false;
private static boolean ignoreWritingDeleteColumnsToIndex = false;
+ private static boolean ignoreSyncReplicationForTesting = false;
public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
ignoreIndexRebuildForTesting = ignore;
@@ -239,6 +253,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
ignoreWritingDeleteColumnsToIndex = ignore;
}
+ public static void setIgnoreSyncReplicationForTesting(boolean ignore) {
+ ignoreSyncReplicationForTesting = ignore;
+ }
+
public enum BatchMutatePhase {
INIT,
PRE,
@@ -417,6 +435,46 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS =
100;
private byte[] encodedRegionName;
+ private boolean shouldReplicate;
+ private ReplicationLogGroup replicationLog;
+
+ // Don't replicate the mutation if this attribute is set
+ private static final Predicate<Mutation> IGNORE_REPLICATION =
+ mutation -> mutation.getAttribute(IGNORE_REPLICATION_ATTRIB) != null;
+
+ // Don't replicate the mutation for syscat/child link if the tenantid is not
+ // leading in the row key
+  private static final Predicate<Mutation> NOT_TENANT_ID_ROW_KEY_PREFIX =
+    mutation -> !SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(mutation.getRow(), 0);
+
+  // Don't replicate the mutation for child link if child is not a tenant view
+  private static final Predicate<Mutation> NOT_CHILD_LINK_TENANT_VIEW = mutation -> {
+ boolean isChildLinkToTenantView = false;
+ for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+ for (Cell cell : cells) {
+ if (SystemCatalogWALEntryFilter.isCellChildLinkToTenantView(cell)) {
+ isChildLinkToTenantView = true;
+ break;
+ }
+ }
+ }
+ return !isChildLinkToTenantView;
+ };
+
+  /**
+   * If the replication filter evaluates to true, the mutation is excluded from replication.
+   */
+  private static Predicate<Mutation> getSynchronousReplicationFilter(byte[] tableName) {
+    Predicate<Mutation> filter = IGNORE_REPLICATION;
+    if (SchemaUtil.isMetaTable(tableName)) {
+      filter = IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX);
+    } else if (SchemaUtil.isChildLinkTable(tableName)) {
+      filter = IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX.and(NOT_CHILD_LINK_TENANT_VIEW));
+    }
+    return filter;
+  }
+
+ private Predicate<Mutation> ignoreReplicationFilter;
@Override
public Optional<RegionObserver> getRegionObserver() {
@@ -471,6 +529,18 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
BloomType bloomFilterType =
tableDescriptor.getColumnFamilies()[0].getBloomFilterType();
// when the table descriptor changes, the coproc is reloaded
this.useBloomFilter = bloomFilterType == BloomType.ROW;
+ byte[] tableName = env.getRegionInfo().getTable().getName();
+      this.shouldReplicate = env.getConfiguration().getBoolean(SYNCHRONOUS_REPLICATION_ENABLED,
+        DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
+      if (this.shouldReplicate) {
+        // replication feature is enabled, check if it is enabled for the table
+        this.shouldReplicate = SchemaUtil.shouldReplicateTable(tableName);
+      }
+      if (this.shouldReplicate) {
+        this.replicationLog =
+          ReplicationLogGroup.get(env.getConfiguration(), env.getServerName(), DEFAULT_HA_GROUP);
+        this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName);
+      }
} catch (NoSuchMethodError ex) {
disabled = true;
LOG.error("Must be too early a version of HBase. Disabled coprocessor ",
ex);
@@ -503,7 +573,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return;
}
this.stopped = true;
- String msg = "Indexer is being stopped";
+ String msg = "IndexRegionObserver is being stopped";
this.builder.stop(msg);
this.preWriter.stop(msg);
this.postWriter.stop(msg);
@@ -580,6 +650,82 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
"Somehow didn't return an index update but also didn't propagate the
failure to the client!");
}
+ @Override
+ public void preWALRestore(
+ org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends
RegionCoprocessorEnvironment> ctx,
+ org.apache.hadoop.hbase.client.RegionInfo info,
org.apache.hadoop.hbase.wal.WALKey logKey,
+ WALEdit logEdit) throws IOException {
+ if (this.disabled) {
+ return;
+ }
+ if (!shouldReplicate) {
+ return;
+ }
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ replicateEditOnWALRestore(logKey, logEdit);
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ metricSource.updatePreWALRestoreTime(dataTableName, duration);
+ }
+ }
+
+  /**
+   * A batch of mutations is recorded in a single WAL edit so a WAL edit can have cells belonging to
+   * multiple rows. Further, for one mutation the WAL edit contains the individual cells that are
+   * part of the mutation.
+   */
+  private void replicateEditOnWALRestore(org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
+    throws IOException {
+ ImmutableBytesPtr prevKey = null, currentKey = null;
+ Put put = null;
+ Delete del = null;
+ for (Cell kv : logEdit.getCells()) {
+ if (kv instanceof IndexedKeyValue) {
+ IndexedKeyValue ikv = (IndexedKeyValue) kv;
+ replicationLog.append(Bytes.toString(ikv.getIndexTable()), -1,
ikv.getMutation());
+ } else {
+        // While we could generate a separate mutation for every cell that is part of the
+        // WAL edit and replicate each such mutation, doing that would not be very efficient
+        // since a mutation can have a large number of cells. Instead, we first group the
+        // cells belonging to the same row into a mutation and then replicate that
+        // mutation.
+ currentKey = new ImmutableBytesPtr(kv.getRowArray(),
kv.getRowOffset(), kv.getRowLength());
+ if (!currentKey.equals(prevKey)) {
+ if (put != null && !this.ignoreReplicationFilter.test(put)) {
+ replicationLog.append(logKey.getTableName().getNameAsString(), -1,
put);
+ }
+ if (del != null && !this.ignoreReplicationFilter.test(del)) {
+ replicationLog.append(logKey.getTableName().getNameAsString(), -1,
del);
+ }
+ // reset
+ put = null;
+ del = null;
+ }
+ if (kv.getType() == Cell.Type.Put) {
+ if (put == null) {
+ put = new Put(currentKey.get(), currentKey.getOffset(),
currentKey.getLength());
+ }
+ put.add(kv);
+ } else {
+ if (del == null) {
+ del = new Delete(currentKey.get(), currentKey.getOffset(),
currentKey.getLength());
+ }
+ del.add(kv);
+ }
+ prevKey = currentKey;
+ }
+ }
+ // append the last one
+ if (put != null && !this.ignoreReplicationFilter.test(put)) {
+ replicationLog.append(logKey.getTableName().getNameAsString(), -1, put);
+ }
+ if (del != null && !this.ignoreReplicationFilter.test(del)) {
+ replicationLog.append(logKey.getTableName().getNameAsString(), -1, del);
+ }
+ replicationLog.sync();
+ }
+
private void populateRowsToLock(MiniBatchOperationInProgress<Mutation>
miniBatchOp,
BatchMutateContext context) {
for (int i = 0; i < miniBatchOp.size(); i++) {
@@ -655,6 +801,11 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
// upserts, where 0 represents the row is not updated
Result result = Result.create(cells);
miniBatchOp.setOperationStatus(i, new OperationStatus(SUCCESS,
result));
+            // since this mutation is ignored by setting its status to success in the coproc,
+            // it shouldn't be synchronously replicated
+            if (this.shouldReplicate) {
+              m.setAttribute(IGNORE_REPLICATION_ATTRIB, IGNORE_REPLICATION_ATTRIB_VAL);
+            }
}
} else if (context.returnResult) {
Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new
HashMap<>();
@@ -1633,6 +1784,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
waitForPreviousConcurrentBatch(table, context);
}
preparePostIndexMutations(context, batchTimestamp, indexMetaData);
+ addGlobalIndexMutationsToWAL(miniBatchOp, context);
}
if (context.hasLocalIndex) {
// Group all the updates for a single row into a single update to be
processed (for local
@@ -1645,6 +1797,49 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
+  /**
+   * We need to add the index mutations to the data table's WAL to handle cases where the RS crashes
+   * before the postBatchMutateIndispensably hook, in which the mutations are synchronously
+   * replicated, is called. This is needed because during WAL restore we don't have the
+   * IndexMaintainer object to generate the corresponding index mutations.
+   */
+  private void addGlobalIndexMutationsToWAL(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+    BatchMutateContext context) {
+ if (!this.shouldReplicate) {
+ return;
+ }
+
+ WALEdit edit = miniBatchOp.getWalEdit(0);
+ if (edit == null) {
+ edit = new WALEdit();
+ miniBatchOp.setWalEdit(0, edit);
+ }
+
+ if (context.preIndexUpdates != null) {
+ for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.preIndexUpdates
+ .entries()) {
+ if (this.ignoreReplicationFilter.test(entry.getValue())) {
+ continue;
+ }
+ // This creates cells of family type WALEdit.METAFAMILY which are not
applied
+ // on restore
+ edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getKey().get(),
entry.getValue()));
+ }
+ }
+
+ if (context.postIndexUpdates != null) {
+ for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.postIndexUpdates
+ .entries()) {
+ if (this.ignoreReplicationFilter.test(entry.getValue())) {
+ continue;
+ }
+ // This creates cells of family type WALEdit.METAFAMILY which are not
applied
+ // on restore
+ edit.add(IndexedKeyValue.newIndexedKeyValue(entry.getKey().get(),
entry.getValue()));
+ }
+ }
+ }
+
  /**
   * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be generated so
   * release the row lock.
@@ -1776,7 +1971,16 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
if (success) { // The pre-index and data table updates are successful,
and now, do post index
// updates
- doPost(c, context);
+ CompletableFuture<Void> postIndexFuture =
+ CompletableFuture.runAsync(() -> doPost(c, context));
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ replicateMutations(miniBatchOp, context);
+ } finally {
+ long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+ metricSource.updateReplicationSyncTime(dataTableName, duration);
+ }
+ FutureUtils.get(postIndexFuture);
}
} finally {
removeBatchMutateContext(c);
@@ -1842,8 +2046,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
- private void doPost(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context)
- throws IOException {
+ private void doPost(ObserverContext<RegionCoprocessorEnvironment> c,
BatchMutateContext context) {
long start = EnvironmentEdgeManager.currentTimeMillis();
try {
@@ -2329,4 +2532,49 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
public static boolean isAtomicOperationComplete(OperationStatus status) {
return status.getOperationStatusCode() == SUCCESS && status.getResult() !=
null;
}
+
+  private void replicateMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+    BatchMutateContext context) throws IOException {
+
+ if (!this.shouldReplicate) {
+ return;
+ }
+ if (ignoreSyncReplicationForTesting) {
+ return;
+ }
+ assert this.replicationLog != null;
+
+ for (Integer i = 0; i < miniBatchOp.size(); i++) {
+ Mutation m = miniBatchOp.getOperation(i);
+ if (this.ignoreReplicationFilter.test(m)) {
+ continue;
+ }
+ this.replicationLog.append(this.dataTableName, -1, m);
+ Mutation[] mutationsAddedByCP =
miniBatchOp.getOperationsFromCoprocessors(i);
+ if (mutationsAddedByCP != null) {
+ for (Mutation addedMutation : mutationsAddedByCP) {
+ this.replicationLog.append(this.dataTableName, -1, addedMutation);
+ }
+ }
+ }
+ if (context.preIndexUpdates != null) {
+ for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.preIndexUpdates
+ .entries()) {
+ if (this.ignoreReplicationFilter.test(entry.getValue())) {
+ continue;
+ }
+ this.replicationLog.append(entry.getKey().getTableName(), -1,
entry.getValue());
+ }
+ }
+ if (context.postIndexUpdates != null) {
+ for (Map.Entry<HTableInterfaceReference, Mutation> entry :
context.postIndexUpdates
+ .entries()) {
+ if (this.ignoreReplicationFilter.test(entry.getValue())) {
+ continue;
+ }
+ this.replicationLog.append(entry.getKey().getTableName(), -1,
entry.getValue());
+ }
+ }
+ this.replicationLog.sync();
+ }
}
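
To make the write-path contract easier to review, here is a condensed sketch of the append-then-sync pattern the coproc now follows in postBatchMutateIndispensably. The method names and the -1 argument are taken from the diff above; the table name, mutation, and error handling are placeholders, so treat this as illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.hbase.index.IndexRegionObserver;
    import org.apache.phoenix.replication.ReplicationLogGroup;

    public class ReplicationAppendSketch {
      // Mirrors the coproc flow: append every data/index mutation that passes the ignore filter,
      // then block on sync() so the batch is durable in the replication log before acking the client.
      static void replicateBatch(Configuration conf, ServerName serverName) throws Exception {
        ReplicationLogGroup log =
          ReplicationLogGroup.get(conf, serverName, IndexRegionObserver.DEFAULT_HA_GROUP);
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("0"), Bytes.toBytes("VAL1"), Bytes.toBytes("v"));
        log.append("MY_SCHEMA.MY_TABLE", -1, put); // -1 is what the patch passes for this argument
        log.sync();                                // synchronous barrier on the write path
      }
    }
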
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
index 82434bc230..3696822171 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -51,6 +51,11 @@ public class IndexedKeyValue extends KeyValue {
private int hashCode;
public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m) {
+ Cell indexWALCell = adaptFirstCellFromMutation(m);
+ return new IndexedKeyValue(indexWALCell, new ImmutableBytesPtr(bs), m);
+ }
+
+ public static IndexedKeyValue newIndexedKeyValue(ImmutableBytesPtr bs,
Mutation m) {
Cell indexWALCell = adaptFirstCellFromMutation(m);
return new IndexedKeyValue(indexWALCell, bs, m);
}
@@ -81,9 +86,9 @@ public class IndexedKeyValue extends KeyValue {
public IndexedKeyValue() {
}
- private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation) {
+ private IndexedKeyValue(Cell c, ImmutableBytesPtr bs, Mutation mutation) {
super(c);
- this.indexTableName = new ImmutableBytesPtr(bs);
+ this.indexTableName = bs;
this.mutation = mutation;
this.hashCode = calcHashCode(indexTableName, mutation);
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index b76b8fab34..5391a57cdd 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -454,6 +454,19 @@ public abstract class ReplicationLogGroupWriter {
}
}
+  /**
+   * Close the currentWriter. Needed by tests so that we can close the log file and then read it.
+   */
+ protected void closeCurrentWriter() {
+ lock.lock();
+ try {
+ closeWriter(currentWriter);
+ currentWriter = null;
+ } finally {
+ lock.unlock();
+ }
+ }
+
/**
* Check if this ReplicationLogGroup is closed.
* @return true if closed, false otherwise
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
index 239aa0c501..399fd897e9 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
@@ -74,50 +74,62 @@ public class SystemCatalogWALEntryFilter implements
WALEntryFilter, WALCellFilte
}
/**
- * Does the cell key have leading tenant Id.
+ * Does the cell row key have leading tenant Id.
* @param cell hbase cell
* @return true if the cell has leading tenant Id in key
*/
private boolean isTenantIdLeadingInKey(final Cell cell) {
// rows in system.catalog or system child that aren't tenant-owned
// will have a leading separator byte
- return cell.getRowArray()[cell.getRowOffset()] !=
QueryConstants.SEPARATOR_BYTE;
+ return isTenantIdLeadingInKey(cell.getRowArray(), cell.getRowOffset());
}
/**
- * is the cell for system child link a tenant owned. Besides the non empty
tenant id,
- * system.child_link table have tenant owned data for parent child links. In
this case, the column
+   * Checks if the row key of a SYSTEM.CATALOG or SYSTEM.CHILD_LINK row has a leading tenant ID.
+   * @return true if the row key has a leading tenant ID
+ */
+ public static boolean isTenantIdLeadingInKey(byte[] rowKey, int rowOffset) {
+ return rowKey[rowOffset] != QueryConstants.SEPARATOR_BYTE;
+ }
+
+ /**
+   * Is the cell for the system child link table tenant owned? This happens if the tenant id is
+   * leading in the row key or the cell has tenant-owned data for parent child links.
+ * @param cell hbase cell
+ * @return true if the cell is tenant owned
+ */
+ private boolean isTenantRowCellSystemChildLink(final Cell cell) {
+ return isTenantIdLeadingInKey(cell) || isCellChildLinkToTenantView(cell);
+ }
+
+ /**
+   * The SYSTEM.CHILD_LINK table has tenant-owned data for parent child links. In this case, the column
* qualifier is {@code PhoenixDatabaseMetaData#LINK_TYPE_BYTES} and value is
* {@code PTable.LinkType.CHILD_TABLE}. For corresponding delete markers the
KeyValue type
* {@code KeyValue.Type} is {@code KeyValue.Type.DeleteFamily}
* @param cell hbase cell
* @return true if the cell is tenant owned
*/
- private boolean isTenantRowCellSystemChildLink(final Cell cell) {
- boolean isTenantRowCell = isTenantIdLeadingInKey(cell);
-
+ public static boolean isCellChildLinkToTenantView(final Cell cell) {
ImmutableBytesWritable key =
new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
boolean isChildLinkToTenantView = false;
- if (!isTenantRowCell) {
- boolean isChildLink =
- CellUtil.matchingQualifier(cell,
PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
- if (
- (isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES))
- || cell.getType() == Cell.Type.DeleteFamily
- ) {
- byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
- SchemaUtil.getVarChars(key.get(), key.getOffset(), key.getLength(), 0,
rowViewKeyMetadata);
- /**
- * if the child link is to a tenant-owned view, the COLUMN_NAME field
will be the byte[] of
- * the tenant otherwise, it will be an empty byte array (NOT
QueryConstants.SEPARATOR_BYTE,
- * but a byte[0]). This assumption is also true for child link's
delete markers in
- * SYSTEM.CHILD_LINK as it only contains link rows and does not deal
with other type of rows
- * like column rows that also has COLUMN_NAME populated with actual
column name.
- **/
- isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length
!= 0;
- }
+ boolean isChildLink = CellUtil.matchingQualifier(cell,
PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
+ if (
+ isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES)
+ || cell.getType() == Cell.Type.DeleteFamily
+ ) {
+ byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
+ SchemaUtil.getVarChars(key.get(), key.getOffset(), key.getLength(), 0,
rowViewKeyMetadata);
+ /**
+ * if the child link is to a tenant-owned view, the COLUMN_NAME field
will be the byte[] of
+ * the tenant otherwise, it will be an empty byte array (NOT
QueryConstants.SEPARATOR_BYTE,
+ * but a byte[0]). This assumption is also true for child link's delete
markers in
+ * SYSTEM.CHILD_LINK as it only contains link rows and does not deal
with other type of rows
+ * like column rows that also has COLUMN_NAME populated with actual
column name.
+ **/
+ isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length
!= 0;
}
- return isTenantRowCell || isChildLinkToTenantView;
+ return isChildLinkToTenantView;
}
}
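
A small illustration of the newly public helper: a SYSTEM.CATALOG row key that starts with a tenant id versus a global row key that starts with the separator byte. The literal keys are made up for the example.

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.query.QueryConstants;
    import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;

    public class TenantKeySketch {
      public static void main(String[] args) {
        // Tenant-owned row: the row key starts with the tenant id bytes
        byte[] tenantRow = Bytes.toBytes("tenant1\u0000MY_SCHEMA\u0000MY_VIEW");
        // Global row: a leading separator byte means an empty tenant id
        byte[] globalRow = Bytes.add(new byte[] { QueryConstants.SEPARATOR_BYTE },
          Bytes.toBytes("MY_SCHEMA\u0000MY_TABLE"));
        System.out.println(SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(tenantRow, 0)); // true
        System.out.println(SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(globalRow, 0)); // false
      }
    }
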
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
index d5b1bb43d8..18172721a0 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -18,22 +18,28 @@
package org.apache.phoenix.replication.tool;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFile.Record;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
/**
* Command-line tool for analyzing Phoenix Replication Log files. This tool
can: - Read a single log
* file or directory of log files - Print file headers, trailers, and block
headers - Decode and
@@ -51,6 +57,18 @@ public class LogFileAnalyzer extends Configured implements
Tool {
private boolean verbose = false;
private boolean decode = false;
private boolean check = false;
+ FileSystem fs;
+
+ private void init() throws IOException {
+ Configuration conf = getConf();
+ if (conf == null) {
+ conf = HBaseConfiguration.create();
+ setConf(conf);
+ }
+ if (fs == null) {
+ fs = FileSystem.get(getConf());
+ }
+ }
@Override
public int run(String[] args) throws Exception {
@@ -59,29 +77,10 @@ public class LogFileAnalyzer extends Configured implements
Tool {
return 1;
}
- Configuration conf = getConf();
- if (conf == null) {
- conf = HBaseConfiguration.create();
- setConf(conf);
- }
-
try {
- FileSystem fs = FileSystem.get(conf);
+ init();
Path path = new Path(args[args.length - 1]);
-
- if (!fs.exists(path)) {
- System.err.println("Path does not exist: " + path);
- return 1;
- }
-
- List<Path> filesToAnalyze = new ArrayList<>();
- if (fs.getFileStatus(path).isDirectory()) {
- // Recursively find all .plog files
- findLogFiles(fs, path, filesToAnalyze);
- } else {
- filesToAnalyze.add(path);
- }
-
+ List<Path> filesToAnalyze = getFilesToAnalyze(path);
if (filesToAnalyze.isEmpty()) {
System.err.println("No log files found in: " + path);
return 1;
@@ -89,7 +88,7 @@ public class LogFileAnalyzer extends Configured implements
Tool {
// Analyze each file
for (Path file : filesToAnalyze) {
- analyzeFile(fs, file);
+ analyzeFile(file);
}
return 0;
@@ -99,19 +98,83 @@ public class LogFileAnalyzer extends Configured implements
Tool {
}
}
- private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws
IOException {
+ /**
+ * Returns all the mutations grouped by the table name under a source path
+ * @param source Path which can be a file or directory
+ * @return Mutations grouped by the table name
+ */
+  public Map<String, List<Mutation>> groupLogsByTable(String source) throws IOException {
+ Map<String, List<Mutation>> allFiles = Maps.newHashMap();
+ init();
+ Path path = new Path(source);
+ List<Path> filesToAnalyze = getFilesToAnalyze(path);
+ if (filesToAnalyze.isEmpty()) {
+ return allFiles;
+ }
+ // Analyze each file
+ for (Path file : filesToAnalyze) {
+ Map<String, List<Mutation>> perFile = groupLogsByTable(file);
+ for (Map.Entry<String, List<Mutation>> entry : perFile.entrySet()) {
+ List<Mutation> mutations = allFiles.get(entry.getKey());
+ if (mutations == null) {
+ allFiles.put(entry.getKey(), entry.getValue());
+ } else {
+ mutations.addAll(entry.getValue());
+ }
+ }
+ }
+ return allFiles;
+ }
+
+ private Map<String, List<Mutation>> groupLogsByTable(Path file) throws
IOException {
+ Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap();
+ System.out.println("\nAnalyzing file: " + file);
+ LogFileReaderContext context = new
LogFileReaderContext(getConf()).setFileSystem(fs)
+ .setFilePath(file).setSkipCorruptBlocks(check); // Skip corrupt blocks
if checking
+ LogFileReader reader = new LogFileReader();
+ try {
+ reader.init(context);
+ // Process records
+ Record record;
+ while ((record = reader.next()) != null) {
+ String tableName = record.getHBaseTableName();
+ List<Mutation> mutations = mutationsByTable.getOrDefault(tableName,
Lists.newArrayList());
+ mutations.add(record.getMutation());
+ mutationsByTable.put(tableName, mutations);
+ }
+ } finally {
+ reader.close();
+ }
+ return mutationsByTable;
+ }
+
+ private List<Path> getFilesToAnalyze(Path path) throws IOException {
+ if (!fs.exists(path)) {
+ throw new PathNotFoundException(path.toString());
+ }
+ List<Path> filesToAnalyze = Lists.newArrayList();
+ if (fs.getFileStatus(path).isDirectory()) {
+ // Recursively find all .plog files
+ findLogFiles(path, filesToAnalyze);
+ } else {
+ filesToAnalyze.add(path);
+ }
+ return filesToAnalyze;
+ }
+
+ private void findLogFiles(Path dir, List<Path> files) throws IOException {
FileStatus[] statuses = fs.listStatus(dir);
for (FileStatus status : statuses) {
Path path = status.getPath();
if (status.isDirectory()) {
- findLogFiles(fs, path, files);
+ findLogFiles(path, files);
} else if (path.getName().endsWith(".plog")) {
files.add(path);
}
}
}
- private void analyzeFile(FileSystem fs, Path file) throws IOException {
+ private void analyzeFile(Path file) throws IOException {
System.out.println("\nAnalyzing file: " + file);
LogFileReaderContext context = new
LogFileReaderContext(getConf()).setFileSystem(fs)
@@ -143,11 +206,17 @@ public class LogFileAnalyzer extends Configured
implements Tool {
}
// Print trailer information
- System.out.println("\nTrailer:");
- System.out.println(" Record Count: " +
reader.getTrailer().getRecordCount());
- System.out.println(" Block Count: " +
reader.getTrailer().getBlockCount());
- System.out.println(" Blocks Start Offset: " +
reader.getTrailer().getBlocksStartOffset());
- System.out.println(" Trailer Start Offset: " +
reader.getTrailer().getTrailerStartOffset());
+ LogFile.Trailer trailer = reader.getTrailer();
+ if (trailer != null) {
+ System.out.println("\nTrailer:");
+ System.out.println(" Record Count: " +
reader.getTrailer().getRecordCount());
+ System.out.println(" Block Count: " +
reader.getTrailer().getBlockCount());
+ System.out.println(" Blocks Start Offset: " +
reader.getTrailer().getBlocksStartOffset());
+ System.out
+ .println(" Trailer Start Offset: " +
reader.getTrailer().getTrailerStartOffset());
+ } else {
+ System.out.println("\nTrailer is null");
+ }
// Print verification results if checking
if (check) {
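
The new groupLogsByTable API is what the IT below relies on to verify replication. A standalone sketch of calling it against a standby log directory; the path is an example, and printing counts stands in for real assertions.

    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.phoenix.replication.tool.LogFileAnalyzer;

    public class AnalyzeStandbyLogsSketch {
      public static void main(String[] args) throws Exception {
        LogFileAnalyzer analyzer = new LogFileAnalyzer();
        analyzer.setConf(HBaseConfiguration.create());
        // Group every mutation found under the standby log directory by HBase table name
        Map<String, List<Mutation>> byTable = analyzer.groupLogsByTable("/PHOENIX_REPLICATION_IN");
        for (Map.Entry<String, List<Mutation>> e : byTable.entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue().size() + " mutations");
        }
      }
    }
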
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
new file mode 100644
index 0000000000..21644d86fa
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static
org.apache.phoenix.hbase.index.IndexRegionObserver.DEFAULT_HA_GROUP;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.tool.LogFileAnalyzer;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogGroupIT extends ParallelStatsDisabledIT {
+ private static final Logger LOG =
LoggerFactory.getLogger(ReplicationLogGroupIT.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static synchronized void doSetup() throws Exception {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+ props.put(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED,
Boolean.TRUE.toString());
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @Before
+ public void beforeTest() throws Exception {
+ LOG.info("Starting test {}", name.getMethodName());
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ LOG.info("Starting cleanup for test {}", name.getMethodName());
+ cleanupLogsFolder(standbyUri);
+ LOG.info("Ending cleanup for test {}", name.getMethodName());
+ }
+
+ /**
+ * Delete all the shards under the top level replication log directory
+ */
+ private void cleanupLogsFolder(URI source) throws IOException {
+ FileSystem fs = FileSystem.get(config);
+ Path dir = new Path(source.getPath());
+ FileStatus[] statuses = fs.listStatus(dir);
+ for (FileStatus status : statuses) {
+ Path shard = status.getPath();
+ if (status.isDirectory()) {
+ fs.delete(shard, true);
+ }
+ }
+ }
+
+ private ReplicationLogGroup getReplicationLogGroup() throws IOException {
+ HRegionServer rs = getUtility().getHBaseCluster().getRegionServer(0);
+ return ReplicationLogGroup.get(config, rs.getServerName(),
DEFAULT_HA_GROUP);
+ }
+
+ private Map<String, List<Mutation>> groupLogsByTable() throws Exception {
+ ReplicationLogGroup log = getReplicationLogGroup();
+ log.getActiveWriter().closeCurrentWriter();
+ LogFileAnalyzer analyzer = new LogFileAnalyzer();
+ analyzer.setConf(config);
+ String[] args = { "--check", standbyUri.getPath() };
+ assertEquals(0, analyzer.run(args));
+ return analyzer.groupLogsByTable(standbyUri.getPath());
+ }
+
+ private int getCountForTable(Map<String, List<Mutation>> logsByTable, String
tableName)
+ throws Exception {
+ List<Mutation> mutations = logsByTable.get(tableName);
+ return mutations != null ? mutations.size() : 0;
+ }
+
+ private void verifyReplication(Connection conn, Map<String, Integer>
expected) throws Exception {
+ Map<String, List<Mutation>> mutationsByTable = groupLogsByTable();
+ dumpTableLogCount(mutationsByTable);
+ for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+ String tableName = entry.getKey();
+ int expectedMutationCount = entry.getValue();
+ List<Mutation> mutations = mutationsByTable.get(tableName);
+ int actualMutationCount = mutations != null ? mutations.size() : 0;
+ try {
+ if (!tableName.equals(SYSTEM_CATALOG_NAME)) {
+ assertEquals(String.format("For table %s", tableName),
expectedMutationCount,
+ actualMutationCount);
+ } else {
+ // special handling for syscat
+ assertTrue("For SYSCAT", actualMutationCount >=
expectedMutationCount);
+ }
+ } catch (AssertionError e) {
+ TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+ throw e;
+ }
+ }
+ }
+
+ private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable)
{
+ LOG.info("Dump table log count for test {}", name.getMethodName());
+ for (Map.Entry<String, List<Mutation>> table :
mutationsByTable.entrySet()) {
+ LOG.info("#Log entries for {} = {}", table.getKey(),
table.getValue().size());
+ }
+ }
+
+ private void moveRegionToServer(TableName tableName, ServerName sn) throws
Exception {
+ HBaseTestingUtility util = getUtility();
+ try (RegionLocator locator =
util.getConnection().getRegionLocator(tableName)) {
+ String regEN =
locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+ while
(!sn.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+ LOG.info("Moving region {} of table {} to server {}", regEN,
tableName, sn);
+ util.getAdmin().move(Bytes.toBytes(regEN), sn);
+ Thread.sleep(100);
+ }
+ LOG.info("Moved region {} of table {} to server {}", regEN, tableName,
sn);
+ }
+ }
+
+ private PhoenixTestBuilder.SchemaBuilder createViewHierarchy() throws
Exception {
+ // Define the test schema.
+ // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID,
KP)
+ // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID)
+ // 3. Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+ final PhoenixTestBuilder.SchemaBuilder schemaBuilder =
+ new PhoenixTestBuilder.SchemaBuilder(getUrl());
+ PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions =
+ PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+ PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions globalViewOptions =
+ PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+ PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
tenantViewWithOverrideOptions =
+ PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+ PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions
tenantViewIndexOverrideOptions =
+ PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+ .withTenantViewOptions(tenantViewWithOverrideOptions)
+
.withTenantViewIndexOptions(tenantViewIndexOverrideOptions).buildWithNewTenant();
+ }
+ return schemaBuilder;
+ }
+
+ @Test
+ public void testAppendAndSync() throws Exception {
+ final String tableName = "T_" + generateUniqueName();
+ final String indexName1 = "I_" + generateUniqueName();
+ final String indexName2 = "I_" + generateUniqueName();
+ final String indexName3 = "L_" + generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = String.format("create table %s (id1 integer not null, "
+ + "id2 integer not null, val1 varchar, val2 varchar "
+ + "constraint pk primary key (id1, id2))", tableName);
+ conn.createStatement().execute(ddl);
+ ddl = String.format("create index %s on %s (val1) include (val2)",
indexName1, tableName);
+ conn.createStatement().execute(ddl);
+ ddl = String.format("create index %s on %s (val2) include (val1)",
indexName2, tableName);
+ conn.createStatement().execute(ddl);
+ ddl = String.format("create local index %s on %s (id2,val1) include
(val2)", indexName3,
+ tableName);
+ conn.createStatement().execute(ddl);
+ conn.commit();
+ PreparedStatement stmt =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
+ // upsert 50 rows
+ int rowCount = 50;
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+ stmt.setString(4, null);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+      // do some atomic upserts which will be ignored and therefore not replicated
+ stmt = conn.prepareStatement(
+ "upsert into " + tableName + " VALUES(?, ?, ?) " + "ON DUPLICATE KEY
IGNORE");
+ conn.setAutoCommit(true);
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 2; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, null);
+ assertEquals(0, stmt.executeUpdate());
+ }
+ }
+ // verify the correctness of the index
+ IndexToolIT.verifyIndexTable(tableName, indexName1, conn);
+ // verify replication
+ Map<String, Integer> expected = Maps.newHashMap();
+      // mutation count will be equal to row count since the atomic upsert mutations will be
+      // ignored and therefore not replicated
+      expected.put(tableName, rowCount * 3); // Put + Delete + local index update
+      // for index1 unverified + verified + delete (Delete column)
+      expected.put(indexName1, rowCount * 3);
+      // for index2 unverified + verified since the null column is part of row key
+      expected.put(indexName2, rowCount * 2);
+      // we didn't create any tenant views so no change in the syscat entries
+      expected.put(SYSTEM_CATALOG_NAME, 0);
+      expected.put(SYSTEM_CHILD_LINK_NAME, 0);
+ verifyReplication(conn, expected);
+ }
+ }
+
+  /**
+   * This test simulates RS crashes in the middle of write transactions after the edits have been
+   * written to the WAL but before they have been replicated to the standby cluster. Those edits
+   * will be replicated when the WAL is replayed.
+   */
+ */
+ @Test
+ public void testWALRestore() throws Exception {
+ HBaseTestingUtility util = getUtility();
+ MiniHBaseCluster cluster = util.getHBaseCluster();
+ final String tableName = "T_" + generateUniqueName();
+ final String indexName = "I_" + generateUniqueName();
+ TableName table = TableName.valueOf(tableName);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String ddl = String.format("create table %s (id1 integer not null, "
+ + "id2 integer not null, val1 varchar, val2 varchar "
+ + "constraint pk primary key (id1, id2))", tableName);
+ conn.createStatement().execute(ddl);
+ ddl = String.format("create index %s on %s (val1) include (val2)",
indexName, tableName);
+ conn.createStatement().execute(ddl);
+ conn.commit();
+ }
+    // Mini cluster by default comes with only 1 RS. Starting a second RS so that
+    // we can kill the RS
+ JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer();
+ ServerName sn2 = rs2.getRegionServer().getServerName();
+ // Assign some table regions to the new RS we started above
+ moveRegionToServer(table, sn2);
+ moveRegionToServer(TableName.valueOf(SYSTEM_CATALOG_NAME), sn2);
+ moveRegionToServer(TableName.valueOf(SYSTEM_CHILD_LINK_NAME), sn2);
+ int rowCount = 50;
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ PreparedStatement stmt =
+ conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?,
?)");
+ // upsert 50 rows
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+ stmt.setString(4, null); // Generate a DeleteColumn cell
+ stmt.executeUpdate();
+ }
+ // we want to simulate RS crash after updating memstore and WAL
+ IndexRegionObserver.setIgnoreSyncReplicationForTesting(true);
+ conn.commit();
+ }
+ // Create tenant views for syscat and child link replication
+ createViewHierarchy();
+ } finally {
+ IndexRegionObserver.setIgnoreSyncReplicationForTesting(false);
+ }
+ // Kill the RS
+ cluster.killRegionServer(rs2.getRegionServer().getServerName());
+ Threads.sleep(20000); // just to be sure that the kill has fully started.
+ // Regions will be re-opened and the WAL will be replayed
+ util.waitUntilAllRegionsAssigned(table);
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ Map<String, Integer> expected = Maps.newHashMap();
+ // For each row 1 Put + 1 Delete (DeleteColumn)
+ expected.put(tableName, rowCount * 2);
+ // unverified + verified + delete (Delete column)
+ expected.put(indexName, rowCount * 3);
+ // 1 tenant view was created
+ expected.put(SYSTEM_CHILD_LINK_NAME, 1);
+      // at least 1 log entry for syscat
+ expected.put(SYSTEM_CATALOG_NAME, 1);
+ verifyReplication(conn, expected);
+ }
+ }
+
+ @Test
+ public void testSystemTables() throws Exception {
+ createViewHierarchy();
+ Map<String, List<Mutation>> logsByTable = groupLogsByTable();
+ dumpTableLogCount(logsByTable);
+ // find all the log entries for system tables
+ Map<String,
+ List<Mutation>> systemTables = logsByTable.entrySet().stream()
+ .filter(entry ->
entry.getKey().startsWith(QueryConstants.SYSTEM_SCHEMA_NAME))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ // there should be only 2 entries CATALOG, CHILD_LINK
+ assertEquals(2, systemTables.size());
+ assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
+ assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);
+ }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7b14996fba..53d2100cfd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -85,6 +85,7 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.math.BigDecimal;
+import java.net.URI;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
@@ -117,6 +118,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@@ -152,6 +154,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.replication.ReplicationLogGroup;
import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
@@ -343,6 +346,8 @@ public abstract class BaseTest {
protected static boolean clusterInitialized = false;
protected static HBaseTestingUtility utility;
protected static final Configuration config = HBaseConfiguration.create();
+ protected static final String logDir = "/PHOENIX_REPLICATION_IN";
+ protected static URI standbyUri = new Path(logDir).toUri();
protected static String getUrl() {
if (!clusterInitialized) {
@@ -586,6 +591,8 @@ public abstract class BaseTest {
conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
}
setPhoenixRegionServerEndpoint(conf);
+    // set up synchronous replication
+    conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
return conf;
}
@@ -1900,7 +1907,7 @@ public abstract class BaseTest {
}
/**
- * Returns true if the region contains atleast one of the metadata rows we
are interested in
+ * Returns true if the region contains at least one of the metadata rows we
are interested in
*/
protected static boolean regionContainsMetadataRows(RegionInfo regionInfo,
List<byte[]> metadataRowKeys) {
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index e6e5818430..a317d17a96 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -92,7 +92,7 @@ public class ReplicationLogGroupTest {
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
localFs = FileSystem.getLocal(conf);
- standbyUri = new Path(testFolder.toString()).toUri();
+ standbyUri = new Path(testFolder.getRoot().toString()).toUri();
serverName = ServerName.valueOf("test", 60010,
EnvironmentEdgeManager.currentTimeMillis());
conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY,
standbyUri.toString());
// Small ring buffer size for testing