Apache9 commented on a change in pull request #2522:
URL: https://github.com/apache/hbase/pull/2522#discussion_r505536165



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/BaseReplicationEndpoint.java
##########
@@ -40,6 +39,10 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(BaseReplicationEndpoint.class);
   public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY
       = "hbase.replication.source.custom.walentryfilters";
+  // Can be configured differently than hbase.client.retries.number
+  public static final String CLIENT_RETRIES_NUMBER =
+    "hbase.region.replica.replication.client.retries.number";

Review comment:
       Why move it here? This is the base replication endpoint, not 
BaseRegionReplicaReplicationEndpoint. I do not think we should put 
region-replica-specific configs in this general class.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java
##########
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
+ * This EndPoint is specially for system tables. It does a few things:
+ * 1. For each replica region, there is a shipper to ship edits for this 
region. Internally,
+ * the shipper maintains a in-memory queue, where edits from wal log are 
queued in this
+ * in-memory queue.
+ * 2. Each shipper will read edits from its queue and ship edits to the 
specific replica region.
+ * The pace of shippers are independent from each other, so they are not 
synced.
+ * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be 
discarded as
+ * those edits are already in flushed hfiles. This also prevents the queue 
grow out-of-bound
+ * when the replica region server is slow processing edits.
+ * 4. register/deregister region is based on region events from wal log.
+ * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as 
it does not need
+ * zookeeper support.
+ * TODO: the threading model for shippers needs to be improved. Right now, 
each shipper maps to
+ * one thread, which is not efficient. It needs to be improved so two threads 
are needed.
+ * One sending thread which loops through shipper's queue and send out the 
walEdits.
+ * One receiving thread which will wait for futures, once it is ready, it 
processes that
+ * future accordingly.
+ */
+@InterfaceAudience.Private public class CatalogReplicaReplicationEndpoint
+  extends BaseReplicationEndpoint {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private AsyncClusterConnection connection;
+
+  private int numRetries;
+  private long operationTimeoutNs;
+  private long shippedEdits;
+
+  // Track primary regions under replication.
+  ConcurrentHashMap<String, CatalogReplicaShipper[]> 
primaryRegionsUnderReplication;
+
+  private byte[][] replicatedFamilies;
+
+  public CatalogReplicaReplicationEndpoint() {
+    // TODO: right now, it only allow family info to be replicated.
+    // This need to be changed in the future.
+    replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY };
+    primaryRegionsUnderReplication = new ConcurrentHashMap<>();
+  }
+
+  public long getShippedEdits() {
+    return shippedEdits;
+  }
+
+  @Override public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = context.getConfiguration();
+    // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
+    // We are resetting it here because we want default number of retries (35) 
rather than 10 times
+    // that which makes very long retries for disabled tables etc.
+    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    if (defaultNumRetries > 10) {
+      int mult = 
conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
+        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
+      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
+    }
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
+      .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
+  }
+
+  /**
+   * returns true if the specified entry must be replicated. We should always 
replicate meta
+   * operations (e.g. flush) and use the configured family to filter out edits 
which do not
+   * need to be replicated.
+   */
+  private boolean requiresReplication(final WALEdit edit) {
+
+    // TODO: let meta edits go through. Need to filter out flush events which 
are not for
+    // targetting families.
+    if (edit.isMetaEdit()) {
+      return true;
+    }
+    // empty edit does not need to be replicated
+    if (edit.isEmpty()) {
+      return false;
+    }
+
+    for (byte[] fam : replicatedFamilies) {
+      if (edit.getFamilies().contains(fam)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  // Register a new primary region to the endpoint when a region is opened, it 
will setup its
+  // shipper to replicate its wal edits to replica regions.
+  private void registerRegion(final String encodedName, final byte[] 
regionName)
+    throws IOException {
+    // Make sure it does not exist.
+    LOG.info("Register region {}", encodedName);
+
+    if (primaryRegionsUnderReplication.get(encodedName) != null) {
+      LOG.warn("Try to register an existing regoin {}", encodedName);
+      return;
+    }
+
+    RegionInfo regionInfo = 
CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
+
+    // TODO: meta replicas is not at the table descriptor at this moment.
+    int numReplicas =
+      conf.getInt(HConstants.META_REPLICAS_NUM, 
HConstants.DEFAULT_META_REPLICA_NUM);
+
+    if (numReplicas <= 1) {
+      LOG.warn("Trying to register region {} without replica enabled", 
encodedName);
+      return;
+    }
+
+    CatalogReplicaShipper[] replicaShippers = new 
CatalogReplicaShipper[numReplicas - 1];
+
+    for (int i = 1; i < numReplicas; i++) {
+      // It leaves each ReplicaShipper to resolve its replica region location.
+      replicaShippers[i - 1] = new CatalogReplicaShipper(this, 
this.connection, null,
+        RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(i).build());
+
+      replicaShippers[i - 1].start();
+    }
+
+    primaryRegionsUnderReplication.put(encodedName, replicaShippers);
+  }
+
+  // Deregister a primary region from the endpoint when a region is closed, it 
will clean up all
+  // resources allocated for this region.
+  private void unregisterRegion(final String encodedName) {
+    LOG.info("Unregister region {}", encodedName);
+    // Make sure it does exist.
+    if (primaryRegionsUnderReplication.get(encodedName) == null) {

Review comment:
       Why not just call remove and check the return value?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java
##########
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
+ * This EndPoint is specially for system tables. It does a few things:
+ *
+ * 1. For each replica region, there is a shipper to ship edits for this 
region. Internally,
+ * the shipper maintains a in-memory queue, where edits from wal log are 
queued in this
+ * in-memory queue.
+ * 2. Each shipper will read edits from its queue and ship edits to the 
specific replica region.
+ * The pace of shippers are independent from each other, so they are not 
synced.
+ * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be 
discarded as
+ * those edits are already in flushed hfiles. This also prevents the queue 
grow out-of-bound
+ * when the replica region server is slow processing edits.
+ * 4. register/deregister region is based on region events from wal log.
+ * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as 
it does not need
+ * zookeeper support.
+ *
+ * TODO: the threading model for shippers needs to be improved. Right now, 
each shipper maps to
+ * one thread, which is not efficient. It needs to be improved so two threads 
are needed.
+ *  One sending thread which loops through shipper's queue and send out the 
walEdits.
+ *  One receiving thread which will wait for futures, once it is ready, it 
processes that
+ *  future accordingly.
+ */
+@InterfaceAudience.Private public class CatalogReplicaReplicationEndpoint
+  extends BaseReplicationEndpoint {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private AsyncClusterConnection connection;
+
+  private int numRetries;
+  private long operationTimeoutNs;
+  private long shippedEdits;
+
+  // Track primary regions under replication.
+  ConcurrentHashMap<String, CatalogReplicaShipper[]> 
primaryRegionsUnderReplication;
+
+  private byte[][] replicatedFamilies;
+
+  public CatalogReplicaReplicationEndpoint() {
+    // TODO: right now, it only allow family info to be replicated.
+    // This need to be changed in the future.
+    replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY };
+    primaryRegionsUnderReplication = new ConcurrentHashMap<>();
+  }
+
+  public long getShippedEdits() {
+    return shippedEdits;
+  }
+
+  @Override public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = context.getConfiguration();
+    // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
+    // We are resetting it here because we want default number of retries (35) 
rather than 10 times
+    // that which makes very long retries for disabled tables etc.
+    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    if (defaultNumRetries > 10) {
+      int mult = 
conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
+        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
+      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
+    }
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
+      .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
+  }
+
+  /**
+   * returns true if the specified entry must be replicated. We should always 
replicate meta
+   * operations (e.g. flush) and use the configured family to filter out edits 
which do not
+   * need to be replicated.
+   */
+  private boolean requiresReplication(final WALEdit edit) {
+
+    // TODO: let meta edits go through. Need to filter out flush events which 
are not for
+    // targetting families.
+    if (edit.isMetaEdit()) {
+      return true;
+    }
+    // empty edit does not need to be replicated
+    if (edit.isEmpty()) {
+      return false;
+    }
+
+    for (byte[] fam : replicatedFamilies) {
+      if (edit.getFamilies().contains(fam)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  // Register a new primary region to the endpoint when a region is opened, it 
will setup its
+  // shipper to replicate its wal edits to replica regions.
+  private void registerRegion(final String encodedName, final byte[] 
regionName)
+    throws IOException {
+    // Make sure it does not exist.
+    LOG.info("Register region {}", encodedName);
+
+    if (primaryRegionsUnderReplication.get(encodedName) != null) {
+      LOG.warn("Try to register an existing regoin {}", encodedName);
+      return;
+    }
+
+    RegionInfo regionInfo = 
CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
+
+    // TODO: meta replicas is not at the table descriptor at this moment.
+    int numReplicas =
+      conf.getInt(HConstants.META_REPLICAS_NUM, 
HConstants.DEFAULT_META_REPLICA_NUM);
+
+    if (numReplicas <= 1) {
+      LOG.warn("Trying to register region {} without replica enabled", 
encodedName);
+      return;
+    }
+
+    CatalogReplicaShipper[] replicaShippers = new 
CatalogReplicaShipper[numReplicas - 1];
+
+    for (int i = 1; i < numReplicas; i++) {
+      // It leaves each ReplicaShipper to resolve its replica region location.
+      replicaShippers[i - 1] = new CatalogReplicaShipper(this, 
this.connection, null,
+        RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(i).build());
+
+      replicaShippers[i - 1].start();
+    }
+
+    primaryRegionsUnderReplication.put(encodedName, replicaShippers);
+  }
+
+  // Deregister a primary region from the endpoint when a region is closed, it 
will clean up all
+  // resources allocated for this region.
+  private void unregisterRegion(final String encodedName) {
+    LOG.info("Unregister region {}", encodedName);
+    // Make sure it does exist.
+    if (primaryRegionsUnderReplication.get(encodedName) == null) {
+      LOG.warn("Try to unregister a unregistered region {}", encodedName);
+      return;
+    }
+
+    CatalogReplicaShipper[] replicaShippers = 
primaryRegionsUnderReplication.remove(encodedName);
+
+    for (CatalogReplicaShipper s : replicaShippers) {
+      s.setRunning(false);
+    }
+  }
+
+  @Override public boolean replicate(ReplicateContext replicateContext) {

Review comment:
       Second this. Let's keep the same style.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java
##########
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
+ * This EndPoint is specially for system tables. It does a few things:
+ * 1. For each replica region, there is a shipper to ship edits for this 
region. Internally,
+ * the shipper maintains a in-memory queue, where edits from wal log are 
queued in this
+ * in-memory queue.
+ * 2. Each shipper will read edits from its queue and ship edits to the 
specific replica region.
+ * The pace of shippers are independent from each other, so they are not 
synced.
+ * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be 
discarded as
+ * those edits are already in flushed hfiles. This also prevents the queue 
grow out-of-bound
+ * when the replica region server is slow processing edits.
+ * 4. register/deregister region is based on region events from wal log.
+ * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as 
it does not need
+ * zookeeper support.
+ * TODO: the threading model for shippers needs to be improved. Right now, 
each shipper maps to
+ * one thread, which is not efficient. It needs to be improved so two threads 
are needed.
+ * One sending thread which loops through shipper's queue and send out the 
walEdits.
+ * One receiving thread which will wait for futures, once it is ready, it 
processes that
+ * future accordingly.
+ */
+@InterfaceAudience.Private public class CatalogReplicaReplicationEndpoint
+  extends BaseReplicationEndpoint {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private AsyncClusterConnection connection;
+
+  private int numRetries;
+  private long operationTimeoutNs;
+  private long shippedEdits;
+
+  // Track primary regions under replication.
+  ConcurrentHashMap<String, CatalogReplicaShipper[]> 
primaryRegionsUnderReplication;
+
+  private byte[][] replicatedFamilies;
+
+  public CatalogReplicaReplicationEndpoint() {
+    // TODO: right now, it only allow family info to be replicated.
+    // This need to be changed in the future.
+    replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY };
+    primaryRegionsUnderReplication = new ConcurrentHashMap<>();
+  }
+
+  public long getShippedEdits() {
+    return shippedEdits;
+  }
+
+  @Override public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = context.getConfiguration();
+    // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
+    // We are resetting it here because we want default number of retries (35) 
rather than 10 times
+    // that which makes very long retries for disabled tables etc.
+    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    if (defaultNumRetries > 10) {
+      int mult = 
conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
+        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
+      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
+    }
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
+      .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
+  }
+
+  /**
+   * returns true if the specified entry must be replicated. We should always 
replicate meta
+   * operations (e.g. flush) and use the configured family to filter out edits 
which do not
+   * need to be replicated.
+   */
+  private boolean requiresReplication(final WALEdit edit) {
+
+    // TODO: let meta edits go through. Need to filter out flush events which 
are not for
+    // targetting families.
+    if (edit.isMetaEdit()) {
+      return true;
+    }
+    // empty edit does not need to be replicated
+    if (edit.isEmpty()) {
+      return false;
+    }
+
+    for (byte[] fam : replicatedFamilies) {
+      if (edit.getFamilies().contains(fam)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  // Register a new primary region to the endpoint when a region is opened, it 
will setup its
+  // shipper to replicate its wal edits to replica regions.
+  private void registerRegion(final String encodedName, final byte[] 
regionName)
+    throws IOException {
+    // Make sure it does not exist.
+    LOG.info("Register region {}", encodedName);
+
+    if (primaryRegionsUnderReplication.get(encodedName) != null) {
+      LOG.warn("Try to register an existing regoin {}", encodedName);
+      return;
+    }
+
+    RegionInfo regionInfo = 
CatalogFamilyFormat.parseRegionInfoFromRegionName(regionName);
+
+    // TODO: meta replicas is not at the table descriptor at this moment.
+    int numReplicas =
+      conf.getInt(HConstants.META_REPLICAS_NUM, 
HConstants.DEFAULT_META_REPLICA_NUM);
+
+    if (numReplicas <= 1) {
+      LOG.warn("Trying to register region {} without replica enabled", 
encodedName);
+      return;
+    }
+
+    CatalogReplicaShipper[] replicaShippers = new 
CatalogReplicaShipper[numReplicas - 1];
+
+    for (int i = 1; i < numReplicas; i++) {
+      // It leaves each ReplicaShipper to resolve its replica region location.
+      replicaShippers[i - 1] = new CatalogReplicaShipper(this, 
this.connection, null,
+        RegionInfoBuilder.newBuilder(regionInfo).setReplicaId(i).build());
+
+      replicaShippers[i - 1].start();
+    }
+
+    primaryRegionsUnderReplication.put(encodedName, replicaShippers);
+  }
+
+  // Deregister a primary region from the endpoint when a region is closed, it 
will clean up all
+  // resources allocated for this region.
+  private void unregisterRegion(final String encodedName) {
+    LOG.info("Unregister region {}", encodedName);
+    // Make sure it does exist.
+    if (primaryRegionsUnderReplication.get(encodedName) == null) {
+      LOG.warn("Try to unregister a unregistered region {}", encodedName);
+      return;
+    }
+
+    CatalogReplicaShipper[] replicaShippers = 
primaryRegionsUnderReplication.remove(encodedName);
+
+    for (CatalogReplicaShipper s : replicaShippers) {
+      s.stopShipper();
+    }
+  }
+
+  @Override public boolean replicate(ReplicateContext replicateContext) {
+    Map<String, List<List<Entry>>> encodedRegionName2Entries = new TreeMap<>();
+    long skippedEdits = 0;
+
+    // First, separate edits based on primary region.
+    for (Entry entry : replicateContext.getEntries()) {
+      WALEdit edit = entry.getEdit();
+      if (!requiresReplication(edit)) {
+        skippedEdits++;
+        continue;
+      }
+      shippedEdits++;
+
+      try {
+        // If an edit is a meta regionevent edit, we need to check a few 
things.
+        //    1. If it is a region_open event, it needs to register this 
region.
+        //    2. If it is a region_close event, it needs to deregister this 
region.
+        if (edit.isMetaEdit()) {
+          WALProtos.RegionEventDescriptor regionEvent =
+            WALEdit.getRegionEventDescriptor(edit.getCells().get(0));
+
+          if (regionEvent != null) {
+            if (regionEvent.getEventType()
+              == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE) {
+              
unregisterRegion(Bytes.toString(entry.getKey().getEncodedRegionName()));
+            } else if (regionEvent.getEventType()
+              == WALProtos.RegionEventDescriptor.EventType.REGION_OPEN) {
+              try {
+                
registerRegion(Bytes.toString(regionEvent.getEncodedRegionName().toByteArray()),
+                  regionEvent.getRegionName().toByteArray());
+              } catch (IOException e) {
+                LOG.warn("REGION_OPEN, invalid region name {}", 
regionEvent.getRegionName());
+              }
+            }
+          }
+        }
+        // add entry to the lists, please note one thing, if it is flush 
event, it needs to be
+        // in its own list, it makes the cleanup of shipper's queue much 
easier.
+        String encodeName = 
Bytes.toString(entry.getKey().getEncodedRegionName());
+        List<List<Entry>> ll = encodedRegionName2Entries.get(encodeName);

Review comment:
       Could we have better variable names here?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java
##########
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CatalogFamilyFormat;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint 
which receives the WAL
+ * edits from the WAL, and sends the edits to replicas of regions.
+ * This EndPoint is specially for system tables. It does a few things:
+ * 1. For each replica region, there is a shipper to ship edits for this 
region. Internally,
+ * the shipper maintains a in-memory queue, where edits from wal log are 
queued in this
+ * in-memory queue.
+ * 2. Each shipper will read edits from its queue and ship edits to the 
specific replica region.
+ * The pace of shippers are independent from each other, so they are not 
synced.
+ * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be 
discarded as
+ * those edits are already in flushed hfiles. This also prevents the queue 
grow out-of-bound
+ * when the replica region server is slow processing edits.
+ * 4. register/deregister region is based on region events from wal log.
+ * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as 
it does not need
+ * zookeeper support.
+ * TODO: the threading model for shippers needs to be improved. Right now, 
each shipper maps to
+ * one thread, which is not efficient. It needs to be improved so two threads 
are needed.
+ * One sending thread which loops through shipper's queue and send out the 
walEdits.
+ * One receiving thread which will wait for futures, once it is ready, it 
processes that
+ * future accordingly.
+ */
+@InterfaceAudience.Private public class CatalogReplicaReplicationEndpoint
+  extends BaseReplicationEndpoint {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class);
+
+  private Configuration conf;
+  private AsyncClusterConnection connection;
+
+  private int numRetries;
+  private long operationTimeoutNs;
+  private long shippedEdits;
+
+  // Track primary regions under replication.
+  ConcurrentHashMap<String, CatalogReplicaShipper[]> 
primaryRegionsUnderReplication;
+
+  private byte[][] replicatedFamilies;
+
+  public CatalogReplicaReplicationEndpoint() {
+    // TODO: right now, it only allow family info to be replicated.
+    // This need to be changed in the future.
+    replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY };
+    primaryRegionsUnderReplication = new ConcurrentHashMap<>();
+  }
+
+  public long getShippedEdits() {
+    return shippedEdits;
+  }
+
+  @Override public void init(Context context) throws IOException {
+    super.init(context);
+    this.conf = context.getConfiguration();
+    // HRS multiplies client retries by 10 globally for meta operations, but 
we do not want this.
+    // We are resetting it here because we want default number of retries (35) 
rather than 10 times
+    // that which makes very long retries for disabled tables etc.
+    int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+      HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    if (defaultNumRetries > 10) {
+      int mult = 
conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
+        HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
+      defaultNumRetries = defaultNumRetries / mult; // reset if HRS has 
multiplied this already
+    }
+    this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
+    // use the regular RPC timeout for replica replication RPC's
+    this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf
+      .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+    this.connection = context.getServer().getAsyncClusterConnection();
+  }
+
+  /**
+   * returns true if the specified entry must be replicated. We should always 
replicate meta
+   * operations (e.g. flush) and use the configured family to filter out edits 
which do not
+   * need to be replicated.
+   */
+  private boolean requiresReplication(final WALEdit edit) {

Review comment:
       Could this be done in a WALEntryFilter? IIRC you've already done some work to 
make it possible to configure different WALEntryFilters for a replication endpoint or 
a replication source?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to