This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by 
this push:
     new 491ecfecf8 PHOENIX-7601 Use synchronous replication in Phoenix coprocs 
on the write path (#2334)
491ecfecf8 is described below

commit 491ecfecf8d6bacb3aeb416da52ded86c9905712
Author: tkhurana <[email protected]>
AuthorDate: Thu Dec 18 19:41:01 2025 -0800

    PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write 
path (#2334)
---
 .../hbase/index/metrics/MetricsIndexerSource.java  |  11 +
 .../index/metrics/MetricsIndexerSourceImpl.java    |   9 +
 .../org/apache/phoenix/query/QueryServices.java    |   2 +
 .../apache/phoenix/query/QueryServicesOptions.java |   2 +
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  20 ++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 256 ++++++++++++++-
 .../phoenix/hbase/index/wal/IndexedKeyValue.java   |   9 +-
 .../replication/ReplicationLogGroupWriter.java     |  13 +
 .../replication/SystemCatalogWALEntryFilter.java   |  64 ++--
 .../phoenix/replication/tool/LogFileAnalyzer.java  | 131 ++++++--
 .../phoenix/replication/ReplicationLogGroupIT.java | 346 +++++++++++++++++++++
 .../java/org/apache/phoenix/query/BaseTest.java    |   9 +-
 .../replication/ReplicationLogGroupTest.java       |   2 +-
 13 files changed, 809 insertions(+), 65 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
index b0d625e417..c0582b8b99 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -92,6 +92,10 @@ public interface MetricsIndexerSource extends BaseSource {
   String POST_INDEX_UPDATE_FAILURE_DESC =
     "The number of failures of index updates post data updates";
 
+  String REPLICATION_SYNC_TIME = "replicationSyncTime";
+  String REPLICATION_SYNC_TIME_DESC =
+    "Histogram for the time in milliseconds to synchronously replicate a batch 
of mutations";
+
   /**
    * Updates the index preparation time histogram (preBatchMutate).
    * @param dataTableName Physical data table name
@@ -223,4 +227,11 @@ public interface MetricsIndexerSource extends BaseSource {
    * @param dataTableName Physical data table name
    */
   void incrementPostIndexUpdateFailures(String dataTableName);
+
+  /**
+   * Updates the replication sync time histogram.
+   * @param dataTableName Physical data table name
+   * @param t             time taken in milliseconds
+   */
+  void updateReplicationSyncTime(String dataTableName, long t);
 }
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
index 4968ea5a3f..d6fa330310 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -40,6 +40,7 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl 
implements MetricsI
   private final MutableFastCounter slowPostOpenCalls;
   private final MetricHistogram duplicateKeyTimeHisto;
   private final MutableFastCounter slowDuplicateKeyCalls;
+  private final MetricHistogram replicationSyncTimeHisto;
 
   private final MetricHistogram preIndexUpdateTimeHisto;
   private final MetricHistogram postIndexUpdateTimeHisto;
@@ -80,6 +81,8 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl 
implements MetricsI
       getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, 
DUPLICATE_KEY_TIME_DESC);
     slowDuplicateKeyCalls =
       getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, 
SLOW_DUPLICATE_KEY_DESC, 0L);
+    replicationSyncTimeHisto =
+      getMetricsRegistry().newHistogram(REPLICATION_SYNC_TIME, 
REPLICATION_SYNC_TIME_DESC);
 
     postIndexUpdateTimeHisto =
       getMetricsRegistry().newHistogram(POST_INDEX_UPDATE_TIME, 
POST_INDEX_UPDATE_TIME_DESC);
@@ -215,6 +218,12 @@ public class MetricsIndexerSourceImpl extends 
BaseSourceImpl implements MetricsI
     postIndexUpdateFailures.incr();
   }
 
