This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 04d132fee1 Check reference after register (#12472)
04d132fee1 is described below
commit 04d132fee1cf23f6c0eada7131c9b5ab18b8ded7
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jun 8 10:13:55 2023 +0800
Check reference after register (#12472)
* Check reference after register
* Add logs
* Fix method
* Fix style
* Fix remove
* Fix remove
---
.../apache/dubbo/common/config/ReferenceCache.java | 12 ++-
.../apache/dubbo/config/ReferenceConfigBase.java | 10 +-
.../org/apache/dubbo/config/ReferenceConfig.java | 107 +++++++++++++++------
.../dubbo/config/deploy/DefaultModuleDeployer.java | 20 +++-
.../config/utils/CompositeReferenceCache.java | 18 +++-
.../dubbo/config/utils/SimpleReferenceCache.java | 27 +++++-
.../dubbo/config/utils/MockReferenceConfig.java | 4 +-
.../dubbo/config/utils/XxxMockReferenceConfig.java | 4 +-
8 files changed, 160 insertions(+), 42 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ReferenceCache.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ReferenceCache.java
index a7f23d9f22..c396fe0270 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ReferenceCache.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ReferenceCache.java
@@ -22,7 +22,12 @@ import java.util.List;
public interface ReferenceCache {
@SuppressWarnings("unchecked")
- <T> T get(ReferenceConfigBase<T> referenceConfig);
+ default <T> T get(ReferenceConfigBase<T> referenceConfig) {
+ return get(referenceConfig, true);
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> T get(ReferenceConfigBase<T> referenceConfig, boolean check);
@SuppressWarnings("unchecked")
<T> T get(String key, Class<T> type);
@@ -36,6 +41,11 @@ public interface ReferenceCache {
@SuppressWarnings("unchecked")
<T> T get(Class<T> type);
+ @SuppressWarnings("unchecked")
+ <T> void check(ReferenceConfigBase<T> referenceConfig, long timeout);
+
+ void check(String key, Class<?> type, long timeout);
+
void destroy(String key, Class<?> type);
void destroy(Class<?> type);
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
index 867e195561..bfeacacba5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ReferenceConfigBase.java
@@ -368,7 +368,15 @@ public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
}
@Transient
- public abstract T get();
+ public abstract T get(boolean check);
+
+ @Transient
+ public abstract void checkOrDestroy(long timeout);
+
+ @Transient
+ public final T get() {
+ return get(true);
+ }
public void destroy() {
getModuleConfigManager().removeConfig(this);
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index fd2868864b..0794718538 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.config;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
@@ -220,7 +221,7 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
@Override
@Transient
- public T get() {
+ public T get(boolean check) {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig("
+ url + ") has already destroyed!");
}
@@ -229,12 +230,53 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
// ensure start module, compatible with old api usage
getScopeModel().getDeployer().start();
- init();
+ init(check);
}
return ref;
}
+ @Override
+ public void checkOrDestroy(long timeout) {
+ if (!initialized || ref == null) {
+ return;
+ }
+ try {
+ checkInvokerAvailable(timeout);
+ } catch (Throwable t) {
+ logAndCleanup(t);
+ throw t;
+ }
+ }
+
+ private void logAndCleanup(Throwable t) {
+ try {
+ if (invoker != null) {
+ invoker.destroy();
+ }
+ } catch (Throwable destroy) {
+ logger.warn(CONFIG_FAILED_DESTROY_INVOKER, "", "", "Unexpected error occurred when destroy invoker of ReferenceConfig(" + url + ").", t);
+ }
+ if (consumerModel != null) {
+ ModuleServiceRepository repository = getScopeModel().getServiceRepository();
+ repository.unregisterConsumer(consumerModel);
+ }
+ initialized = false;
+ invoker = null;
+ ref = null;
+ consumerModel = null;
+ serviceMetadata.setTarget(null);
+ serviceMetadata.getAttributeMap().remove(PROXY_CLASS_REF);
+
+ // Thrown by checkInvokerAvailable().
+ if (t.getClass() == IllegalStateException.class &&
+ t.getMessage().contains("No provider available for the service")) {
+
+ // 2-2 - No provider available.
+ logger.error(CLUSTER_NO_VALID_PROVIDER, "server crashed", "", "No provider available.", t);
+ }
+ }
+
@Override
public synchronized void destroy() {
super.destroy();
@@ -258,6 +300,10 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
}
protected synchronized void init() {
+ init(true);
+ }
+
+ protected synchronized void init(boolean check) {
if (initialized && ref != null) {
return;
}
@@ -308,33 +354,11 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
consumerModel.setProxyObject(ref);
consumerModel.initMethodModels();
- checkInvokerAvailable();
- } catch (Throwable t) {
- try {
- if (invoker != null) {
- invoker.destroy();
- }
- } catch (Throwable destroy) {
- logger.warn(CONFIG_FAILED_DESTROY_INVOKER, "", "", "Unexpected error occurred when destroy invoker of ReferenceConfig(" + url + ").", t);
- }
- if (consumerModel != null) {
- ModuleServiceRepository repository = getScopeModel().getServiceRepository();
- repository.unregisterConsumer(consumerModel);
- }
- initialized = false;
- invoker = null;
- ref = null;
- consumerModel = null;
- serviceMetadata.setTarget(null);
- serviceMetadata.getAttributeMap().remove(PROXY_CLASS_REF);
-
- // Thrown by checkInvokerAvailable().
- if (t.getClass() == IllegalStateException.class &&
- t.getMessage().contains("No provider available for the service")) {
-
- // 2-2 - No provider available.
- logger.error(CLUSTER_NO_VALID_PROVIDER, "server crashed", "", "No provider available.", t);
+ if (check) {
+ checkInvokerAvailable(0);
}
+ } catch (Throwable t) {
+ logAndCleanup(t);
throw t;
}
@@ -631,8 +655,31 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
}
}
- private void checkInvokerAvailable() throws IllegalStateException {
- if (shouldCheck() && !invoker.isAvailable()) {
+ private void checkInvokerAvailable(long timeout) throws IllegalStateException {
+ if (!shouldCheck()) {
+ return;
+ }
+ boolean available = invoker.isAvailable();
+ if (available) {
+ return;
+ }
+
+ long startTime = System.currentTimeMillis();
+ long checkDeadline = startTime + timeout;
+ do {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ available = invoker.isAvailable();
+ } while (!available && checkDeadline > System.currentTimeMillis());
+ logger.warn(LoggerCodeConstants.REGISTRY_EMPTY_ADDRESS, "", "",
+ "Check reference of [" + getUniqueServiceName() + "] failed very beginning. " +
+ "After " + (System.currentTimeMillis() - startTime) + "ms reties, finally " +
+ (available ? "succeed" : "failed") + ".");
+ if (!available) {
// 2-2 - No provider available.
IllegalStateException illegalStateException = new
IllegalStateException("Failed to check the status of the service "
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
index 3bfaf4ea02..7c4eaf6baa 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
@@ -32,6 +32,7 @@ import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.ModuleConfig;
import org.apache.dubbo.config.ProviderConfig;
import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.config.ReferenceConfigBase;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.ServiceConfigBase;
import org.apache.dubbo.config.context.ModuleConfigManager;
@@ -180,6 +181,9 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
// register services to registry
registerServices();
+ // check reference config
+ checkReferences();
+
// complete module start future after application state changed
completeStartFuture(true);
} else {
@@ -195,6 +199,9 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
// register services to registry
registerServices();
+
+ // check reference config
+ checkReferences();
} catch (Throwable e) {
logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "",
"wait for export/refer services occurred an exception", e);
onModuleFailed(getIdentifier() + " start failed: " +
e, e);
@@ -387,6 +394,12 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
applicationDeployer.refreshServiceInstance();
}
+ private void checkReferences() {
+ for (ReferenceConfigBase<?> rc : configManager.getReferences()) {
+ referenceCache.check(rc, 3000);
+ }
+ }
+
private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
if (!serviceConfig.isRefreshed()) {
@@ -460,7 +473,7 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
ExecutorService executor =
executorRepository.getServiceReferExecutor();
CompletableFuture<Void> future =
CompletableFuture.runAsync(() -> {
try {
- referenceCache.get(rc);
+ referenceCache.get(rc, false);
} catch (Throwable t) {
logger.error(CONFIG_FAILED_EXPORT_SERVICE, "", "", "Failed to async export service config: " + getIdentifier() + " , catch error : " + t.getMessage(), t);
}
@@ -468,7 +481,7 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
asyncReferringFutures.add(future);
} else {
- referenceCache.get(rc);
+ referenceCache.get(rc, false);
}
}
} catch (Throwable t) {
@@ -488,6 +501,9 @@ public class DefaultModuleDeployer extends AbstractDeployer<ModuleModel> impleme
});
asyncReferringFutures.clear();
referenceCache.destroyAll();
+ for (ReferenceConfigBase<?> rc : configManager.getReferences()) {
+ rc.destroy();
+ }
} catch (Exception ignored) {
}
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/CompositeReferenceCache.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/CompositeReferenceCache.java
index fef648e5d0..5ae8f010a6 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/CompositeReferenceCache.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/CompositeReferenceCache.java
@@ -43,7 +43,7 @@ public class CompositeReferenceCache implements ReferenceCache {
}
@Override
- public <T> T get(ReferenceConfigBase<T> referenceConfig) {
+ public <T> T get(ReferenceConfigBase<T> referenceConfig, boolean check) {
Class<?> type = referenceConfig.getInterfaceClass();
String key = BaseServiceMetadata.buildServiceKey(type.getName(),
referenceConfig.getGroup(), referenceConfig.getVersion());
@@ -57,7 +57,7 @@ public class CompositeReferenceCache implements ReferenceCache {
"Call ReferenceConfig#get() directly for non-singleton ReferenceConfig instead of using ReferenceCache#get(ReferenceConfig)");
}
if (proxy == null) {
- proxy = referenceConfig.get();
+ proxy = referenceConfig.get(check);
}
return proxy;
}
@@ -111,6 +111,20 @@ public class CompositeReferenceCache implements ReferenceCache {
}
}
+ @Override
+ public void check(String key, Class<?> type, long timeout) {
+ for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
+ moduleModel.getDeployer().getReferenceCache().check(key, type, timeout);
+ }
+ }
+
+ @Override
+ public <T> void check(ReferenceConfigBase<T> referenceConfig, long timeout) {
+ for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
+ moduleModel.getDeployer().getReferenceCache().check(referenceConfig, timeout);
+ }
+ }
+
@Override
public void destroy(Class<?> type) {
for (ModuleModel moduleModel : applicationModel.getModuleModels()) {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/SimpleReferenceCache.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/SimpleReferenceCache.java
index 29d3514127..7e798004fc 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/SimpleReferenceCache.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/SimpleReferenceCache.java
@@ -110,7 +110,7 @@ public class SimpleReferenceCache implements ReferenceCache {
@Override
@SuppressWarnings("unchecked")
- public <T> T get(ReferenceConfigBase<T> rc) {
+ public <T> T get(ReferenceConfigBase<T> rc, boolean check) {
String key = generator.generateKey(rc);
Class<?> type = rc.getInterfaceClass();
@@ -129,7 +129,7 @@ public class SimpleReferenceCache implements ReferenceCache {
referencesOfType.add(rc);
List<ReferenceConfigBase<?>> referenceConfigList =
ConcurrentHashMapUtils.computeIfAbsent(referenceKeyMap, key, _k ->
Collections.synchronizedList(new ArrayList<>()));
referenceConfigList.add(rc);
- proxy = rc.get();
+ proxy = rc.get(check);
}
return proxy;
@@ -203,6 +203,29 @@ public class SimpleReferenceCache implements ReferenceCache {
return null;
}
+ @Override
+ public void check(String key, Class<?> type, long timeout) {
+ List<ReferenceConfigBase<?>> referencesOfKey = referenceKeyMap.get(key);
+ if (CollectionUtils.isEmpty(referencesOfKey)) {
+ return;
+ }
+ List<ReferenceConfigBase<?>> referencesOfType = referenceTypeMap.get(type);
+ if (CollectionUtils.isEmpty(referencesOfType)) {
+ return;
+ }
+ for (ReferenceConfigBase<?> rc : referencesOfKey) {
+ rc.checkOrDestroy(timeout);
+ }
+
+ }
+
+ @Override
+ public <T> void check(ReferenceConfigBase<T> referenceConfig, long timeout) {
+ String key = generator.generateKey(referenceConfig);
+ Class<?> type = referenceConfig.getInterfaceClass();
+ check(key, type, timeout);
+ }
+
@Override
public void destroy(String key, Class<?> type) {
List<ReferenceConfigBase<?>> referencesOfKey =
referenceKeyMap.remove(key);
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/MockReferenceConfig.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/MockReferenceConfig.java
index f6d0967368..1bb4519f90 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/MockReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/MockReferenceConfig.java
@@ -40,11 +40,11 @@ public class MockReferenceConfig extends ReferenceConfig<FooService> {
}
@Override
- public synchronized FooService get() {
+ public synchronized FooService get(boolean check) {
if (value != null) return value;
counter.getAndIncrement();
- value = super.get();
+ value = super.get(check);
return value;
}
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/XxxMockReferenceConfig.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/XxxMockReferenceConfig.java
index fbddd6db52..69fa54312a 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/XxxMockReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/utils/XxxMockReferenceConfig.java
@@ -40,11 +40,11 @@ public class XxxMockReferenceConfig extends ReferenceConfig<XxxService> {
}
@Override
- public synchronized XxxService get() {
+ public synchronized XxxService get(boolean check) {
if (value != null) return value;
counter.getAndIncrement();
- value = super.get();
+ value = super.get(check);
return value;
}