huaxiangsun commented on a change in pull request #2522: URL: https://github.com/apache/hbase/pull/2522#discussion_r502695123
########## File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicaReplicationEndpoint.java ########## @@ -0,0 +1,493 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CatalogFamilyFormat; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.AsyncTableRegionLocator; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import 
org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.WALEntryFilter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL + * edits from the WAL, and sends the edits to replicas of regions. + * This EndPoint is specifically for system tables. It does a few things: + * + * 1. For each replica region, there is a shipper to ship edits for this region. Internally, + * the shipper maintains an in-memory queue, where edits from the wal log are queued in this + * in-memory queue. + * 2. Each shipper will read edits from its queue and ship edits to the specific replica region. + * The pace of each shipper is independent from the others, so they are not synced. + * 3. In case of flush event COMMIT_FLUSH, edits in the slow shipper will be discarded as + * those edits are already in flushed hfiles. This also prevents the queue from growing out-of-bound + * when the replica region server is slow processing edits. + * 4. register/deregister region is based on region events from wal log. + * 5. CatalogReplicaReplicationEndpoint is based on BaseReplicationEndpoint as it does not need + * zookeeper support. + * + * TODO: the threading model for shippers needs to be improved. Right now, each shipper maps to + * one thread, which is not efficient. It needs to be improved so that only two threads are needed. 
+ * One sending thread which loops through the shipper's queue and sends out the walEdits. + * One receiving thread which will wait for futures, once it is ready, it processes that + * future accordingly. + */ +@InterfaceAudience.Private public class CatalogReplicaReplicationEndpoint + extends BaseReplicationEndpoint { + + private static final Logger LOG = + LoggerFactory.getLogger(CatalogReplicaReplicationEndpoint.class); + + private Configuration conf; + private AsyncClusterConnection connection; + + private int numRetries; + private long operationTimeoutNs; + private long shippedEdits; + + // Track primary regions under replication. + ConcurrentHashMap<String, CatalogReplicaShipper[]> primaryRegionsUnderReplication; + + private byte[][] replicatedFamilies; + + public CatalogReplicaReplicationEndpoint() { + // TODO: right now, it only allows family info to be replicated. + // This needs to be changed in the future. + replicatedFamilies = new byte[][] { HConstants.CATALOG_FAMILY }; + primaryRegionsUnderReplication = new ConcurrentHashMap<>(); + } + + public long getShippedEdits() { + return shippedEdits; + } + + @Override public void init(Context context) throws IOException { + super.init(context); + this.conf = context.getConfiguration(); + // HRS multiplies client retries by 10 globally for meta operations, but we do not want this. + // We are resetting it here because we want default number of retries (35) rather than 10 times + // that which makes very long retries for disabled tables etc. 
+ int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + if (defaultNumRetries > 10) { + int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, + HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER); + defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already + } + this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries); + // use the regular RPC timeout for replica replication RPC's + this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf + .getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); + this.connection = context.getServer().getAsyncClusterConnection(); + } + + /** + * Returns true if the specified entry must be replicated. We should always replicate meta + * operations (e.g. flush) and use the configured family to filter out edits which do not + * need to be replicated. + */ + private boolean requiresReplication(final WALEdit edit) { + + // TODO: let meta edits go through. Need to filter out flush events which are not for + // targeting families. + if (edit.isMetaEdit()) { + return true; + } + // empty edit does not need to be replicated + if (edit.isEmpty()) { + return false; + } + + for (byte[] fam : replicatedFamilies) { + if (edit.getFamilies().contains(fam)) { + return true; + } + } + + return false; + } + + // Register a new primary region to the endpoint when a region is opened; it will set up its + // shipper to replicate its wal edits to replica regions. + private void registerRegion(final String encodedName, final byte[] regionName) + throws IOException { + // Make sure it does not exist. + LOG.info("Register region {}", encodedName); + + if (primaryRegionsUnderReplication.get(encodedName) != null) { + LOG.warn("Try to register an existing regoin {}", encodedName); Review comment: Will update in the next update. 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