+  @Override
+  public void updateReplicationSyncTime(String dataTableName, long t) {
+    // Record the sample twice: once against the histogram keyed by the table name and once
+    // against the histogram shared by all tables.
+    incrementTableSpecificHistogram(REPLICATION_SYNC_TIME, dataTableName, t);
+    replicationSyncTimeHisto.add(t);
+  }
+
   private void incrementTableSpecificCounter(String baseCounterName, String 
tableName) {
     MutableFastCounter indexSpecificCounter =
       getMetricsRegistry().getCounter(getCounterName(baseCounterName, 
tableName), 0);
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index dc5d042e46..00d2608b9b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -638,6 +638,8 @@ public interface QueryServices extends SQLCloseable {
 
   String USE_BLOOMFILTER_FOR_MULTIKEY_POINTLOOKUP = 
"phoenix.bloomfilter.multikey.pointlookup";
 
+  String SYNCHRONOUS_REPLICATION_ENABLED = 
"phoenix.synchronous.replication.enabled";
+
   /**
    * Get executor service used for parallel scans
    */
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 2af299aa2c..7b0cb4cce9 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -507,6 +507,8 @@ public class QueryServicesOptions {
   public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_MAX_POOL_SIZE = 512;
   public static final int DEFAULT_PHOENIX_UNCOVERED_INDEX_KEEP_ALIVE_TIME_SEC 
= 60; // 1min
 
+  // Default for QueryServices.SYNCHRONOUS_REPLICATION_ENABLED. Kept as a primitive because it
+  // is only used as the fallback value of Configuration.getBoolean.
+  public static final boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false;
+
   private final Configuration config;
 
   private QueryServicesOptions(Configuration config) {
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 81057f305f..b42a906ca3 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1517,4 +1517,24 @@ public class SchemaUtil {
     }
     return false;
   }
+
+  /**
+   * Decides, at table level, whether the data of a table takes part in replication.
+   * <p>
+   * Data of non system tables is always replicated. Among the system tables only
+   * SYSTEM.CATALOG and SYSTEM.CHILD_LINK are replicated, and for those two only the rows with
+   * tenant information are meant to be replicated; non tenant (global) rows are assumed to be
+   * executed by an admin or an admin process in each cluster separately. This method does not
+   * look at rows, it only answers for the table as a whole.
+   * <p>
+   * Data of the following system tables is not replicated because it is very specific to the
+   * cluster:
+   * <ul>
+   * <li>SYSTEM.SEQUENCE</li>
+   * <li>SYSTEM.STATS</li>
+   * <li>SYSTEM.LOG</li>
+   * <li>SYSTEM.TASK</li>
+   * <li>SYSTEM.FUNCTION</li>
+   * <li>SYSTEM.MUTEX</li>
+   * <li>SYSTEM.TRANSFORM</li>
+   * <li>SYSTEM.CDC_STREAM_STATUS</li>
+   * <li>SYSTEM.CDC_STREAM</li>
+   * </ul>
+   * @param tableName full name of the table
+   * @return true if the table data should be replicated, else false
+   */
+  public static boolean shouldReplicateTable(byte[] tableName) {
+    return !isSystemTable(tableName) || isMetaTable(tableName) || isChildLinkTable(tableName);
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fc55b2435d..5c12fc719e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -25,6 +25,8 @@ import static 
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons
 import static 
org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
 import static 
org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
 import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT;
+import static 
org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 import java.io.ByteArrayInputStream;
@@ -43,9 +45,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -75,6 +79,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -107,6 +112,7 @@ import 
org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -116,6 +122,8 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.ReplicationLogGroup;
+import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
 import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
@@ -166,6 +174,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS);
   public static final String PHOENIX_APPEND_METADATA_TO_WAL = 
"phoenix.append.metadata.to.wal";
   public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+  // Mutation attribute to ignore the mutation for replication
+  public static final String IGNORE_REPLICATION_ATTRIB = "_IGNORE_REPLICATION";
+  private static final byte[] IGNORE_REPLICATION_ATTRIB_VAL = new byte[] { 0 };
+  // TODO hardcoded for now, will fix later
+  public static final String DEFAULT_HA_GROUP = "DEFAULT_HA_GROUP";
 
   /**
    * Class to represent pending data table rows
@@ -218,6 +231,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private static boolean failPostIndexUpdatesForTesting = false;
   private static boolean failDataTableUpdatesForTesting = false;
   private static boolean ignoreWritingDeleteColumnsToIndex = false;
+  private static boolean ignoreSyncReplicationForTesting = false;
 
   public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
     ignoreIndexRebuildForTesting = ignore;
@@ -239,6 +253,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     ignoreWritingDeleteColumnsToIndex = ignore;
   }
 
+  /**
+   * Test hook. While the flag is true, replicateMutations returns without appending anything
+   * to the replication log.
+   * @param ignore true to skip synchronous replication of mutation batches
+   */
+  public static void setIgnoreSyncReplicationForTesting(boolean ignore) {
+    ignoreSyncReplicationForTesting = ignore;
+  }
+
   public enum BatchMutatePhase {
     INIT,
     PRE,
@@ -417,6 +435,46 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
   private byte[] encodedRegionName;
+  private boolean shouldReplicate;
+  private ReplicationLogGroup replicationLog;
+
+  // Don't replicate the mutation if this attribute is set
+  private static final Predicate<Mutation> IGNORE_REPLICATION =
+    mutation -> mutation.getAttribute(IGNORE_REPLICATION_ATTRIB) != null;
+
+  // Don't replicate the mutation for syscat/child link if the tenantid is not
+  // leading in the row key
+  private static final Predicate<Mutation> NOT_TENANT_ID_ROW_KEY_PREFIX =
+    mutation -> 
!SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(mutation.getRow(), 0);
+
+  // Don't replicate the mutation for child link if child is not a tenant view.
+  // Evaluates to true (ignore the mutation) when none of its cells is a child link to a
+  // tenant view. The scan stops at the first matching cell instead of only leaving the
+  // inner loop and walking the remaining column families.
+  private static final Predicate<Mutation> NOT_CHILD_LINK_TENANT_VIEW = mutation -> {
+    for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+      for (Cell cell : cells) {
+        if (SystemCatalogWALEntryFilter.isCellChildLinkToTenantView(cell)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  };
+
+  /**
+   * Builds the predicate used to exclude mutations of the given table from synchronous
+   * replication. If the returned predicate evaluates to true, the mutation is ignored from
+   * replication.
+   * <ul>
+   * <li>meta table: ignored when the ignore attribute is set or the row key has no leading
+   * tenant id</li>
+   * <li>child link table: ignored when the ignore attribute is set, or when the row key has no
+   * leading tenant id and no cell is a child link to a tenant view</li>
+   * <li>any other table: ignored only when the ignore attribute is set</li>
+   * </ul>
+   * @param tableName name of the table the mutations belong to
+   * @return the predicate selecting the mutations to ignore
+   */
+  private static Predicate<Mutation> getSynchronousReplicationFilter(byte[] tableName) {
+    if (SchemaUtil.isMetaTable(tableName)) {
+      return IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX);
+    }
+    if (SchemaUtil.isChildLinkTable(tableName)) {
+      return IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX.and(NOT_CHILD_LINK_TENANT_VIEW));
+    }
+    return IGNORE_REPLICATION;
+  }
+
+  private Predicate<Mutation> ignoreReplicationFilter;
 
   @Override
   public Optional<RegionObserver> getRegionObserver() {
@@ -471,6 +529,18 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       BloomType bloomFilterType = 
tableDescriptor.getColumnFamilies()[0].getBloomFilterType();
       // when the table descriptor changes, the coproc is reloaded
       this.useBloomFilter = bloomFilterType == BloomType.ROW;
+      byte[] tableName = env.getRegionInfo().getTable().getName();
+      this.shouldReplicate = 
env.getConfiguration().getBoolean(SYNCHRONOUS_REPLICATION_ENABLED,
+        DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
+      if (this.shouldReplicate) {
+        // replication feature is enabled, check if it is enabled for the table
+        this.shouldReplicate = SchemaUtil.shouldReplicateTable(tableName);
+      }
+      if (this.shouldReplicate) {
+        this.replicationLog =
+          ReplicationLogGroup.get(env.getConfiguration(), env.getServerName(), 
DEFAULT_HA_GROUP);
+        this.ignoreReplicationFilter = 
getSynchronousReplicationFilter(tableName);
+      }
     } catch (NoSuchMethodError ex) {
       disabled = true;
       LOG.error("Must be too early a version of HBase. Disabled coprocessor ", 
ex);
@@ -503,7 +573,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       return;
     }
     this.stopped = true;
-    String msg = "Indexer is being stopped";
+    String msg = "IndexRegionObserver is being stopped";
     this.builder.stop(msg);
     this.preWriter.stop(msg);
     this.postWriter.stop(msg);
@@ -580,6 +650,82 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       "Somehow didn't return an index update but also didn't propagate the 
failure to the client!");
   }
 
+  /**
+   * Forwards the edits of a WAL entry that is being restored to the replication log. Does
+   * nothing when the coprocessor is disabled or replication is not enabled for this table.
+   * The elapsed time is reported to the metric source whether or not replication succeeds.
+   */
+  @Override
+  public void preWALRestore(
+    org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+    org.apache.hadoop.hbase.client.RegionInfo info, org.apache.hadoop.hbase.wal.WALKey logKey,
+    WALEdit logEdit) throws IOException {
+    if (this.disabled || !shouldReplicate) {
+      return;
+    }
+    final long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    try {
+      replicateEditOnWALRestore(logKey, logEdit);
+    } finally {
+      metricSource.updatePreWALRestoreTime(dataTableName,
+        EnvironmentEdgeManager.currentTimeMillis() - startTime);
+    }
+  }
+
+  /**
+   * Rebuilds mutations from the cells of a restored WAL edit, appends them to the replication
+   * log and finishes with a single sync.
+   * <p>
+   * A batch of mutations is recorded in a single WAL edit, so a WAL edit can have cells
+   * belonging to multiple rows, and for one mutation the WAL edit contains the individual
+   * cells that are part of the mutation. Replicating one mutation per cell would not be very
+   * efficient since a mutation can have a large number of cells, so consecutive cells with the
+   * same row key are first grouped into one Put and/or one Delete. Cells of type
+   * {@link IndexedKeyValue} bring their own mutation and index table name and are appended
+   * directly.
+   */
+  private void replicateEditOnWALRestore(org.apache.hadoop.hbase.wal.WALKey logKey, WALEdit logEdit)
+    throws IOException {
+    ImmutableBytesPtr lastRow = null;
+    Put rowPut = null;
+    Delete rowDelete = null;
+    for (Cell cell : logEdit.getCells()) {
+      if (cell instanceof IndexedKeyValue) {
+        IndexedKeyValue indexCell = (IndexedKeyValue) cell;
+        replicationLog.append(Bytes.toString(indexCell.getIndexTable()), -1,
+          indexCell.getMutation());
+        continue;
+      }
+      ImmutableBytesPtr row =
+        new ImmutableBytesPtr(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+      if (!row.equals(lastRow)) {
+        // the row key changed: hand over what was collected for the previous row and reset
+        appendRestoredMutation(logKey, rowPut);
+        appendRestoredMutation(logKey, rowDelete);
+        rowPut = null;
+        rowDelete = null;
+      }
+      if (cell.getType() == Cell.Type.Put) {
+        if (rowPut == null) {
+          rowPut = new Put(row.get(), row.getOffset(), row.getLength());
+        }
+        rowPut.add(cell);
+      } else {
+        // every cell type other than Put is collected into the Delete of the row
+        if (rowDelete == null) {
+          rowDelete = new Delete(row.get(), row.getOffset(), row.getLength());
+        }
+        rowDelete.add(cell);
+      }
+      lastRow = row;
+    }
+    // append the mutations of the last row
+    appendRestoredMutation(logKey, rowPut);
+    appendRestoredMutation(logKey, rowDelete);
+    replicationLog.sync();
+  }
+
+  /**
+   * Appends a rebuilt row mutation to the replication log under the table name of the WAL key,
+   * unless the mutation is null or the replication filter asks to ignore it.
+   */
+  private void appendRestoredMutation(org.apache.hadoop.hbase.wal.WALKey logKey, Mutation mutation)
+    throws IOException {
+    if (mutation != null && !this.ignoreReplicationFilter.test(mutation)) {
+      replicationLog.append(logKey.getTableName().getNameAsString(), -1, mutation);
+    }
+  }
+
   private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
     BatchMutateContext context) {
     for (int i = 0; i < miniBatchOp.size(); i++) {
@@ -655,6 +801,11 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           // upserts, where 0 represents the row is not updated
           Result result = Result.create(cells);
           miniBatchOp.setOperationStatus(i, new OperationStatus(SUCCESS, 
result));
+          // since this mutation is ignored by setting its status to success in the coproc
+          // it shouldn't be synchronously replicated
+          if (this.shouldReplicate) {
+            m.setAttribute(IGNORE_REPLICATION_ATTRIB, 
IGNORE_REPLICATION_ATTRIB_VAL);
+          }
         }
       } else if (context.returnResult) {
         Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new 
HashMap<>();
@@ -1633,6 +1784,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         waitForPreviousConcurrentBatch(table, context);
       }
       preparePostIndexMutations(context, batchTimestamp, indexMetaData);
+      addGlobalIndexMutationsToWAL(miniBatchOp, context);
     }
     if (context.hasLocalIndex) {
       // Group all the updates for a single row into a single update to be 
processed (for local
@@ -1645,6 +1797,49 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
   }
 
+  /**
+   * Adds the global index mutations of the batch to the data table's WAL edit.
+   * <p>
+   * This covers the case where the RS crashes before the postBatchMutateIndispensably hook,
+   * in which the mutations are synchronously replicated, is called. During WAL restore we
+   * don't have the IndexMaintainer object to generate the corresponding index mutations, so
+   * they are carried in the WAL itself. Does nothing when replication is not enabled for the
+   * table. The index mutations are attached to the WAL edit of the first operation of the mini
+   * batch, which is created if it does not exist yet.
+   */
+  private void addGlobalIndexMutationsToWAL(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+    BatchMutateContext context) {
+    if (!this.shouldReplicate) {
+      return;
+    }
+
+    WALEdit walEdit = miniBatchOp.getWalEdit(0);
+    if (walEdit == null) {
+      walEdit = new WALEdit();
+      miniBatchOp.setWalEdit(0, walEdit);
+    }
+
+    if (context.preIndexUpdates != null) {
+      addIndexUpdatesToWALEdit(walEdit, context.preIndexUpdates.entries());
+    }
+    if (context.postIndexUpdates != null) {
+      addIndexUpdatesToWALEdit(walEdit, context.postIndexUpdates.entries());
+    }
+  }
+
+  /**
+   * Wraps every index mutation that is not ignored by the replication filter into an
+   * IndexedKeyValue and adds it to the WAL edit.
+   */
+  private void addIndexUpdatesToWALEdit(WALEdit walEdit,
+    Iterable<Map.Entry<HTableInterfaceReference, Mutation>> indexUpdates) {
+    for (Map.Entry<HTableInterfaceReference, Mutation> update : indexUpdates) {
+      Mutation indexMutation = update.getValue();
+      if (!this.ignoreReplicationFilter.test(indexMutation)) {
+        // This creates cells of family type WALEdit.METAFAMILY which are not applied
+        // on restore
+        walEdit.add(IndexedKeyValue.newIndexedKeyValue(update.getKey().get(), indexMutation));
+      }
+    }
+  }
+
   /**
    * In case of ON DUPLICATE KEY IGNORE, if the row already exists no 
mutations will be generated so
    * release the row lock.
@@ -1776,7 +1971,16 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
 
       if (success) { // The pre-index and data table updates are successful, 
and now, do post index
                      // updates
-        doPost(c, context);
+        CompletableFuture<Void> postIndexFuture =
+          CompletableFuture.runAsync(() -> doPost(c, context));
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        try {
+          replicateMutations(miniBatchOp, context);
+        } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          metricSource.updateReplicationSyncTime(dataTableName, duration);
+        }
+        FutureUtils.get(postIndexFuture);
       }
     } finally {
       removeBatchMutateContext(c);
@@ -1842,8 +2046,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
   }
 
-  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context)
-    throws IOException {
+  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context) {
     long start = EnvironmentEdgeManager.currentTimeMillis();
 
     try {
@@ -2329,4 +2532,49 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   public static boolean isAtomicOperationComplete(OperationStatus status) {
     return status.getOperationStatusCode() == SUCCESS && status.getResult() != 
null;
   }
+
+  /**
+   * Appends the mutations of a mini batch to the replication log and syncs the log once.
+   * <p>
+   * For every operation of the batch that is not ignored by the replication filter, the
+   * operation itself and the mutations added to it by coprocessors are appended under the data
+   * table name. The pre and post index updates held in the context are appended under their
+   * own table names, again skipping the ones the filter ignores. Returns without doing
+   * anything when replication is not enabled for the table or when it has been switched off
+   * for testing.
+   * @param miniBatchOp the mini batch whose operations are replicated
+   * @param context     batch context holding the pre and post index updates
+   * @throws IOException if appending to or syncing the replication log fails
+   */
+  private void replicateMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+    BatchMutateContext context) throws IOException {
+
+    if (!this.shouldReplicate || ignoreSyncReplicationForTesting) {
+      return;
+    }
+    assert this.replicationLog != null;
+
+    // primitive counter: a boxed Integer would be unboxed and re-boxed on every iteration
+    for (int i = 0; i < miniBatchOp.size(); i++) {
+      Mutation m = miniBatchOp.getOperation(i);
+      if (this.ignoreReplicationFilter.test(m)) {
+        continue;
+      }
+      this.replicationLog.append(this.dataTableName, -1, m);
+      Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+      if (mutationsAddedByCP != null) {
+        // mutations added by coprocessors follow their parent operation and are not
+        // filtered individually
+        for (Mutation addedMutation : mutationsAddedByCP) {
+          this.replicationLog.append(this.dataTableName, -1, addedMutation);
+        }
+      }
+    }
+    if (context.preIndexUpdates != null) {
+      appendIndexUpdatesToReplicationLog(context.preIndexUpdates.entries());
+    }
+    if (context.postIndexUpdates != null) {
+      appendIndexUpdatesToReplicationLog(context.postIndexUpdates.entries());
+    }
+    this.replicationLog.sync();
+  }
+
+  /**
+   * Appends every index mutation that is not ignored by the replication filter to the
+   * replication log under the name of its index table.
+   */
+  private void appendIndexUpdatesToReplicationLog(
+    Iterable<Map.Entry<HTableInterfaceReference, Mutation>> indexUpdates) throws IOException {
+    for (Map.Entry<HTableInterfaceReference, Mutation> update : indexUpdates) {
+      Mutation indexMutation = update.getValue();
+      if (!this.ignoreReplicationFilter.test(indexMutation)) {
+        this.replicationLog.append(update.getKey().getTableName(), -1, indexMutation);
+      }
+    }
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
index 82434bc230..3696822171 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -51,6 +51,11 @@ public class IndexedKeyValue extends KeyValue {
   private int hashCode;
 
   public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m) {
+    Cell indexWALCell = adaptFirstCellFromMutation(m);
+    return new IndexedKeyValue(indexWALCell, new ImmutableBytesPtr(bs), m);
+  }
+
+  public static IndexedKeyValue newIndexedKeyValue(ImmutableBytesPtr bs, 
Mutation m) {
     Cell indexWALCell = adaptFirstCellFromMutation(m);
     return new IndexedKeyValue(indexWALCell, bs, m);
   }
@@ -81,9 +86,9 @@ public class IndexedKeyValue extends KeyValue {
   public IndexedKeyValue() {
   }
 
-  private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation) {
+  private IndexedKeyValue(Cell c, ImmutableBytesPtr bs, Mutation mutation) {
     super(c);
-    this.indexTableName = new ImmutableBytesPtr(bs);
+    this.indexTableName = bs;
     this.mutation = mutation;
     this.hashCode = calcHashCode(indexTableName, mutation);
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index b76b8fab34..5391a57cdd 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -454,6 +454,19 @@ public abstract class ReplicationLogGroupWriter {
     }
   }
 
+  /**
+   * Close the currentWriter and clear the reference to it. Needed by tests so that we can
+   * close the log file and then read it.
+   * <p>
+   * Both the close and the reset of {@code currentWriter} happen while holding {@code lock}.
+   */
+  protected void closeCurrentWriter() {
+    lock.lock();
+    try {
+      closeWriter(currentWriter);
+      currentWriter = null;
+    } finally {
+      // release the lock on every exit path of the try block
+      lock.unlock();
+    }
+  }
+
   /**
    * Check if this ReplicationLogGroup is closed.
    * @return true if closed, false otherwise
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
index 239aa0c501..399fd897e9 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
@@ -74,50 +74,62 @@ public class SystemCatalogWALEntryFilter implements 
WALEntryFilter, WALCellFilte
   }
 
   /**
-   * Does the cell key have leading tenant Id.
+   * Does the cell row key have leading tenant Id.
    * @param cell hbase cell
    * @return true if the cell has leading tenant Id in key
    */
   private boolean isTenantIdLeadingInKey(final Cell cell) {
     // rows in system.catalog or system child that aren't tenant-owned
     // will have a leading separator byte
-    return cell.getRowArray()[cell.getRowOffset()] != 
QueryConstants.SEPARATOR_BYTE;
+    return isTenantIdLeadingInKey(cell.getRowArray(), cell.getRowOffset());
   }
 
   /**
-   * is the cell for system child link a tenant owned. Besides the non empty 
tenant id,
-   * system.child_link table have tenant owned data for parent child links. In 
this case, the column
+   * Checks if the row key of SYSTEM.CATALOG or SYSTEM.CHILD_LINK row has 
leading tenant ID
+   * @return true if the row key has leading tenant ID
+   */
+  public static boolean isTenantIdLeadingInKey(byte[] rowKey, int rowOffset) {
+    return rowKey[rowOffset] != QueryConstants.SEPARATOR_BYTE;
+  }
+
+  /**
+   * Is the cell for system child link a tenant owned. This happens if the 
tenant id is leading in
+   * the row key or the cell has tenant owned data for parent child links.
+   * @param cell hbase cell
+   * @return true if the cell is tenant owned
+   */
+  private boolean isTenantRowCellSystemChildLink(final Cell cell) {
+    return isTenantIdLeadingInKey(cell) || isCellChildLinkToTenantView(cell);
+  }
+
+  /**
+   * SYSTEM.CHILD_LINK table have tenant owned data for parent child links. In 
this case, the column
    * qualifier is {@code PhoenixDatabaseMetaData#LINK_TYPE_BYTES} and value is
    * {@code PTable.LinkType.CHILD_TABLE}. For corresponding delete markers the 
KeyValue type
    * {@code KeyValue.Type} is {@code KeyValue.Type.DeleteFamily}
    * @param cell hbase cell
    * @return true if the cell is tenant owned
    */
-  private boolean isTenantRowCellSystemChildLink(final Cell cell) {
-    boolean isTenantRowCell = isTenantIdLeadingInKey(cell);
-
+  public static boolean isCellChildLinkToTenantView(final Cell cell) {
     ImmutableBytesWritable key =
       new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength());
     boolean isChildLinkToTenantView = false;
-    if (!isTenantRowCell) {
-      boolean isChildLink =
-        CellUtil.matchingQualifier(cell, 
PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
-      if (
-        (isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES))
-          || cell.getType() == Cell.Type.DeleteFamily
-      ) {
-        byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
-        SchemaUtil.getVarChars(key.get(), key.getOffset(), key.getLength(), 0, 
rowViewKeyMetadata);
-        /**
-         * if the child link is to a tenant-owned view, the COLUMN_NAME field 
will be the byte[] of
-         * the tenant otherwise, it will be an empty byte array (NOT 
QueryConstants.SEPARATOR_BYTE,
-         * but a byte[0]). This assumption is also true for child link's 
delete markers in
-         * SYSTEM.CHILD_LINK as it only contains link rows and does not deal 
with other type of rows
-         * like column rows that also has COLUMN_NAME populated with actual 
column name.
-         **/
-        isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length 
!= 0;
-      }
+    boolean isChildLink = CellUtil.matchingQualifier(cell, 
PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
+    if (
+      isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES)
+        || cell.getType() == Cell.Type.DeleteFamily
+    ) {
+      byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
+      SchemaUtil.getVarChars(key.get(), key.getOffset(), key.getLength(), 0, 
rowViewKeyMetadata);
+      /**
+       * if the child link is to a tenant-owned view, the COLUMN_NAME field 
will be the byte[] of
+       * the tenant otherwise, it will be an empty byte array (NOT 
QueryConstants.SEPARATOR_BYTE,
+       * but a byte[0]). This assumption is also true for child link's delete 
markers in
+       * SYSTEM.CHILD_LINK as it only contains link rows and does not deal 
with other type of rows
+       * like column rows that also has COLUMN_NAME populated with actual 
column name.
+       **/
+      isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length 
!= 0;
     }
-    return isTenantRowCell || isChildLinkToTenantView;
+    return isChildLinkToTenantView;
   }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
index d5b1bb43d8..18172721a0 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -18,22 +18,28 @@
 package org.apache.phoenix.replication.tool;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFile.Record;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
 /**
  * Command-line tool for analyzing Phoenix Replication Log files. This tool 
can: - Read a single log
  * file or directory of log files - Print file headers, trailers, and block 
headers - Decode and
@@ -51,6 +57,18 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
   private boolean verbose = false;
   private boolean decode = false;
   private boolean check = false;
+  FileSystem fs;
+
+  private void init() throws IOException {
+    Configuration conf = getConf();
+    if (conf == null) {
+      conf = HBaseConfiguration.create();
+      setConf(conf);
+    }
+    if (fs == null) {
+      fs = FileSystem.get(getConf());
+    }
+  }
 
   @Override
   public int run(String[] args) throws Exception {
@@ -59,29 +77,10 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
       return 1;
     }
 
-    Configuration conf = getConf();
-    if (conf == null) {
-      conf = HBaseConfiguration.create();
-      setConf(conf);
-    }
-
     try {
-      FileSystem fs = FileSystem.get(conf);
+      init();
       Path path = new Path(args[args.length - 1]);
-
-      if (!fs.exists(path)) {
-        System.err.println("Path does not exist: " + path);
-        return 1;
-      }
-
-      List<Path> filesToAnalyze = new ArrayList<>();
-      if (fs.getFileStatus(path).isDirectory()) {
-        // Recursively find all .plog files
-        findLogFiles(fs, path, filesToAnalyze);
-      } else {
-        filesToAnalyze.add(path);
-      }
-
+      List<Path> filesToAnalyze = getFilesToAnalyze(path);
       if (filesToAnalyze.isEmpty()) {
         System.err.println("No log files found in: " + path);
         return 1;
@@ -89,7 +88,7 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
 
       // Analyze each file
       for (Path file : filesToAnalyze) {
-        analyzeFile(fs, file);
+        analyzeFile(file);
       }
 
       return 0;
@@ -99,19 +98,83 @@ public class LogFileAnalyzer extends Configured implements 
Tool {
     }
   }
 
-  private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws 
IOException {
+  /**
+   * Returns all the mutations grouped by the table name under a source path
+   * @param source Path which can be a file or directory
+   * @return Mutations grouped by the table name
+   */
+  public Map<String, List<Mutation>> groupLogsByTable(String source) throws 
IOException {
+    Map<String, List<Mutation>> allFiles = Maps.newHashMap();
+    init();
+    Path path = new Path(source);
+    List<Path> filesToAnalyze = getFilesToAnalyze(path);
+    if (filesToAnalyze.isEmpty()) {
+      return allFiles;
+    }
+    // Analyze each file
+    for (Path file : filesToAnalyze) {
+      Map<String, List<Mutation>> perFile = groupLogsByTable(file);
+      for (Map.Entry<String, List<Mutation>> entry : perFile.entrySet()) {
+        List<Mutation> mutations = allFiles.get(entry.getKey());
+        if (mutations == null) {
+          allFiles.put(entry.getKey(), entry.getValue());
+        } else {
+          mutations.addAll(entry.getValue());
+        }
+      }
+    }
+    return allFiles;
+  }
+
+  private Map<String, List<Mutation>> groupLogsByTable(Path file) throws 
IOException {
+    Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap();
+    System.out.println("\nAnalyzing file: " + file);
+    LogFileReaderContext context = new 
LogFileReaderContext(getConf()).setFileSystem(fs)
+      .setFilePath(file).setSkipCorruptBlocks(check); // Skip corrupt blocks 
if checking
+    LogFileReader reader = new LogFileReader();
+    try {
+      reader.init(context);
+      // Process records
+      Record record;
+      while ((record = reader.next()) != null) {
+        String tableName = record.getHBaseTableName();
+        List<Mutation> mutations = mutationsByTable.getOrDefault(tableName, 
Lists.newArrayList());
+        mutations.add(record.getMutation());
+        mutationsByTable.put(tableName, mutations);
+      }
+    } finally {
+      reader.close();
+    }
+    return mutationsByTable;
+  }
+
+  private List<Path> getFilesToAnalyze(Path path) throws IOException {
+    if (!fs.exists(path)) {
+      throw new PathNotFoundException(path.toString());
+    }
+    List<Path> filesToAnalyze = Lists.newArrayList();
+    if (fs.getFileStatus(path).isDirectory()) {
+      // Recursively find all .plog files
+      findLogFiles(path, filesToAnalyze);
+    } else {
+      filesToAnalyze.add(path);
+    }
+    return filesToAnalyze;
+  }
+
+  private void findLogFiles(Path dir, List<Path> files) throws IOException {
     FileStatus[] statuses = fs.listStatus(dir);
     for (FileStatus status : statuses) {
       Path path = status.getPath();
       if (status.isDirectory()) {
-        findLogFiles(fs, path, files);
+        findLogFiles(path, files);
       } else if (path.getName().endsWith(".plog")) {
         files.add(path);
       }
     }
   }
 
-  private void analyzeFile(FileSystem fs, Path file) throws IOException {
+  private void analyzeFile(Path file) throws IOException {
     System.out.println("\nAnalyzing file: " + file);
 
     LogFileReaderContext context = new 
LogFileReaderContext(getConf()).setFileSystem(fs)
@@ -143,11 +206,17 @@ public class LogFileAnalyzer extends Configured 
implements Tool {
       }
 
       // Print trailer information
-      System.out.println("\nTrailer:");
-      System.out.println("  Record Count: " + 
reader.getTrailer().getRecordCount());
-      System.out.println("  Block Count: " + 
reader.getTrailer().getBlockCount());
-      System.out.println("  Blocks Start Offset: " + 
reader.getTrailer().getBlocksStartOffset());
-      System.out.println("  Trailer Start Offset: " + 
reader.getTrailer().getTrailerStartOffset());
+      LogFile.Trailer trailer = reader.getTrailer();
+      if (trailer != null) {
+        System.out.println("\nTrailer:");
+        System.out.println("  Record Count: " + 
reader.getTrailer().getRecordCount());
+        System.out.println("  Block Count: " + 
reader.getTrailer().getBlockCount());
+        System.out.println("  Blocks Start Offset: " + 
reader.getTrailer().getBlocksStartOffset());
+        System.out
+          .println("  Trailer Start Offset: " + 
reader.getTrailer().getTrailerStartOffset());
+      } else {
+        System.out.println("\nTrailer is null");
+      }
 
       // Print verification results if checking
       if (check) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
new file mode 100644
index 0000000000..21644d86fa
--- /dev/null
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static 
org.apache.phoenix.hbase.index.IndexRegionObserver.DEFAULT_HA_GROUP;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.tool.LogFileAnalyzer;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogGroupIT extends ParallelStatsDisabledIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReplicationLogGroupIT.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+    props.put(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, 
Boolean.TRUE.toString());
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+
+  @Before
+  public void beforeTest() throws Exception {
+    LOG.info("Starting test {}", name.getMethodName());
+  }
+
+  @After
+  public void afterTest() throws Exception {
+    LOG.info("Starting cleanup for test {}", name.getMethodName());
+    cleanupLogsFolder(standbyUri);
+    LOG.info("Ending cleanup for test {}", name.getMethodName());
+  }
+
+  /**
+   * Delete all the shards under the top level replication log directory
+   */
+  private void cleanupLogsFolder(URI source) throws IOException {
+    FileSystem fs = FileSystem.get(config);
+    Path dir = new Path(source.getPath());
+    FileStatus[] statuses = fs.listStatus(dir);
+    for (FileStatus status : statuses) {
+      Path shard = status.getPath();
+      if (status.isDirectory()) {
+        fs.delete(shard, true);
+      }
+    }
+  }
+
+  private ReplicationLogGroup getReplicationLogGroup() throws IOException {
+    HRegionServer rs = getUtility().getHBaseCluster().getRegionServer(0);
+    return ReplicationLogGroup.get(config, rs.getServerName(), 
DEFAULT_HA_GROUP);
+  }
+
+  private Map<String, List<Mutation>> groupLogsByTable() throws Exception {
+    ReplicationLogGroup log = getReplicationLogGroup();
+    log.getActiveWriter().closeCurrentWriter();
+    LogFileAnalyzer analyzer = new LogFileAnalyzer();
+    analyzer.setConf(config);
+    String[] args = { "--check", standbyUri.getPath() };
+    assertEquals(0, analyzer.run(args));
+    return analyzer.groupLogsByTable(standbyUri.getPath());
+  }
+
+  private int getCountForTable(Map<String, List<Mutation>> logsByTable, String 
tableName)
+    throws Exception {
+    List<Mutation> mutations = logsByTable.get(tableName);
+    return mutations != null ? mutations.size() : 0;
+  }
+
+  private void verifyReplication(Connection conn, Map<String, Integer> 
expected) throws Exception {
+    Map<String, List<Mutation>> mutationsByTable = groupLogsByTable();
+    dumpTableLogCount(mutationsByTable);
+    for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+      String tableName = entry.getKey();
+      int expectedMutationCount = entry.getValue();
+      List<Mutation> mutations = mutationsByTable.get(tableName);
+      int actualMutationCount = mutations != null ? mutations.size() : 0;
+      try {
+        if (!tableName.equals(SYSTEM_CATALOG_NAME)) {
+          assertEquals(String.format("For table %s", tableName), 
expectedMutationCount,
+            actualMutationCount);
+        } else {
+          // special handling for syscat
+          assertTrue("For SYSCAT", actualMutationCount >= 
expectedMutationCount);
+        }
+      } catch (AssertionError e) {
+        TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+        throw e;
+      }
+    }
+  }
+
+  private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) 
{
+    LOG.info("Dump table log count for test {}", name.getMethodName());
+    for (Map.Entry<String, List<Mutation>> table : 
mutationsByTable.entrySet()) {
+      LOG.info("#Log entries for {} = {}", table.getKey(), 
table.getValue().size());
+    }
+  }
+
+  private void moveRegionToServer(TableName tableName, ServerName sn) throws 
Exception {
+    HBaseTestingUtility util = getUtility();
+    try (RegionLocator locator = 
util.getConnection().getRegionLocator(tableName)) {
+      String regEN = 
locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
+      while 
(!sn.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+        LOG.info("Moving region {} of table {} to server {}", regEN, 
tableName, sn);
+        util.getAdmin().move(Bytes.toBytes(regEN), sn);
+        Thread.sleep(100);
+      }
+      LOG.info("Moved region {} of table {} to server {}", regEN, tableName, 
sn);
+    }
+  }
+
+  private PhoenixTestBuilder.SchemaBuilder createViewHierarchy() throws 
Exception {
+    // Define the test schema.
+    // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, 
KP)
+    // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID)
+    // 3. Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+    final PhoenixTestBuilder.SchemaBuilder schemaBuilder =
+      new PhoenixTestBuilder.SchemaBuilder(getUrl());
+    PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions =
+      PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+    PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions globalViewOptions =
+      PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+    PhoenixTestBuilder.SchemaBuilder.TenantViewOptions 
tenantViewWithOverrideOptions =
+      PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+    PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions 
tenantViewIndexOverrideOptions =
+      PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      
schemaBuilder.withTableOptions(tableOptions).withGlobalViewOptions(globalViewOptions)
+        .withTenantViewOptions(tenantViewWithOverrideOptions)
+        
.withTenantViewIndexOptions(tenantViewIndexOverrideOptions).buildWithNewTenant();
+    }
+    return schemaBuilder;
+  }
+
+  @Test
+  public void testAppendAndSync() throws Exception {
+    final String tableName = "T_" + generateUniqueName();
+    final String indexName1 = "I_" + generateUniqueName();
+    final String indexName2 = "I_" + generateUniqueName();
+    final String indexName3 = "L_" + generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String ddl = String.format("create table %s (id1 integer not null, "
+        + "id2 integer not null, val1 varchar, val2 varchar "
+        + "constraint pk primary key (id1, id2))", tableName);
+      conn.createStatement().execute(ddl);
+      ddl = String.format("create index %s on %s (val1) include (val2)", 
indexName1, tableName);
+      conn.createStatement().execute(ddl);
+      ddl = String.format("create index %s on %s (val2) include (val1)", 
indexName2, tableName);
+      conn.createStatement().execute(ddl);
+      ddl = String.format("create local index %s on %s (id2,val1) include 
(val2)", indexName3,
+        tableName);
+      conn.createStatement().execute(ddl);
+      conn.commit();
+      PreparedStatement stmt =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
+      // upsert 50 rows
+      int rowCount = 50;
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          stmt.setInt(1, i);
+          stmt.setInt(2, j);
+          stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+          stmt.setString(4, null);
+          stmt.executeUpdate();
+        }
+        conn.commit();
+      }
+      // do some atomic upserts which will be ignored and therefore not 
replicated
+      stmt = conn.prepareStatement(
+        "upsert into " + tableName + " VALUES(?, ?, ?) " + "ON DUPLICATE KEY 
IGNORE");
+      conn.setAutoCommit(true);
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 2; ++j) {
+          stmt.setInt(1, i);
+          stmt.setInt(2, j);
+          stmt.setString(3, null);
+          assertEquals(0, stmt.executeUpdate());
+        }
+      }
+      // verify the correctness of the index
+      IndexToolIT.verifyIndexTable(tableName, indexName1, conn);
+      // verify replication
+      Map<String, Integer> expected = Maps.newHashMap();
+      // mutation counts are multiples of the row count since the atomic upsert 
mutations will be
+      // ignored and therefore not replicated
+      expected.put(tableName, rowCount * 3); // Put + Delete + local index 
update
+      // for index1 unverified + verified + delete (Delete column)
+      expected.put(indexName1, rowCount * 3);
+      // for index2 unverified + verified since the null column is part of row 
key
+      expected.put(indexName2, rowCount * 2);
+      // we didn't create any tenant views so no change in the syscat entries
+      expected.put(SYSTEM_CATALOG_NAME, 0);
+      expected.put(SYSTEM_CHILD_LINK_NAME, 0);
+      verifyReplication(conn, expected);
+    }
+  }
+
+  /**
+   * This test simulates RS crashes in the middle of write transactions after 
the edits have been
+   * written to the WAL but before they have been replicated to the standby 
cluster. Those edits
+   * will be replicated when the WAL is replayed.
+   */
+  @Test
+  public void testWALRestore() throws Exception {
+    HBaseTestingUtility util = getUtility();
+    MiniHBaseCluster cluster = util.getHBaseCluster();
+    final String tableName = "T_" + generateUniqueName();
+    final String indexName = "I_" + generateUniqueName();
+    TableName table = TableName.valueOf(tableName);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String ddl = String.format("create table %s (id1 integer not null, "
+        + "id2 integer not null, val1 varchar, val2 varchar "
+        + "constraint pk primary key (id1, id2))", tableName);
+      conn.createStatement().execute(ddl);
+      ddl = String.format("create index %s on %s (val1) include (val2)", 
indexName, tableName);
+      conn.createStatement().execute(ddl);
+      conn.commit();
+    }
+    // Mini cluster by default comes with only 1 RS. Starting a second RS so 
that
+    // we can kill the RS
+    JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer();
+    ServerName sn2 = rs2.getRegionServer().getServerName();
+    // Assign some table regions to the new RS we started above
+    moveRegionToServer(table, sn2);
+    moveRegionToServer(TableName.valueOf(SYSTEM_CATALOG_NAME), sn2);
+    moveRegionToServer(TableName.valueOf(SYSTEM_CHILD_LINK_NAME), sn2);
+    int rowCount = 50;
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      PreparedStatement stmt =
+        conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?, 
?)");
+      // upsert 50 rows
+      for (int i = 0; i < 5; ++i) {
+        for (int j = 0; j < 10; ++j) {
+          stmt.setInt(1, i);
+          stmt.setInt(2, j);
+          stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+          stmt.setString(4, null); // Generate a DeleteColumn cell
+          stmt.executeUpdate();
+        }
+        // we want to simulate RS crash after updating memstore and WAL
+        IndexRegionObserver.setIgnoreSyncReplicationForTesting(true);
+        conn.commit();
+      }
+      // Create tenant views for syscat and child link replication
+      createViewHierarchy();
+    } finally {
+      IndexRegionObserver.setIgnoreSyncReplicationForTesting(false);
+    }
+    // Kill the RS
+    cluster.killRegionServer(rs2.getRegionServer().getServerName());
+    Threads.sleep(20000); // just to be sure that the kill has fully started.
+    // Regions will be re-opened and the WAL will be replayed
+    util.waitUntilAllRegionsAssigned(table);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      Map<String, Integer> expected = Maps.newHashMap();
+      // For each row 1 Put + 1 Delete (DeleteColumn)
+      expected.put(tableName, rowCount * 2);
+      // unverified + verified + delete (Delete column)
+      expected.put(indexName, rowCount * 3);
+      // 1 tenant view was created
+      expected.put(SYSTEM_CHILD_LINK_NAME, 1);
+      // at least 1 log entry for syscat
+      expected.put(SYSTEM_CATALOG_NAME, 1);
+      verifyReplication(conn, expected);
+    }
+  }
+
+  @Test
+  public void testSystemTables() throws Exception {
+    createViewHierarchy();
+    Map<String, List<Mutation>> logsByTable = groupLogsByTable();
+    dumpTableLogCount(logsByTable);
+    // find all the log entries for system tables
+    Map<String,
+      List<Mutation>> systemTables = logsByTable.entrySet().stream()
+        .filter(entry -> 
entry.getKey().startsWith(QueryConstants.SYSTEM_SCHEMA_NAME))
+        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    // there should be only 2 entries CATALOG, CHILD_LINK
+    assertEquals(2, systemTables.size());
+    assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
+    assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);
+  }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 7b14996fba..53d2100cfd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -85,6 +85,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.Date;
@@ -117,6 +118,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -152,6 +154,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.replication.ReplicationLogGroup;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
@@ -343,6 +346,8 @@ public abstract class BaseTest {
   protected static boolean clusterInitialized = false;
   protected static HBaseTestingUtility utility;
   protected static final Configuration config = HBaseConfiguration.create();
+  protected static final String logDir = "/PHOENIX_REPLICATION_IN";
+  protected static URI standbyUri = new Path(logDir).toUri();
 
   protected static String getUrl() {
     if (!clusterInitialized) {
@@ -586,6 +591,8 @@ public abstract class BaseTest {
       conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
     }
     setPhoenixRegionServerEndpoint(conf);
+    // set up synchronous replication
+    conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
     return conf;
   }
 
@@ -1900,7 +1907,7 @@ public abstract class BaseTest {
   }
 
   /**
-   * Returns true if the region contains atleast one of the metadata rows we 
are interested in
+   * Returns true if the region contains at least one of the metadata rows we 
are interested in
    */
   protected static boolean regionContainsMetadataRows(RegionInfo regionInfo,
     List<byte[]> metadataRowKeys) {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index e6e5818430..a317d17a96 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -92,7 +92,7 @@ public class ReplicationLogGroupTest {
   public void setUp() throws IOException {
     conf = HBaseConfiguration.create();
     localFs = FileSystem.getLocal(conf);
-    standbyUri = new Path(testFolder.toString()).toUri();
+    standbyUri = new Path(testFolder.getRoot().toString()).toUri();
     serverName = ServerName.valueOf("test", 60010, 
EnvironmentEdgeManager.currentTimeMillis());
     conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, 
standbyUri.toString());
     // Small ring buffer size for testing

Reply via email to