imay closed pull request #408: Clear client pool when heartbeat failed
URL: https://github.com/apache/incubator-doris/pull/408
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/fe/src/main/java/org/apache/doris/common/GenericPool.java
b/fe/src/main/java/org/apache/doris/common/GenericPool.java
index 549a6dab..a38cc4fd 100644
--- a/fe/src/main/java/org/apache/doris/common/GenericPool.java
+++ b/fe/src/main/java/org/apache/doris/common/GenericPool.java
@@ -17,8 +17,6 @@
package org.apache.doris.common;
-import java.lang.reflect.Constructor;
-
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
@@ -26,13 +24,15 @@
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
+
+import java.lang.reflect.Constructor;
public class GenericPool<VALUE extends org.apache.thrift.TServiceClient> {
private static final Logger LOG = LogManager.getLogger(GenericPool.class);
@@ -73,6 +73,10 @@ public boolean reopen(VALUE object) {
return ok;
}
+ public void clearPool(TNetworkAddress addr) {
+ pool.clear(addr);
+ }
+
public boolean peak(VALUE object) {
return object.getOutputProtocol().getTransport().peek();
}
diff --git a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index 7c8d4226..ee3e84bc 100644
--- a/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -26,7 +26,6 @@
import org.apache.doris.common.util.Util;
import org.apache.doris.http.rest.BootstrapFinishAction;
import org.apache.doris.persist.HbPackage;
-import org.apache.doris.system.BackendEvent.BackendEventType;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TBackendInfo;
@@ -165,9 +164,9 @@ private boolean handleHbResponse(HeartbeatResponse
response, boolean isReplay) {
Backend be = nodeMgr.getBackend(hbResponse.getBeId());
if (be != null) {
boolean isChanged = be.handleHbResponse(hbResponse);
- if (hbResponse.getStatus() != HbStatus.OK && !isReplay) {
- nodeMgr.getEventBus().post(new
BackendEvent(BackendEventType.BACKEND_DOWN,
- "missing heartbeat",
Long.valueOf(hbResponse.getBeId())));
+ if (hbResponse.getStatus() != HbStatus.OK) {
+ // invalid all connections cached in ClientPool
+ ClientPool.backendPool.clearPool(new
TNetworkAddress(be.getHost(), be.getBePort()));
}
return isChanged;
}
@@ -178,7 +177,12 @@ private boolean handleHbResponse(HeartbeatResponse
response, boolean isReplay) {
FsBroker broker =
Catalog.getCurrentCatalog().getBrokerMgr().getBroker(
hbResponse.getName(), hbResponse.getHost(),
hbResponse.getPort());
if (broker != null) {
- return broker.handleHbResponse(hbResponse);
+ boolean isChanged = broker.handleHbResponse(hbResponse);
+ if (hbResponse.getStatus() != HbStatus.OK) {
+ // invalid all connections cached in ClientPool
+ ClientPool.brokerPool.clearPool(new
TNetworkAddress(broker.ip, broker.port));
+ }
+ return isChanged;
}
break;
}
diff --git a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
index f8e9e36d..1fb198e1 100644
--- a/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -26,7 +26,6 @@
import org.apache.doris.common.Pair;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend.BackendState;
-import org.apache.doris.system.BackendEvent.BackendEventType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -35,7 +34,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.eventbus.EventBus;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.logging.log4j.LogManager;
@@ -65,8 +63,6 @@
private volatile AtomicReference<ImmutableMap<Long, Backend>>
idToBackendRef;
private volatile AtomicReference<ImmutableMap<Long, AtomicLong>>
idToReportVersionRef;
- private final EventBus eventBus;
-
// last backend id used by round robin for sequential choosing backends for
// tablet creation
private ConcurrentHashMap<String, Long> lastBackendIdForCreationMap;
@@ -94,16 +90,10 @@ public SystemInfoService() {
idToReportVersionRef = new AtomicReference<ImmutableMap<Long,
AtomicLong>>(
ImmutableMap.<Long, AtomicLong> of());
- eventBus = new EventBus("backendEvent");
-
lastBackendIdForCreationMap = new ConcurrentHashMap<String, Long>();
lastBackendIdForOtherMap = new ConcurrentHashMap<String, Long>();
}
- public EventBus getEventBus() {
- return this.eventBus;
- }
-
// for deploy manager
public void addBackends(List<Pair<String, Integer>> hostPortPairs, boolean
isFree) throws DdlException {
addBackends(hostPortPairs, isFree, "");
@@ -217,10 +207,6 @@ public void dropBackend(String host, int heartbeatPort)
throws DdlException {
Backend droppedBackend = getBackendWithHeartbeatPort(host,
heartbeatPort);
- // publish
- eventBus.post(new BackendEvent(BackendEventType.BACKEND_DROPPED,
"backend has been dropped",
- Long.valueOf(droppedBackend.getId())));
-
// update idToBackend
Map<Long, Backend> copiedBackends =
Maps.newHashMap(idToBackendRef.get());
copiedBackends.remove(droppedBackend.getId());
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]