This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 337ce8cf27 PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2192)
337ce8cf27 is described below

commit 337ce8cf27bb9e2c9e8f3a5c2d192c6475e391e4
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Mon Jul 7 15:54:35 2025 -0700

    PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path (#2192)
    
    PHOENIX-7601 Use synchronous replication in Phoenix coprocs on the write path
    
    
    ---------
    
    Co-authored-by: Tanuj Khurana <tkhur...@apache.org>
---
 .../hbase/index/metrics/MetricsIndexerSource.java  |  10 +
 .../index/metrics/MetricsIndexerSourceImpl.java    |  10 +-
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   1 +
 .../java/org/apache/phoenix/util/SchemaUtil.java   |  29 ++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 385 +++++++++++++++++----
 .../phoenix/hbase/index/wal/IndexedKeyValue.java   |   9 +-
 .../replication/ReplicationLogGroupWriter.java     |  13 +
 .../replication/SystemCatalogWALEntryFilter.java   |  69 ++--
 .../phoenix/replication/tool/LogFileAnalyzer.java  | 140 ++++++--
 .../phoenix/replication/ReplicationLogGroupIT.java | 351 +++++++++++++++++++
 .../java/org/apache/phoenix/query/BaseTest.java    |   9 +-
 .../replication/ReplicationLogGroupTest.java       |   2 +-
 13 files changed, 906 insertions(+), 125 deletions(-)
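
For context, the feature added by this commit is gated by a new property, phoenix.synchronous.replication.enabled (default false, see QueryServices/QueryServicesOptions below), which IndexRegionObserver reads from the region server configuration. A minimal sketch of enabling it for a test configuration; the class name is illustrative only and not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.query.QueryServices;

    public class EnableSyncReplicationExample {   // hypothetical helper, not in the patch
        public static Configuration newConf() {
            Configuration conf = HBaseConfiguration.create();
            // New flag added by this commit; defaults to false.
            conf.setBoolean(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, true);
            return conf;
        }
    }

Region servers running with this flag will have IndexRegionObserver append data and index mutations to the ReplicationLogGroup and sync them on the write path, as shown in the IndexRegionObserver changes below.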

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
index 9707244148..df938c1a8b 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSource.java
@@ -78,6 +78,9 @@ public interface MetricsIndexerSource extends BaseSource {
   String POST_INDEX_UPDATE_FAILURE = "postIndexUpdateFailure";
  String POST_INDEX_UPDATE_FAILURE_DESC = "The number of failures of index updates post data updates";
 
+  String REPLICATION_SYNC_TIME = "replicationSyncTime";
+  String REPLICATION_SYNC_TIME_DESC = "Histogram for the time in milliseconds to synchronously replicate a batch of mutations";
+
   /**
    * Updates the index preparation time histogram (preBatchMutate).
    * @param dataTableName  Physical data table name
@@ -209,4 +212,11 @@ public interface MetricsIndexerSource extends BaseSource {
    * @param dataTableName  Physical data table name
    */
   void incrementPostIndexUpdateFailures(String dataTableName);
+
+  /**
+   * Updates the replication sync time histogram.
+   * @param dataTableName Physical data table name
+   * @param t time taken in milliseconds
+   */
+  void updateReplicationSyncTime(String dataTableName, long t);
 }
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
index 79060fa986..cf258aa30e 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/metrics/MetricsIndexerSourceImpl.java
@@ -39,6 +39,7 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
     private final MutableFastCounter slowPostOpenCalls;
     private final MetricHistogram duplicateKeyTimeHisto;
     private final MutableFastCounter slowDuplicateKeyCalls;
+    private final MetricHistogram replicationSyncTimeHisto;
 
     private final MetricHistogram preIndexUpdateTimeHisto;
     private final MetricHistogram postIndexUpdateTimeHisto;
@@ -69,7 +70,8 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
         slowPostOpenCalls = getMetricsRegistry().newCounter(SLOW_POST_OPEN, SLOW_POST_OPEN_DESC, 0L);
         duplicateKeyTimeHisto = getMetricsRegistry().newHistogram(DUPLICATE_KEY_TIME, DUPLICATE_KEY_TIME_DESC);
         slowDuplicateKeyCalls = getMetricsRegistry().newCounter(SLOW_DUPLICATE_KEY, SLOW_DUPLICATE_KEY_DESC, 0L);
-
+        replicationSyncTimeHisto = getMetricsRegistry().newHistogram(
+                REPLICATION_SYNC_TIME, REPLICATION_SYNC_TIME_DESC);
         postIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
                 POST_INDEX_UPDATE_TIME, POST_INDEX_UPDATE_TIME_DESC);
         preIndexUpdateTimeHisto = getMetricsRegistry().newHistogram(
@@ -219,4 +221,10 @@ public class MetricsIndexerSourceImpl extends BaseSourceImpl implements MetricsI
     private String getCounterName(String baseCounterName, String tableName) {
         return baseCounterName + "." + tableName;
     }
+
+    @Override
+    public void updateReplicationSyncTime(String dataTableName, long t) {
+        incrementTableSpecificHistogram(REPLICATION_SYNC_TIME, dataTableName, t);
+        replicationSyncTimeHisto.add(t);
+    }
 }
\ No newline at end of file
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
index e16e716958..4f6926144a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -557,6 +557,9 @@ public interface QueryServices extends SQLCloseable {
 
     String CQSI_THREAD_POOL_METRICS_ENABLED = "phoenix.cqsi.thread.pool.metrics.enabled";
 
+    public static final String SYNCHRONOUS_REPLICATION_ENABLED = "phoenix.synchronous.replication.enabled";
+
+
     /**
      * Get executor service used for parallel scans
      */
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index bb5ef67e49..5a88e45270 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -470,6 +470,7 @@ public class QueryServicesOptions {
     public static final Boolean DEFAULT_CQSI_THREAD_POOL_ALLOW_CORE_THREAD_TIMEOUT = true;
     public static final Boolean DEFAULT_CQSI_THREAD_POOL_METRICS_ENABLED = false;
 
+    public static final Boolean DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED = false;
 
     private final Configuration config;
 
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index fb0bdc0c82..eb0e2a4ad4 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -1445,4 +1445,33 @@ public class SchemaUtil {
         }
         return false;
     }
+
+    /**
+     * Data of non-system tables is always replicated.
+     * Data of the system tables in the list below is not replicated because this data is very
+     * specific to the cluster:
+     * SYSTEM.SEQUENCE
+     * SYSTEM.STATS
+     * SYSTEM.LOG
+     * SYSTEM.TASK
+     * SYSTEM.FUNCTION
+     * SYSTEM.MUTEX
+     * SYSTEM.TRANSFORM
+     * SYSTEM.CDC_STREAM_STATUS
+     * SYSTEM.CDC_STREAM
+     * For SYSTEM.CATALOG and SYSTEM.CHILD_LINK we only replicate rows with tenant information.
+     * Non-tenant (global) rows are assumed to be written by an admin or an admin process in each
+     * cluster separately and thus are not replicated.
+     * @param tableName full name of the table
+     * @return true if the table data should be replicated, else false
+     */
+    public static boolean shouldReplicateTable(byte[] tableName) {
+        if (!isSystemTable(tableName)) {
+            return true;
+        }
+        if (isMetaTable(tableName) || isChildLinkTable(tableName)) {
+            return true;
+        }
+        return false;
+    }
 }
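
As a rough illustration of the rule documented in the javadoc above (assuming the physical HBase table name bytes are passed in, as the coprocessor below does); class and table names here are illustrative only:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.util.SchemaUtil;

    public class ShouldReplicateExample {   // illustrative only
        public static void main(String[] args) {
            // Non-system tables are always replicated.
            System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("MY_SCHEMA.MY_TABLE")));  // true
            // SYSTEM.CATALOG is replicated; non-tenant rows are filtered out later.
            System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("SYSTEM.CATALOG")));      // true
            // Cluster-specific system tables such as SYSTEM.SEQUENCE are not replicated.
            System.out.println(SchemaUtil.shouldReplicateTable(Bytes.toBytes("SYSTEM.SEQUENCE")));     // false
        }
    }
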
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index ab1c7ad95e..7ade7ca48e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -77,6 +78,7 @@ import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -86,6 +88,8 @@ import org.apache.phoenix.jdbc.HAGroupStoreManager;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.replication.ReplicationLogGroup;
+import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;
 import org.apache.phoenix.schema.CompiledConditionalTTLExpression;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PRow;
@@ -134,9 +138,11 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
@@ -146,6 +152,8 @@ import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverCons
 import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
 import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.ATOMIC_OP_ATTRIB;
 import static org.apache.phoenix.index.PhoenixIndexBuilderHelper.RETURN_RESULT;
+import static org.apache.phoenix.query.QueryServices.SYNCHRONOUS_REPLICATION_ENABLED;
+import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
 
 /**
@@ -163,10 +171,15 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     private static final OperationStatus NOWRITE = new OperationStatus(SUCCESS);
     public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal";
     public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+    // Mutation attribute to ignore the mutation for replication
+    public static final String IGNORE_REPLICATION_ATTRIB = "_IGNORE_REPLICATION";
+    private static final byte[] IGNORE_REPLICATION_ATTRIB_VAL = new byte[]{0};
+    // TODO hardcoded for now, will fix later
+    public static final String DEFAULT_HA_GROUP = "DEFAULT_HA_GROUP";
 
     /**
      * Class to represent pending data table rows
-     * */
+     */
     private class PendingRow {
         private int count;
         private boolean usable;
@@ -202,33 +215,45 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
 
         public int getCount() {
-          return count;
-      }
+            return count;
+        }
 
         public BatchMutateContext getLastContext() {
-          return lastContext;
-      }
+            return lastContext;
+        }
     }
-    private static boolean ignoreIndexRebuildForTesting  = false;
+
+    private static boolean ignoreIndexRebuildForTesting = false;
     private static boolean failPreIndexUpdatesForTesting = false;
     private static boolean failPostIndexUpdatesForTesting = false;
     private static boolean failDataTableUpdatesForTesting = false;
     private static boolean ignoreWritingDeleteColumnsToIndex = false;
+    private static boolean ignoreSyncReplicationForTesting = false;
+
     public static void setIgnoreIndexRebuildForTesting(boolean ignore) {
         ignoreIndexRebuildForTesting = ignore;
     }
+
     public static void setFailPreIndexUpdatesForTesting(boolean fail) {
         failPreIndexUpdatesForTesting = fail;
     }
+
     public static void setFailPostIndexUpdatesForTesting(boolean fail) {
         failPostIndexUpdatesForTesting = fail;
     }
+
     public static void setFailDataTableUpdatesForTesting(boolean fail) {
         failDataTableUpdatesForTesting = fail;
     }
+
     public static void setIgnoreWritingDeleteColumnsToIndex(boolean ignore) {
         ignoreWritingDeleteColumnsToIndex = ignore;
     }
+
+    public static void setIgnoreSyncReplicationForTesting(boolean ignore) {
+        ignoreSyncReplicationForTesting = ignore;
+    }
+
     public enum BatchMutatePhase {
         INIT, PRE, POST, FAILED
     }
@@ -237,13 +262,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     // coprocessor calls. TODO: remove after HBASE-18127 when available
 
     /*
-    * The concurrent batch of mutations is a set such that every pair of batches in this set has at
-    * least one common row. Since a BatchMutateContext object of a batch is modified only after the
-    * row locks for all the rows that are mutated by this batch are acquired, there can be only one
-    * thread can acquire the locks for its batch and safely access all the batch contexts in the
-    * set of concurrent batches. Because of this, we do not read atomic variables or additional
-    * locks to serialize the access to the BatchMutateContext objects.
-    */
+     * The concurrent batch of mutations is a set such that every pair of batches in this set has at
+     * least one common row. Since a BatchMutateContext object of a batch is modified only after the
+     * row locks for all the rows that are mutated by this batch are acquired, there can be only one
+     * thread can acquire the locks for its batch and safely access all the batch contexts in the
+     * set of concurrent batches. Because of this, we do not read atomic variables or additional
+     * locks to serialize the access to the BatchMutateContext objects.
+     */
     public static class BatchMutateContext {
         private volatile BatchMutatePhase currentPhase = BatchMutatePhase.INIT;
        // The max of reference counts on the pending rows of this batch at the time this
@@ -301,7 +326,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
 
         public BatchMutateContext(int clientVersion) {
-          this.clientVersion = clientVersion;
+            this.clientVersion = clientVersion;
         }
 
        public void populateOriginalMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
@@ -355,50 +380,92 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
             return maxPendingRowCount;
         }
     }
+
     private ThreadLocal<BatchMutateContext> batchMutateContext =
             new ThreadLocal<BatchMutateContext>();
 
-  /**
-   * Configuration key for if the indexer should check the version of HBase is 
running. Generally,
-   * you only want to ignore this for testing or for custom versions of HBase.
-   */
-  public static final String CHECK_VERSION_CONF_KEY = 
"com.saleforce.hbase.index.checkversion";
-
-  public static final String INDEX_LAZY_POST_BATCH_WRITE = 
"org.apache.hadoop.hbase.index.lazy.post_batch.write";
-  private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
-
-  private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.post.batch.mutate.threshold";
-  private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
-  private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = 
"phoenix.indexer.slow.pre.increment";
-  private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 
3_000;
-
-  // Index writers get invoked before and after data table updates
-  protected IndexWriter preWriter;
-  protected IndexWriter postWriter;
-
-  protected IndexBuildManager builder;
-  private LockManager lockManager;
-
-  // The collection of pending data table rows
-  private Map<ImmutableBytesPtr, PendingRow> pendingRows = new 
ConcurrentHashMap<>();
-
-  private MetricsIndexerSource metricSource;
-
-  private boolean stopped;
-  private boolean disabled;
-  private long slowIndexPrepareThreshold;
-  private long slowPreIncrementThreshold;
-  private int rowLockWaitDuration;
-  private int concurrentMutationWaitDuration;
-  private String dataTableName;
-  private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
-  private boolean isNamespaceEnabled = false;
-  private boolean useBloomFilter = false;
-  private long lastTimestamp = 0;
-  private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new 
ArrayList<>();
-  private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-  private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
-  private byte[] encodedRegionName;
+    /**
+     * Configuration key for if the indexer should check the version of HBase 
is running. Generally,
+     * you only want to ignore this for testing or for custom versions of 
HBase.
+     */
+    public static final String CHECK_VERSION_CONF_KEY = 
"com.saleforce.hbase.index.checkversion";
+    public static final String INDEX_LAZY_POST_BATCH_WRITE =
+            "org.apache.hadoop.hbase.index.lazy.post_batch.write";
+    private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
+    private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY =
+            "phoenix.indexer.slow.post.batch.mutate.threshold";
+    private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 
3_000;
+    private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY =
+            "phoenix.indexer.slow.pre.increment";
+    private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 
3_000;
+
+    // Index writers get invoked before and after data table updates
+    protected IndexWriter preWriter;
+    protected IndexWriter postWriter;
+
+    protected IndexBuildManager builder;
+    private LockManager lockManager;
+
+    // The collection of pending data table rows
+    private Map<ImmutableBytesPtr, PendingRow> pendingRows = new 
ConcurrentHashMap<>();
+
+    private MetricsIndexerSource metricSource;
+
+    private boolean stopped;
+    private boolean disabled;
+    private long slowIndexPrepareThreshold;
+    private long slowPreIncrementThreshold;
+    private int rowLockWaitDuration;
+    private int concurrentMutationWaitDuration;
+    private String dataTableName;
+    private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
+    private boolean isNamespaceEnabled = false;
+    private boolean useBloomFilter = false;
+    private long lastTimestamp = 0;
+    private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new 
ArrayList<>();
+    private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+    private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
+    private byte[] encodedRegionName;
+    private boolean shouldReplicate;
+    private ReplicationLogGroup replicationLog;
+
+    // Don't replicate the mutation if this attribute is set
+    private static final Predicate<Mutation> IGNORE_REPLICATION = mutation ->
+        mutation.getAttribute(IGNORE_REPLICATION_ATTRIB) != null;
+
+    // Don't replicate the mutation for syscat/child link if the tenant id is not
+    // leading in the row key
+    private static final Predicate<Mutation> NOT_TENANT_ID_ROW_KEY_PREFIX = mutation ->
+        !SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(mutation.getRow(), 0);
+
+    // Don't replicate the mutation for child link if the child is not a tenant view
+    private static final Predicate<Mutation> NOT_CHILD_LINK_TENANT_VIEW = mutation -> {
+        boolean isChildLinkToTenantView = false;
+        for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                if (SystemCatalogWALEntryFilter.isCellChildLinkToTenantView(cell)) {
+                    isChildLinkToTenantView = true;
+                    break;
+                }
+            }
+        }
+        return !isChildLinkToTenantView;
+    };
+
+    /**
+     * If the replication filter evaluates to true, the mutation is excluded from replication.
+     */
+    private static Predicate<Mutation> getSynchronousReplicationFilter(byte[] tableName) {
+        Predicate<Mutation> filter = IGNORE_REPLICATION;
+        if (SchemaUtil.isMetaTable(tableName)) {
+            filter = IGNORE_REPLICATION.or(NOT_TENANT_ID_ROW_KEY_PREFIX);
+        } else if (SchemaUtil.isChildLinkTable(tableName)) {
+            filter = IGNORE_REPLICATION
+                    .or(NOT_TENANT_ID_ROW_KEY_PREFIX.and(NOT_CHILD_LINK_TENANT_VIEW));
+        }
+        return filter;
+    }
+    private Predicate<Mutation> ignoreReplicationFilter;
 
   @Override
   public Optional<RegionObserver> getRegionObserver() {
@@ -448,6 +515,18 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
          BloomType bloomFilterType = tableDescriptor.getColumnFamilies()[0].getBloomFilterType();
          // when the table descriptor changes, the coproc is reloaded
          this.useBloomFilter = bloomFilterType == BloomType.ROW;
+          byte[] tableName = env.getRegionInfo().getTable().getName();
+          this.shouldReplicate = env.getConfiguration().getBoolean(
+                  SYNCHRONOUS_REPLICATION_ENABLED, DEFAULT_SYNCHRONOUS_REPLICATION_ENABLED);
+          if (this.shouldReplicate) {
+              // replication feature is enabled, check if it is enabled for the table
+              this.shouldReplicate = SchemaUtil.shouldReplicateTable(tableName);
+          }
+          if (this.shouldReplicate) {
+              this.replicationLog = ReplicationLogGroup.get(env.getConfiguration(),
+                      env.getServerName(), DEFAULT_HA_GROUP);
+              this.ignoreReplicationFilter = getSynchronousReplicationFilter(tableName);
+          }
       } catch (NoSuchMethodError ex) {
           disabled = true;
           LOG.error("Must be too early a version of HBase. Disabled 
coprocessor ", ex);
@@ -480,7 +559,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         return;
       }
     this.stopped = true;
-    String msg = "Indexer is being stopped";
+    String msg = "IndexRegionObserver is being stopped";
     this.builder.stop(msg);
     this.preWriter.stop(msg);
     this.postWriter.stop(msg);
@@ -556,6 +635,89 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
   }
 
+    @Override
+    public void preWALRestore(
+            org.apache.hadoop.hbase.coprocessor.ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+            org.apache.hadoop.hbase.client.RegionInfo info,
+            org.apache.hadoop.hbase.wal.WALKey logKey,
+            WALEdit logEdit) throws IOException {
+        if (this.disabled) {
+            return;
+        }
+        if (!shouldReplicate) {
+            return;
+        }
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        try {
+            replicateEditOnWALRestore(logKey, logEdit);
+        } finally {
+            long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+            metricSource.updatePreWALRestoreTime(dataTableName, duration);
+        }
+    }
+
+    /**
+     * A batch of mutations is recorded in a single WAL edit, so a WAL edit can have cells
+     * belonging to multiple rows. Further, for one mutation the WAL edit contains the individual
+     * cells that are part of the mutation.
+     * @param logKey WAL key of the restored edit
+     * @param logEdit WAL edit whose cells are replicated
+     * @throws IOException if appending to or syncing the replication log fails
+     */
+    private void replicateEditOnWALRestore(org.apache.hadoop.hbase.wal.WALKey logKey,
+                                           WALEdit logEdit) throws IOException {
+        ImmutableBytesPtr prevKey = null, currentKey = null;
+        Put put = null;
+        Delete del = null;
+        for (Cell kv : logEdit.getCells()) {
+            if (kv instanceof IndexedKeyValue) {
+                IndexedKeyValue ikv = (IndexedKeyValue) kv;
+                replicationLog.append(Bytes.toString(ikv.getIndexTable()), -1, 
ikv.getMutation());
+            } else {
+                // While we could generate a separate mutation for every cell that is part of the
+                // WAL edit and replicate each such mutation, doing that would not be very efficient
+                // since a mutation can have a large number of cells. Instead, we first group the
+                // cells belonging to the same row into a mutation and then replicate that
+                // mutation.
+                currentKey = new ImmutableBytesPtr(kv.getRowArray(),
+                        kv.getRowOffset(), kv.getRowLength());
+                if (!currentKey.equals(prevKey)) {
+                    if (put != null && 
!this.ignoreReplicationFilter.test(put)) {
+                        
replicationLog.append(logKey.getTableName().getNameAsString(), -1, put);
+                    }
+                    if (del != null && 
!this.ignoreReplicationFilter.test(del)) {
+                        
replicationLog.append(logKey.getTableName().getNameAsString(), -1, del);
+                    }
+                    // reset
+                    put = null;
+                    del = null;
+                }
+                if (kv.getType() == Cell.Type.Put) {
+                    if (put == null) {
+                        put = new Put(currentKey.get(),
+                                currentKey.getOffset(), currentKey.getLength());
+                    }
+                    put.add(kv);
+                } else {
+                    if (del == null) {
+                        del = new Delete(currentKey.get(),
+                                currentKey.getOffset(), currentKey.getLength());
+                    }
+                    del.add(kv);
+                }
+                prevKey = currentKey;
+            }
+        }
+        // append the last one
+        if (put != null && !this.ignoreReplicationFilter.test(put)) {
+            replicationLog.append(logKey.getTableName().getNameAsString(), -1, put);
+        }
+        if (del != null && !this.ignoreReplicationFilter.test(del)) {
+            replicationLog.append(logKey.getTableName().getNameAsString(), -1, del);
+        }
+        replicationLog.sync();
+    }
+
   private void populateRowsToLock(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
           BatchMutateContext context) {
       for (int i = 0; i < miniBatchOp.size(); i++) {
@@ -629,6 +791,11 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                   Result result = Result.create(cells);
                   miniBatchOp.setOperationStatus(i,
                           new OperationStatus(SUCCESS, result));
+                  // since this mutation is ignored by setting its status to success in the coproc
+                  // it shouldn't be synchronously replicated
+                  if (this.shouldReplicate) {
+                      m.setAttribute(IGNORE_REPLICATION_ATTRIB, IGNORE_REPLICATION_ATTRIB_VAL);
+                  }
               }
           } else if (context.returnResult) {
               Map<ColumnReference, Pair<Cell, Boolean>> currColumnCellExprMap = new HashMap<>();
@@ -1547,6 +1714,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                 waitForPreviousConcurrentBatch(table, context);
             }
             preparePostIndexMutations(context, batchTimestamp, indexMetaData);
+            addGlobalIndexMutationsToWAL(miniBatchOp, context);
         }
         if (context.hasLocalIndex) {
            // Group all the updates for a single row into a single update to be processed (for local indexes)
@@ -1558,6 +1726,53 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
     }
 
+    /**
+     * We need to add the index mutations to the data table's WAL to handle cases where the RS
+     * crashes before the postBatchMutateIndispensably hook, where the mutations are synchronously
+     * replicated, has been called. This is needed because during WAL restore we don't have the
+     * IndexMaintainer object to generate the corresponding index mutations.
+     * @param miniBatchOp batch of data table mutations being applied
+     * @param context batch mutate context holding the pre/post index updates
+     */
+    private void addGlobalIndexMutationsToWAL(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                              BatchMutateContext context) {
+        if (!this.shouldReplicate) {
+            return;
+        }
+
+        WALEdit edit = miniBatchOp.getWalEdit(0);
+        if (edit == null) {
+            edit = new WALEdit();
+            miniBatchOp.setWalEdit(0, edit);
+        }
+
+        if (context.preIndexUpdates != null) {
+            for (Map.Entry<HTableInterfaceReference, Mutation> entry
+                    : context.preIndexUpdates.entries()) {
+                if (this.ignoreReplicationFilter.test(entry.getValue())) {
+                    continue;
+                }
+                // This creates cells of family type WALEdit.METAFAMILY which are not applied
+                // on restore
+                edit.add(IndexedKeyValue.newIndexedKeyValue(
+                        entry.getKey().get(), entry.getValue()));
+            }
+        }
+
+        if (context.postIndexUpdates != null) {
+            for (Map.Entry<HTableInterfaceReference, Mutation> entry
+                    : context.postIndexUpdates.entries()) {
+                if (this.ignoreReplicationFilter.test(entry.getValue())) {
+                    continue;
+                }
+                // This creates cells of family type WALEdit.METAFAMILY which are not applied
+                // on restore
+                edit.add(IndexedKeyValue.newIndexedKeyValue(
+                        entry.getKey().get(), entry.getValue()));
+            }
+        }
+    }
+
     /**
     * In case of ON DUPLICATE KEY IGNORE, if the row already exists no mutations will be
     * generated so release the row lock.
@@ -1689,7 +1904,16 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
           this.builder.batchCompleted(miniBatchOp);
 
          if (success) { // The pre-index and data table updates are successful, and now, do post index updates
-              doPost(c, context);
+              CompletableFuture<Void> postIndexFuture =
+                      CompletableFuture.runAsync(() -> doPost(c, context));
+              long start = EnvironmentEdgeManager.currentTimeMillis();
+              try {
+                  replicateMutations(miniBatchOp, context);
+              } finally {
+                  long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+                  metricSource.updateReplicationSyncTime(dataTableName, duration);
+              }
+              FutureUtils.get(postIndexFuture);
           }
        } finally {
            removeBatchMutateContext(c);
@@ -1761,7 +1985,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
     }
 
-  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException {
+  private void doPost(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) {
       long start = EnvironmentEdgeManager.currentTimeMillis();
 
       try {
@@ -2231,4 +2455,49 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     public static boolean isAtomicOperationComplete(OperationStatus status) {
         return status.getOperationStatusCode() == SUCCESS && status.getResult() != null;
     }
+
+    private void replicateMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                    BatchMutateContext context) throws IOException {
+
+        if (!this.shouldReplicate) {
+            return;
+        }
+        if (ignoreSyncReplicationForTesting) {
+            return;
+        }
+        assert this.replicationLog != null;
+
+        for (int i = 0; i < miniBatchOp.size(); i++) {
+            Mutation m = miniBatchOp.getOperation(i);
+            if (this.ignoreReplicationFilter.test(m)) {
+                continue;
+            }
+            this.replicationLog.append(this.dataTableName, -1, m);
+            Mutation[] mutationsAddedByCP = miniBatchOp.getOperationsFromCoprocessors(i);
+            if (mutationsAddedByCP != null) {
+                for (Mutation addedMutation : mutationsAddedByCP) {
+                    this.replicationLog.append(this.dataTableName, -1, addedMutation);
+                }
+            }
+        }
+        if (context.preIndexUpdates != null) {
+            for (Map.Entry<HTableInterfaceReference, Mutation> entry
+                    : context.preIndexUpdates.entries()) {
+                if (this.ignoreReplicationFilter.test(entry.getValue())) {
+                    continue;
+                }
+                this.replicationLog.append(entry.getKey().getTableName(), -1, entry.getValue());
+            }
+        }
+        if (context.postIndexUpdates != null) {
+            for (Map.Entry<HTableInterfaceReference, Mutation> entry
+                    : context.postIndexUpdates.entries()) {
+                if (this.ignoreReplicationFilter.test(entry.getValue())) {
+                    continue;
+                }
+                this.replicationLog.append(entry.getKey().getTableName(), -1, entry.getValue());
+            }
+        }
+        this.replicationLog.sync();
+    }
 }
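
A small sketch of the mutation-level opt-out used above: any mutation carrying the _IGNORE_REPLICATION attribute is skipped by the coprocessor's IGNORE_REPLICATION predicate and never appended to the replication log (the coprocessor itself sets it for atomic mutations whose status it short-circuits to success). Illustrative only, not part of the patch:

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.phoenix.hbase.index.IndexRegionObserver;

    public class IgnoreReplicationExample {   // illustrative only
        public static Put markNotReplicated(Put put) {
            // Mutations with this attribute are filtered out by the replication filter
            // and are not written to the ReplicationLogGroup.
            put.setAttribute(IndexRegionObserver.IGNORE_REPLICATION_ATTRIB, new byte[] { 0 });
            return put;
        }
    }
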
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
index a973f02e1e..6bcbd99b90 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -53,6 +53,11 @@ public class IndexedKeyValue extends KeyValue {
     private int hashCode;
 
     public static IndexedKeyValue newIndexedKeyValue(byte[] bs, Mutation m){
+        Cell indexWALCell = adaptFirstCellFromMutation(m);
+        return new IndexedKeyValue(indexWALCell, new ImmutableBytesPtr(bs), m);
+    }
+
+    public static IndexedKeyValue newIndexedKeyValue(ImmutableBytesPtr bs, Mutation m) {
         Cell indexWALCell = adaptFirstCellFromMutation(m);
         return new IndexedKeyValue(indexWALCell, bs, m);
     }
@@ -83,9 +88,9 @@ public class IndexedKeyValue extends KeyValue {
     //used for deserialization
     public IndexedKeyValue() {}
 
-    private IndexedKeyValue(Cell c, byte[] bs, Mutation mutation){
+    private IndexedKeyValue(Cell c, ImmutableBytesPtr bs, Mutation mutation) {
         super(c);
-        this.indexTableName = new ImmutableBytesPtr(bs);
+        this.indexTableName = bs;
         this.mutation = mutation;
         this.hashCode = calcHashCode(indexTableName, mutation);
     }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
index 6262e75a10..a3810e0fb7 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroupWriter.java
@@ -462,6 +462,19 @@ public abstract class ReplicationLogGroupWriter {
         }
     }
 
+    /**
+     * Close the currentWriter. Needed by tests so that we can close the log file and then read it.
+     */
+    protected void closeCurrentWriter() {
+        lock.lock();
+        try {
+            closeWriter(currentWriter);
+            currentWriter = null;
+        } finally {
+            lock.unlock();
+        }
+    }
+
     /**
      * Check if this ReplicationLogGroup is closed.
      *
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
index 39c8b13cb4..ec9445c24c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/SystemCatalogWALEntryFilter.java
@@ -76,20 +76,39 @@ public class SystemCatalogWALEntryFilter implements
   }
 
   /**
-   * Does the cell key have leading tenant Id.
+   * Does the cell row key have leading tenant Id.
    * @param cell hbase cell
    * @return true if the cell has leading tenant Id in key
    */
   private boolean isTenantIdLeadingInKey(final Cell cell) {
     // rows in system.catalog or system child that aren't tenant-owned
     // will have a leading separator byte
-    return cell.getRowArray()[cell.getRowOffset()]
-      != QueryConstants.SEPARATOR_BYTE;
+    return isTenantIdLeadingInKey(cell.getRowArray(), cell.getRowOffset());
+
+  }
+
+  /**
+   * Checks if the row key of a SYSTEM.CATALOG or SYSTEM.CHILD_LINK row has a leading tenant ID.
+   * @param rowKey row key bytes
+   * @param rowOffset offset of the row key within the byte array
+   * @return true if the row key has a leading tenant ID
+   */
+  public static boolean isTenantIdLeadingInKey(byte[] rowKey, int rowOffset) {
+    return rowKey[rowOffset] != QueryConstants.SEPARATOR_BYTE;
   }
 
   /**
-   * is the cell for system child link a tenant owned. Besides the non empty
-   * tenant id, system.child_link table have tenant owned data for parent child
+   * Is the cell for system child link tenant owned? This is the case if the tenant id is
+   * leading in the row key or the cell has tenant owned data for parent child links.
+   * @param cell hbase cell
+   * @return true if the cell is tenant owned
+   */
+  private boolean isTenantRowCellSystemChildLink(final Cell cell) {
+    return isTenantIdLeadingInKey(cell) || isCellChildLinkToTenantView(cell);
+  }
+
+  /**
+   * SYSTEM.CHILD_LINK table has tenant owned data for parent child
    * links. In this case, the column qualifier is
    * {@code PhoenixDatabaseMetaData#LINK_TYPE_BYTES} and value is
    * {@code PTable.LinkType.CHILD_TABLE}. For corresponding delete markers the
@@ -97,29 +116,27 @@ public class SystemCatalogWALEntryFilter implements
    * @param cell hbase cell
    * @return true if the cell is tenant owned
    */
-  private boolean isTenantRowCellSystemChildLink(final Cell cell) {
-    boolean isTenantRowCell = isTenantIdLeadingInKey(cell);
-
+  public static boolean isCellChildLinkToTenantView(final Cell cell) {
     ImmutableBytesWritable key = new ImmutableBytesWritable(
-      cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
     boolean isChildLinkToTenantView = false;
-    if (!isTenantRowCell) {
-      boolean isChildLink = CellUtil.matchingQualifier(
-        cell, PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
-      if ((isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES)) ||
-              cell.getType() == Cell.Type.DeleteFamily) {
-        byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
-        SchemaUtil.getVarChars(key.get(), key.getOffset(),
-            key.getLength(), 0, rowViewKeyMetadata);
-        /** if the child link is to a tenant-owned view, the COLUMN_NAME field will be
-         * the byte[] of the tenant otherwise, it will be an empty byte array
-         * (NOT QueryConstants.SEPARATOR_BYTE, but a byte[0]). This assumption is also
-         * true for child link's delete markers in SYSTEM.CHILD_LINK as it only contains link
-         * rows and does not deal with other type of rows like column rows that also has
-         * COLUMN_NAME populated with actual column name.**/
-        isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length != 0;
-      }
+    boolean isChildLink = CellUtil.matchingQualifier(cell,
+            PhoenixDatabaseMetaData.LINK_TYPE_BYTES);
+    if (isChildLink && CellUtil.matchingValue(cell, CHILD_TABLE_BYTES)
+            || cell.getType() == Cell.Type.DeleteFamily) {
+      byte[][] rowViewKeyMetadata = new byte[NUM_COLUMNS_PRIMARY_KEY][];
+      SchemaUtil.getVarChars(key.get(), key.getOffset(),
+              key.getLength(), 0, rowViewKeyMetadata);
+      /** if the child link is to a tenant-owned view, the COLUMN_NAME field will be
+       * the byte[] of the tenant otherwise, it will be an empty byte array
+       * (NOT QueryConstants.SEPARATOR_BYTE, but a byte[0]). This assumption is also
+       * true for child link's delete markers in SYSTEM.CHILD_LINK as it only contains link
+       * rows and does not deal with other type of rows like column rows that also has
+       * COLUMN_NAME populated with actual column name.
+       **/
+      isChildLinkToTenantView = rowViewKeyMetadata[COLUMN_NAME_INDEX].length != 0;
     }
-    return isTenantRowCell || isChildLinkToTenantView;
+    return isChildLinkToTenantView;
   }
+
 }
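
For intuition, the leading-tenant-id check factored out above keys off the first row key byte: tenant-owned SYSTEM.CATALOG/SYSTEM.CHILD_LINK rows start with the tenant id, while global rows start with a separator byte. A rough sketch with made-up row keys, illustrative only:

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.phoenix.replication.SystemCatalogWALEntryFilter;

    public class TenantKeyExample {   // illustrative only
        public static void main(String[] args) {
            byte[] tenantRow = Bytes.toBytes("tenant1\u0000S\u0000MY_VIEW");   // leading tenant id
            byte[] globalRow = Bytes.toBytes("\u0000S\u0000MY_TABLE");         // leading separator byte
            System.out.println(SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(tenantRow, 0)); // true
            System.out.println(SystemCatalogWALEntryFilter.isTenantIdLeadingInKey(globalRow, 0)); // false
        }
    }
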
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
index 042f04fef0..4b7b071c35 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/tool/LogFileAnalyzer.java
@@ -18,20 +18,25 @@
 package org.apache.phoenix.replication.tool;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathNotFoundException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFile.Record;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,47 +61,37 @@ public class LogFileAnalyzer extends Configured implements Tool {
     private boolean verbose = false;
     private boolean decode = false;
     private boolean check = false;
+    FileSystem fs;
 
-    @Override
-    public int run(String[] args) throws Exception {
-        if (!parseArgs(args)) {
-            System.err.println(USAGE);
-            return 1;
-        }
-
+    private void init() throws IOException {
         Configuration conf = getConf();
         if (conf == null) {
             conf = HBaseConfiguration.create();
             setConf(conf);
         }
+        if (fs == null) {
+            fs = FileSystem.get(getConf());
+        }
+    }
 
+    @Override
+    public int run(String[] args) throws Exception {
+        if (!parseArgs(args)) {
+            System.err.println(USAGE);
+            return 1;
+        }
         try {
-            FileSystem fs = FileSystem.get(conf);
+            init();
             Path path = new Path(args[args.length - 1]);
-
-            if (!fs.exists(path)) {
-                System.err.println("Path does not exist: " + path);
-                return 1;
-            }
-
-            List<Path> filesToAnalyze = new ArrayList<>();
-            if (fs.getFileStatus(path).isDirectory()) {
-                // Recursively find all .plog files
-                findLogFiles(fs, path, filesToAnalyze);
-            } else {
-                filesToAnalyze.add(path);
-            }
-
+            List<Path> filesToAnalyze = getFilesToAnalyze(path);
             if (filesToAnalyze.isEmpty()) {
                 System.err.println("No log files found in: " + path);
                 return 1;
             }
-
             // Analyze each file
             for (Path file : filesToAnalyze) {
-                analyzeFile(fs, file);
+                analyzeFile(file);
             }
-
             return 0;
         } catch (Exception e) {
             LOG.error("Error analyzing log files", e);
@@ -104,19 +99,62 @@ public class LogFileAnalyzer extends Configured implements Tool {
         }
     }
 
-    private void findLogFiles(FileSystem fs, Path dir, List<Path> files) throws IOException {
+    /**
+     * Returns all the mutations grouped by the table name under a source path
+     * @param source Path which can be a file or directory
+     * @return Mutations grouped by the table name
+     * @throws IOException
+     */
+    public Map<String, List<Mutation>> groupLogsByTable(String source) throws IOException {
+        Map<String, List<Mutation>> allFiles = Maps.newHashMap();
+        init();
+        Path path = new Path(source);
+        List<Path> filesToAnalyze = getFilesToAnalyze(path);
+        if (filesToAnalyze.isEmpty()) {
+            return allFiles;
+        }
+        // Analyze each file
+        for (Path file : filesToAnalyze) {
+            Map<String, List<Mutation>> perFile = groupLogsByTable(file);
+            for (Map.Entry<String, List<Mutation>> entry : perFile.entrySet()) {
+                List<Mutation> mutations = allFiles.get(entry.getKey());
+                if (mutations == null) {
+                    allFiles.put(entry.getKey(), entry.getValue());
+                } else {
+                    mutations.addAll(entry.getValue());
+                }
+            }
+        }
+        return allFiles;
+    }
+
+    private List<Path> getFilesToAnalyze(Path path) throws IOException {
+        if (!fs.exists(path)) {
+            throw new PathNotFoundException(path.toString());
+        }
+        List<Path> filesToAnalyze = Lists.newArrayList();
+        if (fs.getFileStatus(path).isDirectory()) {
+            // Recursively find all .plog files
+            findLogFiles(path, filesToAnalyze);
+        } else {
+            filesToAnalyze.add(path);
+        }
+        return filesToAnalyze;
+    }
+
+    private void findLogFiles(Path dir, List<Path> files) throws IOException {
         FileStatus[] statuses = fs.listStatus(dir);
         for (FileStatus status : statuses) {
             Path path = status.getPath();
             if (status.isDirectory()) {
-                findLogFiles(fs, path, files);
+                findLogFiles(path, files);
             } else if (path.getName().endsWith(".plog")) {
                 files.add(path);
             }
         }
     }
 
-    private void analyzeFile(FileSystem fs, Path file) throws IOException {
+    private void analyzeFile(Path file) throws IOException {
         System.out.println("\nAnalyzing file: " + file);
 
         LogFileReaderContext context = new LogFileReaderContext(getConf())
@@ -150,13 +188,18 @@ public class LogFileAnalyzer extends Configured implements Tool {
             }
 
             // Print trailer information
-            System.out.println("\nTrailer:");
-            System.out.println("  Record Count: " + 
reader.getTrailer().getRecordCount());
-            System.out.println("  Block Count: " + 
reader.getTrailer().getBlockCount());
-            System.out.println("  Blocks Start Offset: "
-                + reader.getTrailer().getBlocksStartOffset());
-            System.out.println("  Trailer Start Offset: "
-                + reader.getTrailer().getTrailerStartOffset());
+            LogFile.Trailer trailer = reader.getTrailer();
+            if (trailer != null) {
+                System.out.println("\nTrailer:");
+                System.out.println("  Record Count: " + 
reader.getTrailer().getRecordCount());
+                System.out.println("  Block Count: " + 
reader.getTrailer().getBlockCount());
+                System.out.println("  Blocks Start Offset: "
+                        + reader.getTrailer().getBlocksStartOffset());
+                System.out.println("  Trailer Start Offset: "
+                        + reader.getTrailer().getTrailerStartOffset());
+            } else {
+                System.out.println("\nTrailer is null");
+            }
 
             // Print verification results if checking
             if (check) {
@@ -179,6 +222,31 @@ public class LogFileAnalyzer extends Configured implements Tool {
         }
     }
 
+    private Map<String, List<Mutation>> groupLogsByTable(Path file) throws IOException {
+        Map<String, List<Mutation>> mutationsByTable = Maps.newHashMap();
+        System.out.println("\nAnalyzing file: " + file);
+        LogFileReaderContext context = new LogFileReaderContext(getConf())
+                .setFileSystem(fs)
+                .setFilePath(file)
+                .setSkipCorruptBlocks(check); // Skip corrupt blocks if checking
+        LogFileReader reader = new LogFileReader();
+        try {
+            reader.init(context);
+            // Process records
+            Record record;
+            while ((record = reader.next()) != null) {
+                String tableName = record.getHBaseTableName();
+                List<Mutation> mutations = mutationsByTable.getOrDefault(tableName,
+                        Lists.newArrayList());
+                mutations.add(record.getMutation());
+                mutationsByTable.put(tableName, mutations);
+            }
+        } finally {
+            reader.close();
+        }
+        return mutationsByTable;
+    }
+
     private boolean parseArgs(String[] args) {
         if (args.length == 0) {
             return false;
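
The new groupLogsByTable() entry point added above is what the integration test below uses to read back the replicated mutations. A usage sketch; the class name and the log directory path are placeholders:

    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.phoenix.replication.tool.LogFileAnalyzer;

    public class LogFileAnalyzerExample {   // illustrative only
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            LogFileAnalyzer analyzer = new LogFileAnalyzer();
            analyzer.setConf(conf);
            // Recursively scans the directory for .plog files and groups mutations by target table.
            Map<String, List<Mutation>> byTable = analyzer.groupLogsByTable("/replication/standby/logs");
            byTable.forEach((table, mutations) ->
                    System.out.println(table + " -> " + mutations.size() + " mutations"));
        }
    }
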
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
new file mode 100644
index 0000000000..f111d141f6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication;
+
+import static org.apache.phoenix.hbase.index.IndexRegionObserver.DEFAULT_HA_GROUP;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.end2end.IndexToolIT;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.PhoenixTestBuilder;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.replication.tool.LogFileAnalyzer;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ReplicationLogGroupIT extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogGroupIT.class);
+
+    @Rule
+    public TestName name = new TestName();
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.SYNCHRONOUS_REPLICATION_ENABLED, Boolean.TRUE.toString());
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Before
+    public void beforeTest() throws Exception {
+        LOG.info("Starting test {}", name.getMethodName());
+    }
+
+    @After
+    public void afterTest() throws Exception {
+        LOG.info("Starting cleanup for test {}", name.getMethodName());
+        cleanupLogsFolder(standbyUri);
+        LOG.info("Ending cleanup for test {}", name.getMethodName());
+    }
+
+    /**
+     * Delete all the shards under the top level replication log directory
+     * @throws IOException
+     */
+    private void cleanupLogsFolder(URI source) throws IOException {
+        FileSystem fs = FileSystem.get(config);
+        Path dir = new Path(source.getPath());
+        FileStatus[] statuses = fs.listStatus(dir);
+        for (FileStatus status : statuses) {
+            Path shard = status.getPath();
+            if (status.isDirectory()) {
+                fs.delete(shard, true);
+            }
+        }
+    }
+
+    private ReplicationLogGroup getReplicationLogGroup() throws IOException {
+        HRegionServer rs = getUtility().getHBaseCluster().getRegionServer(0);
+        return ReplicationLogGroup.get(config, rs.getServerName(), 
DEFAULT_HA_GROUP);
+    }
+
+    private Map<String, List<Mutation>> groupLogsByTable() throws Exception {
+        ReplicationLogGroup log = getReplicationLogGroup();
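+        // Close the in-progress log file so all appended entries are flushed and readable below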
+        log.getActiveWriter().closeCurrentWriter();
+        LogFileAnalyzer analyzer = new LogFileAnalyzer();
+        analyzer.setConf(config);
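+        // run the analyzer in --check mode over the standby log directory before grouping
+        // the replicated mutations by table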
+        String[] args = {"--check", standbyUri.getPath()};
+        assertEquals(0, analyzer.run(args));
+        return analyzer.groupLogsByTable(standbyUri.getPath());
+    }
+
+    private int getCountForTable(Map<String, List<Mutation>> logsByTable,
+                                 String tableName) throws Exception {
+        List<Mutation> mutations = logsByTable.get(tableName);
+        return mutations != null ? mutations.size() : 0;
+    }
+
+    private void verifyReplication(Connection conn,
+                                   Map<String, Integer> expected) throws Exception {
+        Map<String, List<Mutation>> mutationsByTable = groupLogsByTable();
+        dumpTableLogCount(mutationsByTable);
+        for (Map.Entry<String, Integer> entry : expected.entrySet()) {
+            String tableName = entry.getKey();
+            int expectedMutationCount = entry.getValue();
+            List<Mutation> mutations = mutationsByTable.get(tableName);
+            int actualMutationCount = mutations != null ? mutations.size() : 0;
+            try {
+                if (!tableName.equals(SYSTEM_CATALOG_NAME)) {
+                    assertEquals(String.format("For table %s", tableName),
+                            expectedMutationCount, actualMutationCount);
+                } else {
+                    // special handling for syscat
+                    assertTrue("For SYSCAT", actualMutationCount >= expectedMutationCount);
+                }
+            } catch (AssertionError e) {
+                TestUtil.dumpTable(conn, TableName.valueOf(tableName));
+                throw e;
+            }
+        }
+    }
+
+    private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) {
+        LOG.info("Dump table log count for test {}", name.getMethodName());
+        for (Map.Entry<String, List<Mutation>> table : mutationsByTable.entrySet()) {
+            LOG.info("#Log entries for {} = {}", table.getKey(), table.getValue().size());
+        }
+    }
+
+    private void moveRegionToServer(TableName tableName, ServerName sn) throws Exception {
+        HBaseTestingUtility util = getUtility();
+        try (RegionLocator locator = util.getConnection().getRegionLocator(tableName)) {
+            String regEN = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
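+            // keep issuing the move and polling until the region location reports the target server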
+            while (!sn.equals(locator.getAllRegionLocations().get(0).getServerName())) {
+                LOG.info("Moving region {} of table {} to server {}", regEN, tableName, sn);
+                util.getAdmin().move(Bytes.toBytes(regEN), sn);
+                Thread.sleep(100);
+            }
+            LOG.info("Moved region {} of table {} to server {}", regEN, tableName, sn);
+        }
+    }
+
+    private PhoenixTestBuilder.SchemaBuilder createViewHierarchy() throws Exception {
+        // Define the test schema.
+        // 1. Table with columns => (ORG_ID, KP, COL1, COL2, COL3), PK => (ORG_ID, KP)
+        // 2. GlobalView with columns => (ID, COL4, COL5, COL6), PK => (ID)
+        // 3. Tenant with columns => (ZID, COL7, COL8, COL9), PK => (ZID)
+        final PhoenixTestBuilder.SchemaBuilder schemaBuilder = new PhoenixTestBuilder.SchemaBuilder(getUrl());
+        PhoenixTestBuilder.SchemaBuilder.TableOptions tableOptions =
+                PhoenixTestBuilder.SchemaBuilder.TableOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions
+                globalViewOptions = PhoenixTestBuilder.SchemaBuilder.GlobalViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewOptions
+                tenantViewWithOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewOptions.withDefaults();
+        PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions
+                tenantViewIndexOverrideOptions = PhoenixTestBuilder.SchemaBuilder.TenantViewIndexOptions.withDefaults();
+
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            schemaBuilder.withTableOptions(tableOptions)
+                    .withGlobalViewOptions(globalViewOptions)
+                    .withTenantViewOptions(tenantViewWithOverrideOptions)
+                    .withTenantViewIndexOptions(tenantViewIndexOverrideOptions)
+                    .buildWithNewTenant();
+        }
+        return schemaBuilder;
+    }
+
+    @Test
+    public void testAppendAndSync() throws Exception {
+        final String tableName = "T_" + generateUniqueName();
+        final String indexName1 = "I_" + generateUniqueName();
+        final String indexName2 = "I_" + generateUniqueName();
+        final String indexName3 = "L_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = String.format("create table %s (id1 integer not null, " +
+                    "id2 integer not null, val1 varchar, val2 varchar " +
+                    "constraint pk primary key (id1, id2))", tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val1) include (val2)",
+                    indexName1, tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val2) include (val1)",
+                    indexName2, tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create local index %s on %s (id2,val1) include (val2)",
+                    indexName3, tableName);
+            conn.createStatement().execute(ddl);
+            conn.commit();
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tableName + " VALUES(?, ?, ?, ?)");
+            // upsert 50 rows
+            int rowCount = 50;
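+            // 5 batches of 10 rows each, committing after every batch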
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 10; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+                    stmt.setString(4, null);
+                    stmt.executeUpdate();
+                }
+                conn.commit();
+            }
+            // do some atomic upserts which will be ignored and therefore not replicated
+            stmt = conn.prepareStatement("upsert into " + tableName + " VALUES(?, ?, ?) " +
+                    "ON DUPLICATE KEY IGNORE");
+            conn.setAutoCommit(true);
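+            // auto-commit so each atomic upsert is sent to the server as its own batch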
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 2; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, null);
+                    assertEquals(0, stmt.executeUpdate());
+                }
+            }
+            // verify the correctness of the index
+            IndexToolIT.verifyIndexTable(tableName, indexName1, conn);
+            // verify replication
+            Map<String, Integer> expected = Maps.newHashMap();
+            // mutation count will be equal to row count since the atomic upsert mutations will be
+            // ignored and therefore not replicated
+            expected.put(tableName, rowCount * 3); // Put + Delete + local index update
+            // for index1 unverified + verified + delete (Delete column)
+            expected.put(indexName1, rowCount * 3);
+            // for index2 unverified + verified since the null column is part of row key
+            expected.put(indexName2, rowCount * 2);
+            // we didn't create any tenant views so no change in the syscat entries
+            expected.put(SYSTEM_CATALOG_NAME, 0);
+            expected.put(SYSTEM_CHILD_LINK_NAME, 0);
+            verifyReplication(conn, expected);
+        }
+    }
+
+    /**
+     * This test simulates RS crashes in the middle of write transactions after the edits
+     * have been written to the WAL but before they have been replicated to the standby
+     * cluster. Those edits will be replicated when the WAL is replayed.
+     */
+    @Test
+    public void testWALRestore() throws Exception {
+        HBaseTestingUtility util = getUtility();
+        MiniHBaseCluster cluster = util.getHBaseCluster();
+        final String tableName = "T_" + generateUniqueName();
+        final String indexName = "I_" + generateUniqueName();
+        TableName table = TableName.valueOf(tableName);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String ddl = String.format("create table %s (id1 integer not null, " +
+                    "id2 integer not null, val1 varchar, val2 varchar " +
+                    "constraint pk primary key (id1, id2))", tableName);
+            conn.createStatement().execute(ddl);
+            ddl = String.format("create index %s on %s (val1) include (val2)",
+                    indexName, tableName);
+            conn.createStatement().execute(ddl);
+            conn.commit();
+        }
+        // Mini cluster by default comes with only 1 RS. Starting a second RS so that
+        // we can kill the RS
+        JVMClusterUtil.RegionServerThread rs2 = cluster.startRegionServer();
+        ServerName sn2 = rs2.getRegionServer().getServerName();
+        // Assign some table regions to the new RS we started above
+        moveRegionToServer(table, sn2);
+        moveRegionToServer(TableName.valueOf(SYSTEM_CATALOG_NAME), sn2);
+        moveRegionToServer(TableName.valueOf(SYSTEM_CHILD_LINK_NAME), sn2);
+        int rowCount = 50;
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tableName + " VALUES(?, ?, ?, ?)");
+            // upsert 50 rows
+            for (int i = 0; i < 5; ++i) {
+                for (int j = 0; j < 10; ++j) {
+                    stmt.setInt(1, i);
+                    stmt.setInt(2, j);
+                    stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+                    stmt.setString(4, null); // Generate a DeleteColumn cell
+                    stmt.executeUpdate();
+                }
+                // we want to simulate RS crash after updating memstore and WAL
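+                // Skipping synchronous replication leaves the standby log behind; these edits
+                // should reach the standby only via WAL replay after the crash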
+                IndexRegionObserver.setIgnoreSyncReplicationForTesting(true);
+                conn.commit();
+            }
+            // Create tenant views for syscat and child link replication
+            createViewHierarchy();
+        } finally {
+            IndexRegionObserver.setIgnoreSyncReplicationForTesting(false);
+        }
+        // Kill the RS
+        cluster.killRegionServer(rs2.getRegionServer().getServerName());
+        Threads.sleep(20000); // just to be sure that the kill has fully started.
+        // Regions will be re-opened and the WAL will be replayed
+        util.waitUntilAllRegionsAssigned(table);
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Map<String, Integer> expected = Maps.newHashMap();
+            // For each row 1 Put + 1 Delete (DeleteColumn)
+            expected.put(tableName, rowCount * 2);
+            // unverified + verified + delete (Delete column)
+            expected.put(indexName, rowCount * 3);
+            // 1 tenant view was created
+            expected.put(SYSTEM_CHILD_LINK_NAME, 1);
+            // at least 1 log entry for syscat
+            expected.put(SYSTEM_CATALOG_NAME, 1);
+            verifyReplication(conn, expected);
+        }
+    }
+
+    @Test
+    public void testSystemTables() throws Exception {
+        createViewHierarchy();
+        Map<String, List<Mutation>> logsByTable = groupLogsByTable();
+        dumpTableLogCount(logsByTable);
+        // find all the log entries for system tables
+        Map<String, List<Mutation>> systemTables = logsByTable.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(QueryConstants.SYSTEM_SCHEMA_NAME))
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        // there should be only 2 entries: CATALOG and CHILD_LINK
+        assertEquals(2, systemTables.size());
+        assertEquals(1, getCountForTable(systemTables, SYSTEM_CHILD_LINK_NAME));
+        assertTrue(getCountForTable(systemTables, SYSTEM_CATALOG_NAME) > 0);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 3e9775e216..bf2f65f592 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -85,6 +85,7 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.Date;
@@ -119,6 +120,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nonnull;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -154,6 +156,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.replication.ReplicationLogGroup;
 import org.apache.phoenix.schema.NewerTableAlreadyExistsException;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableType;
@@ -432,6 +435,8 @@ public abstract class BaseTest {
     protected static boolean clusterInitialized = false;
     protected static HBaseTestingUtility utility;
     protected static final Configuration config = HBaseConfiguration.create();
+    protected static final String logDir = "/PHOENIX_REPLICATION_IN";
+    protected static URI standbyUri = new Path(logDir).toUri();
 
     protected static String getUrl() {
         if (!clusterInitialized) {
@@ -669,6 +674,8 @@ public abstract class BaseTest {
             conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 0);
         }
         setPhoenixRegionServerEndpoint(conf);
+        // set up synchronous replication
+        conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
         return conf;
     }
 
@@ -1985,7 +1992,7 @@ public abstract class BaseTest {
     }
 
     /**
-     * Returns true if the region contains atleast one of the metadata rows we are interested in
+     * Returns true if the region contains at least one of the metadata rows we are interested in
      */
     protected static boolean regionContainsMetadataRows(RegionInfo regionInfo,
             List<byte[]> metadataRowKeys) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 437f946647..d3176490d1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -93,7 +93,7 @@ public class ReplicationLogGroupTest {
     public void setUp() throws IOException {
         conf = HBaseConfiguration.create();
         localFs = FileSystem.getLocal(conf);
-        standbyUri = new Path(testFolder.toString()).toUri();
+        standbyUri = new Path(testFolder.getRoot().toString()).toUri();
         serverName = ServerName.valueOf("test", 60010, EnvironmentEdgeManager.currentTimeMillis());
         conf.set(ReplicationLogGroup.REPLICATION_STANDBY_HDFS_URL_KEY, standbyUri.toString());
         // Small ring buffer size for testing
