huaxiangsun commented on a change in pull request #2522:
URL: https://github.com/apache/hbase/pull/2522#discussion_r502702094



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java
##########
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives edits
+ * from the WAL, and sends them to replicas of regions.
+ * This EndPoint is specially for system tables. It does a few things:
+ *
+ * 1. For each replica region, there is a shipper to ship edits for this 
region. Internally,
+ * the shipper maintains an in-memory queue into which edits read from the 
WAL are enqueued.
+ * 2. Each shipper will read edits from its queue and ship edits to the 
specific replica region.
+ * The pace of each shipper is independent of the others, so they are not 
synced.
+ * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be 
discarded as
+ * those edits are already in flushed hfiles. This also prevents the queue 
from growing out of bounds
+ * when the replica region server is slow at processing edits.
+ * 4. Registering/deregistering a region is driven by region events from the WAL.
+ * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as 
it does not need
+ * zookeeper support.
+ *
+ * TODO: the threading model for shippers needs to be improved. Right now, 
each shipper maps to
+ * one thread, which is not efficient. It should be reworked so that only two 
threads are needed:
+ *  One sending thread which loops through shipper's queue and send out the 
walEdits.
+ *  One receiving thread which will wait for futures, once it is ready, it 
processes that
+ *  future accordingly.
+ */
[email protected] public class CatalogReplicaReplicationEndpoint
+  extends BaseReplicationEndpoint {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private AsyncClusterConnection connection;
+
+  private int numRetries;
+  private long operationTimeoutNs;
+  private long shippedEdits;
+
+  // Track primary regions under replication.
+  ConcurrentHashMap<String, CatalogReplicaShipper[]> 
primaryRegionsUnderReplication;
+
+  private byte[][] replicatedFamilies;
+
+  public CatalogReplicaReplicationEndpoint() {
+    // TODO: right now, it only allows the info family to be replicated.
+    // This need to be changed in the future.
+    replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY };
+    primaryRegionsUnderReplication = new ConcurrentHashMap<>();
+  }
+
+  public long getShippedEdits() {
+    return shippedEdits;
+  }
+
+  @Override public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = context.getConfiguration();
+    // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
+    // We are resetting it here because we want the default number of retries 
(35) rather than 10
+    // times that, which would make retries very long for disabled tables, etc.
+    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    if (defaultNumRetries > 10) {
+      int mult = 
conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
+        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
+      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
+    }
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
+      .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();

Review comment:
       This is the connection in the region server; the timeout and retries will be 
passed in when replay() is called.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to