sunhelly commented on a change in pull request #3800: URL: https://github.com/apache/hbase/pull/3800#discussion_r764543345
##########
File path: hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/monitor/ExcludeDatanodeManager.java
##########

@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs.monitor;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
+
+/**
+ * The class to manage the excluded datanodes of the WALs on the regionserver.
+ */
+@InterfaceAudience.Private
+public class ExcludeDatanodeManager implements ConfigurationObserver {
+  private static final Logger LOG = LoggerFactory.getLogger(ExcludeDatanodeManager.class);
+
+  /**
+   * Configuration key for the maximum count of excluded datanodes.
+   */
+  private static final String WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY =
+    "hbase.regionserver.async.wal.max.exclude.datanode.count";
+  private static final int DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT = 3;
+
+  /**
+   * Configuration key for the TTL of excluded datanode entries.
+   */
+  private static final String WAL_EXCLUDE_DATANODE_TTL_KEY =
+    "hbase.regionserver.async.wal.exclude.datanode.info.ttl.hour";
+  private static final int DEFAULT_WAL_EXCLUDE_DATANODE_TTL = 6; // 6 hours
+
+  private volatile Cache<DatanodeInfo, Long> excludeDNsCache;
+  private final int maxExcludeDNCount;
+  private final Configuration conf;
+  // This is a map of providerId->StreamSlowMonitor
+  private final Map<String, StreamSlowMonitor> streamSlowMonitors =
+    new ConcurrentHashMap<>(1);
+
+  public ExcludeDatanodeManager(Configuration conf) {
+    this.conf = conf;
+    this.maxExcludeDNCount = conf.getInt(WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT_KEY,
+      DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT);
+    this.excludeDNsCache = CacheBuilder.newBuilder()
+      .expireAfterWrite(this.conf.getLong(WAL_EXCLUDE_DATANODE_TTL_KEY,
+        DEFAULT_WAL_EXCLUDE_DATANODE_TTL),
+        TimeUnit.HOURS)
+      .maximumSize(this.maxExcludeDNCount)
+      .concurrencyLevel(10)
+      .build();
+  }
+
+  /**
+   * Try to add a datanode to the regionserver's exclude cache.
+   * @param datanodeInfo the datanode to be added to the exclude cache
+   * @param cause the reason the datanode should be excluded
+   * @return true if the datanode was added to the exclude cache, false otherwise
+   */
+  public boolean tryAddExcludeDN(DatanodeInfo datanodeInfo, String cause) {
+    boolean alreadyMarkedSlow = getExcludeDNs().containsKey(datanodeInfo);
+    if (excludeDNsCache.size() < maxExcludeDNCount) {
+      if (!alreadyMarkedSlow) {
+        excludeDNsCache.put(datanodeInfo, System.currentTimeMillis());

Review comment:
Hi @Apache9, do you mean that in the put method of the Guava cache, concurrent put operations across segments may cause the cache size to grow larger than the configured maximumSize?

Within each segment there is a lock around the put operation, so setting the table entry and running the eviction method both happen under that lock. Since the cache here has maximumSize=3, the segment count is always 1 no matter what the concurrencyLevel is, so I think concurrent puts are serialized internally by that lock. This is the source code where LocalCache sets the segment count:

    int segmentShift = 0;
    int segmentCount = 1;
    while (segmentCount < concurrencyLevel
        && (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
      ++segmentShift;
      segmentCount <<= 1;
    }

Here evictsBySize() is always true, because maxWeight = maximumSize = 3 (the default config) >= 0, and segmentCount * 20 <= maxWeight is false in most circumstances, because we will not let the exclude cache hold more than 20 datanodes. If someone wants to set maximumSize larger than 20 and we do not want the size to exceed maximumSize, we can set concurrencyLevel=1 (the default is 4, which means in the worst case the cache size is maximumSize+3). As mentioned before, in the put method of the Guava cache, after setting the table entry in the segment, it evicts entries by LRU until the segment weight is below maxSegmentWeight (the segment max weights sum to the overall max weight). I think we can keep using the default concurrencyLevel here. WDYT @Apache9?
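
For readers following the argument, here is a minimal, self-contained sketch (our own harness, not part of the PR or of Guava's API; the method name is invented) that reproduces the segment-count loop quoted above. It shows that maximumSize=3 collapses the cache to a single segment regardless of the requested concurrencyLevel, which is the basis of the serialization claim:

    // Hypothetical standalone re-implementation of the LocalCache
    // segment-count loop quoted in the comment above.
    public class SegmentCountSketch {
      // Per the discussion, evictsBySize() is true whenever a maximum
      // size/weight is configured, i.e. maxWeight >= 0.
      static int segmentCountFor(int concurrencyLevel, long maxWeight) {
        boolean evictsBySize = maxWeight >= 0;
        int segmentCount = 1;
        while (segmentCount < concurrencyLevel
            && (!evictsBySize || segmentCount * 20L <= maxWeight)) {
          segmentCount <<= 1;
        }
        return segmentCount;
      }

      public static void main(String[] args) {
        // maximumSize = 3 (the PR default): one segment, so puts serialize.
        System.out.println(segmentCountFor(10, 3));  // prints 1
        // A larger maximumSize with Guava's default concurrencyLevel of 4:
        // several segments, so the bound is only enforced per segment.
        System.out.println(segmentCountFor(4, 100)); // prints 4
      }
    }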
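And a quick demo of the resulting behavior, written against plain Guava (the PR itself uses the hbase-thirdparty relocation of the same classes; class and key names here are invented for the demo): hammer a maximumSize(3), concurrencyLevel(10) cache from several threads and check that the final size stays within the bound, consistent with the single-segment argument above:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class ExcludeCacheBoundDemo {
      public static void main(String[] args) throws InterruptedException {
        Cache<String, Long> cache = CacheBuilder.newBuilder()
            .maximumSize(3)       // same bound as DEFAULT_WAL_MAX_EXCLUDE_SLOW_DATANODE_COUNT
            .concurrencyLevel(10) // per the loop above, still collapses to one segment
            .build();

        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int t = 0; t < 8; t++) {
          final int thread = t;
          pool.submit(() -> {
            for (int i = 0; i < 10_000; i++) {
              // Keys stand in for DatanodeInfo; values mimic tryAddExcludeDN's put.
              cache.put("dn-" + thread + "-" + i, System.currentTimeMillis());
            }
          });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        // With a single segment, LRU eviction runs under the same lock as the
        // write, so once the writers stop the size cannot exceed the bound.
        System.out.println("final size = " + cache.size()); // expect <= 3
      }
    }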
