This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e7797208 Revert "HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space"
e7797208 is described below

commit e7797208d6ca10a10d37b77591e1f0531ed57dfc
Author: stack <[email protected]>
AuthorDate: Tue Sep 22 20:48:31 2020 -0700

    Revert "HBASE-25068 Pass WALFactory to Replication so it knows of all WALProviders, not just default/user-space"
    
    This reverts commit 17ebf917ba354e4632b726323b2b32af3aa6c8de.
---
 .../apache/hadoop/hbase/regionserver/HRegionServer.java   | 15 ++++++++-------
 .../hadoop/hbase/regionserver/ReplicationService.java     | 11 +++++++----
 .../hbase/replication/regionserver/Replication.java       |  8 ++++----
 .../hbase/replication/regionserver/ReplicationSyncUp.java |  6 ++----
 .../hadoop/hbase/replication/TestReplicationBase.java     |  2 +-
 .../regionserver/TestReplicationSourceManager.java        |  3 +--
 6 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 8abede5..f14da2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1911,7 +1911,8 @@ public class HRegionServer extends Thread implements
         throw new IOException("Can not create wal directory " + logDir);
       }
       // Instantiate replication if replication enabled. Pass it the log 
directories.
-      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, 
factory);
+      createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir,
+        factory.getWALProvider());
     }
     this.walFactory = factory;
   }
@@ -3062,7 +3063,7 @@ public class HRegionServer extends Thread implements
    * Load the replication executorService objects, if any
    */
   private static void createNewReplicationInstance(Configuration conf, 
HRegionServer server,
-      FileSystem walFs, Path walDir, Path oldWALDir, WALFactory walFactory) 
throws IOException {
+      FileSystem walFs, Path walDir, Path oldWALDir, WALProvider walProvider) 
throws IOException {
     // read in the name of the source replication class from the config file.
     String sourceClassname = 
conf.get(HConstants.REPLICATION_SOURCE_SERVICE_CLASSNAME,
       HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
@@ -3075,19 +3076,19 @@ public class HRegionServer extends Thread implements
     // only one object.
     if (sourceClassname.equals(sinkClassname)) {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, 
oldWALDir, walFactory);
+        ReplicationSourceService.class, conf, server, walFs, walDir, 
oldWALDir, walProvider);
       server.replicationSinkHandler = (ReplicationSinkService) 
server.replicationSourceHandler;
     } else {
       server.replicationSourceHandler = newReplicationInstance(sourceClassname,
-        ReplicationSourceService.class, conf, server, walFs, walDir, 
oldWALDir, walFactory);
+        ReplicationSourceService.class, conf, server, walFs, walDir, 
oldWALDir, walProvider);
       server.replicationSinkHandler = newReplicationInstance(sinkClassname,
-        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, 
walFactory);
+        ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, 
walProvider);
     }
   }
 
   private static <T extends ReplicationService> T 
newReplicationInstance(String classname,
       Class<T> xface, Configuration conf, HRegionServer server, FileSystem 
walFs, Path logDir,
-      Path oldLogDir, WALFactory walFactory) throws IOException {
+      Path oldLogDir, WALProvider walProvider) throws IOException {
     final Class<? extends T> clazz;
     try {
       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
@@ -3096,7 +3097,7 @@ public class HRegionServer extends Thread implements
       throw new IOException("Could not find class for " + classname);
     }
     T service = ReflectionUtils.newInstance(clazz, conf);
-    service.initialize(server, walFs, logDir, oldLogDir, walFactory);
+    service.initialize(server, walFs, logDir, oldLogDir, walProvider);
     return service;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
index 33b3321..e9bbaea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationService.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,7 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
-import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -32,11 +32,14 @@ import org.apache.yetus.audience.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public interface ReplicationService {
+
   /**
    * Initializes the replication service object.
+   * @param walProvider can be null if not initialized inside a live region 
server environment, for
+   *          example, {@code ReplicationSyncUp}.
    */
-  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, 
WALFactory walFactory)
-    throws IOException;
+  void initialize(Server rs, FileSystem fs, Path logdir, Path oldLogDir, 
WALProvider walProvider)
+      throws IOException;
 
   /**
    * Start replication services.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index d8a696c..195877b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.SyncReplicationState;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -90,7 +89,7 @@ public class Replication implements ReplicationSourceService, 
ReplicationSinkSer
 
   @Override
   public void initialize(Server server, FileSystem fs, Path logDir, Path 
oldLogDir,
-      WALFactory walFactory) throws IOException {
+      WALProvider walProvider) throws IOException {
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.isReplicationForBulkLoadDataEnabled =
@@ -129,7 +128,6 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
     SyncReplicationPeerMappingManager mapping = new 
SyncReplicationPeerMappingManager();
     this.globalMetricsSource = CompatibilitySingletonFactory
         .getInstance(MetricsReplicationSourceFactory.class).getGlobalSource();
-    WALProvider walProvider = walFactory.getWALProvider();
     this.replicationManager = new ReplicationSourceManager(queueStorage, 
replicationPeers,
         replicationTracker, conf, this.server, fs, logDir, oldLogDir, 
clusterId,
         walProvider != null ? walProvider.getWALFileLengthProvider() : p -> 
OptionalLong.empty(),
@@ -200,6 +198,7 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
    * @param sourceBaseNamespaceDirPath Path that point to the source cluster 
base namespace
    *          directory required for replicating hfiles
    * @param sourceHFileArchiveDirPath Path that point to the source cluster 
hfile archive directory
+   * @throws IOException
    */
   @Override
   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
@@ -212,6 +211,7 @@ public class Replication implements 
ReplicationSourceService, ReplicationSinkSer
   /**
    * If replication is enabled and this cluster is a master,
    * it starts
+   * @throws IOException
    */
   @Override
   public void startReplicationService() throws IOException {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index b04c7eb..98490f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -83,8 +82,7 @@ public class ReplicationSyncUp extends Configured implements 
Tool {
 
       System.out.println("Start Replication Server start");
       Replication replication = new Replication();
-      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
-        new WALFactory(conf, "test", false));
+      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, 
null);
       ReplicationSourceManager manager = replication.getReplicationManager();
       manager.init().get();
       while (manager.activeFailoverTaskCount() > 0) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 455b272..6e1692a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4abb00f..8e38114 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -194,8 +194,7 @@ public abstract class TestReplicationSourceManager {
     logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME);
     remoteLogDir = 
utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME);
     replication = new Replication();
-    replication.initialize(new DummyServer(), fs, logDir, oldLogDir,
-      new WALFactory(conf, "test", false));
+    replication.initialize(new DummyServer(), fs, logDir, oldLogDir, null);
     managerOfCluster = getManagerFromCluster();
     if (managerOfCluster != null) {
       // After replication procedure, we need to add peer by hand (other than 
by receiving

Reply via email to