virajjasani commented on a change in pull request #2198:
URL: https://github.com/apache/hbase/pull/2198#discussion_r465794580



##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
##########
@@ -295,7 +356,40 @@ protected void stopServiceThreads() {
     }
   }
 
-  // Test HBASE-20497
+  /**
+   * Deadend Endpoint. Does nothing.
+   */
+  public static class DoNothingReplicationEndpoint extends 
HBaseInterClusterReplicationEndpoint {
+    private final UUID uuid = UUID.randomUUID();
+
+    @Override public void init(Context context) throws IOException {
+      this.ctx = context;
+      return;

Review comment:
       nit: redundant

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSource.java
##########
@@ -108,16 +107,100 @@ public static void tearDownAfterClass() throws Exception 
{
     TEST_UTIL.shutdownMiniDFSCluster();
   }
 
+  /**
+   * Test the default ReplicationSource skips queuing hbase:meta WAL files.
+   */
+  @Test
+  public void testDefaultSkipsMetaWAL() throws IOException {
+    ReplicationSource rs = new ReplicationSource();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = 
Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId, null,
+      p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      assertTrue(rs.isSourceActive());
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1" + META_WAL_PROVIDER_ID));
+      assertEquals(0, rs.getSourceMetrics().getSizeOfLogQueue());
+      rs.enqueueLog(new Path("a.1"));
+      assertEquals(1, rs.getSourceMetrics().getSizeOfLogQueue());
+    } finally {
+      rs.terminate("Done");
+      rss.stop("Done");
+    }
+  }
+
+  /**
+   * Test that we filter out meta edits, etc.
+   */
+  @Test
+  public void testWALEntryFilter() throws IOException {
+    // To get the fully constructed default WALEntryFilter, need to create a 
ReplicationSource
+    // instance and init it.
+    ReplicationSource rs = new ReplicationSource();
+    UUID uuid = UUID.randomUUID();
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
+    Mockito.when(mockPeer.getConfiguration()).thenReturn(conf);
+    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
+    ReplicationPeerConfig peerConfig = 
Mockito.mock(ReplicationPeerConfig.class);
+    Mockito.when(peerConfig.getReplicationEndpointImpl()).
+      thenReturn(DoNothingReplicationEndpoint.class.getName());
+    Mockito.when(mockPeer.getPeerConfig()).thenReturn(peerConfig);
+    ReplicationSourceManager manager = 
Mockito.mock(ReplicationSourceManager.class);
+    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    String queueId = "qid";
+    RegionServerServices rss =
+      
TEST_UTIL.createMockRegionServerService(ServerName.parseServerName("a.b.c,1,1"));
+    rs.init(conf, null, manager, null, mockPeer, rss, queueId,
+      uuid, p -> OptionalLong.empty(), new MetricsSource(queueId));
+    try {
+      rs.startup();
+      TEST_UTIL.waitFor(30000, () -> rs.getWalEntryFilter() != null);
+      WALEntryFilter wef = rs.getWalEntryFilter();
+      // Test non-system WAL edit.
+      WAL.Entry e = new WAL.Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY,
+        TableName.valueOf("test"), -1), new WALEdit());
+      assertTrue(wef.filter(e) == e);

Review comment:
       nit: just in case if you like `assertSame(e, wef.filter(e))`

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/WALEntryFilter.java
##########
@@ -49,5 +47,5 @@
    * @return a (possibly modified) Entry to use. Returning null or an entry 
with no cells will cause
    *         the entry to be skipped for replication.
    */
-  public Entry filter(Entry entry);
+  Entry filter(Entry entry);

Review comment:
       For system tables we return null Entry. Good to consider return type 
`Optional<Entry>` here?
   Maybe as follow up task?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to