Apache9 commented on code in PR #4924:
URL: https://github.com/apache/hbase/pull/4924#discussion_r1067794535


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.java:
##########
@@ -137,4 +140,33 @@ default void 
preExecuteProcedures(ObserverContext<RegionServerCoprocessorEnviron
   default void 
postExecuteProcedures(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
     throws IOException {
   }
+
+  /**
+   * This will be called before replication sink mutations are executed on the 
sink table as part of
+   * batch call.
+   * @param ctx      the environment to interact with the framework and region 
server.
+   * @param walEntry wal entry from which mutation is formed.
+   * @param mutation mutation to be applied at sink cluster.
+   * @throws IOException if something goes wrong.
+   */
+  default void preReplicationSinkBatchMutate(
+    ObserverContext<RegionServerCoprocessorEnvironment> ctx, 
AdminProtos.WALEntry walEntry,

Review Comment:
   Agree with @apurtell that exposing pb message in CP is not very good but I 
checked the code, we never convert the WALEntry to a none pb one so there is no 
good way for us to not use pb message here. Let's keep it like this for now.



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java:
##########
@@ -178,16 +179,19 @@ private void decorateConf() {
   /**
    * Replicate this array of entries directly into the local cluster using the 
native client. Only
    * operates against raw protobuf type saving on a conversion from pb to pojo.
+   * @param entries                    WAL entries to be replicated.
+   * @param cells                      cell scanner for iteration.
    * @param replicationClusterId       Id which will uniquely identify source 
cluster FS client
    *                                   configurations in the replication 
configuration directory
    * @param sourceBaseNamespaceDirPath Path that point to the source cluster 
base namespace
    *                                   directory
    * @param sourceHFileArchiveDirPath  Path that point to the source cluster 
hfile archive directory
+   * @param rsServerHost               regionserver coproc host.
    * @throws IOException If failed to replicate the data
    */
   public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
     String replicationClusterId, String sourceBaseNamespaceDirPath,
-    String sourceHFileArchiveDirPath) throws IOException {
+    String sourceHFileArchiveDirPath, RegionServerCoprocessorHost 
rsServerHost) throws IOException {

Review Comment:
   I suppose the RegionServerCroprocessorHost should be initialized when 
creating ReplicationSink?
   
   I checked the code, maybe in ReplicationSinkServiceImpl, we could check 
whether the Server is an instance of HRegionServer, if so, we get its 
RegionServerCoprocessorHost for creating the ReplicationSink. And we could also 
reuse the AsyncClusterConnection of the Server, instead of creating a new one 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to