This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new ba308219c6 Register services after module started (#12450)
ba308219c6 is described below
commit ba308219c60e9faa943d85b1bbbbe0b7d1be30df
Author: Albumen Kevin <[email protected]>
AuthorDate: Wed Jun 7 13:49:49 2023 +0800
Register services after module started (#12450)
---
.../dubbo/common/deploy/ApplicationDeployer.java | 5 +
.../org/apache/dubbo/config/ServiceConfigBase.java | 7 +-
.../main/java/com/alibaba/dubbo/rpc/Exporter.java | 7 ++
.../org/apache/dubbo/config/ServiceConfig.java | 59 ++++++++----
.../config/deploy/DefaultApplicationDeployer.java | 11 +++
.../dubbo/config/deploy/DefaultModuleDeployer.java | 46 ++++++++--
...egistryCenterExportProviderIntegrationTest.java | 4 +-
.../javaconfig/JavaConfigReferenceBeanTest.java | 2 +-
.../registry/integration/RegistryProtocol.java | 102 +++++++++++++++++----
.../main/java/org/apache/dubbo/rpc/Exporter.java | 5 +
.../rpc/listener/ListenerExporterWrapper.java | 5 +
.../dubbo/rpc/protocol/AbstractExporter.java | 5 +
.../dubbo/rpc/protocol/injvm/InjvmProtocol.java | 10 +-
13 files changed, 209 insertions(+), 59 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
index 7cdd356872..0b0b9d4509 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/deploy/ApplicationDeployer.java
@@ -89,6 +89,11 @@ public interface ApplicationDeployer extends
Deployer<ApplicationModel> {
*/
void notifyModuleChanged(ModuleModel moduleModel, DeployState state);
+ /**
+ * Refresh the registered service instance and its metadata.
+ */
+ void refreshServiceInstance();
+
/**
* Increase the count of service update threads.
* NOTE: should call ${@link
ApplicationDeployer#decreaseServiceRefreshCount()} after update finished
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
index 78e6bf1c16..0db03d0208 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ServiceConfigBase.java
@@ -415,7 +415,9 @@ public abstract class ServiceConfigBase<T> extends
AbstractServiceConfig {
/**
* export service and auto start application instance
*/
- public abstract void export();
+ public final void export() {
+ export(true);
+ }
public abstract void unexport();
@@ -423,4 +425,7 @@ public abstract class ServiceConfigBase<T> extends
AbstractServiceConfig {
public abstract boolean isUnexported();
+ public abstract void export(boolean register);
+
+ public abstract void register();
}
diff --git a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Exporter.java
b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Exporter.java
index 5da68a3b20..d89fe3aa5c 100644
--- a/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Exporter.java
+++ b/dubbo-compatible/src/main/java/com/alibaba/dubbo/rpc/Exporter.java
@@ -23,6 +23,8 @@ public interface Exporter<T> extends
org.apache.dubbo.rpc.Exporter<T> {
@Override
Invoker<T> getInvoker();
+ default void register() {}
+
default void unregister() {}
class CompatibleExporter<T> implements Exporter<T> {
@@ -43,6 +45,11 @@ public interface Exporter<T> extends
org.apache.dubbo.rpc.Exporter<T> {
delegate.unexport();
}
+ @Override
+ public void register() {
+ delegate.register();
+ }
+
@Override
public void unregister() {
delegate.unregister();
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 291296240b..7fad3c8359 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -280,7 +280,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
@Override
- public void export() {
+ public void export(boolean register) {
if (this.exported) {
return;
}
@@ -300,19 +300,37 @@ public class ServiceConfig<T> extends
ServiceConfigBase<T> {
this.init();
if (shouldDelay()) {
- doDelayExport();
+ // a delayed export always registers, regardless of the register argument
+ doDelayExport(true);
} else {
- doExport();
+ doExport(register);
}
}
}
}
- protected void doDelayExport() {
+ @Override
+ public void register() {
+ if (!this.exported) {
+ return;
+ }
+
+ synchronized (this) {
+ if (!this.exported) {
+ return;
+ }
+
+ for (Exporter<?> exporter : exporters) {
+ exporter.register();
+ }
+ }
+ }
+
+ protected void doDelayExport(boolean register) {
ExecutorRepository.getInstance(getScopeModel().getApplicationModel()).getServiceExportExecutor()
.schedule(() -> {
try {
- doExport();
+ doExport(register);
} catch (Exception e) {
logger.error(CONFIG_FAILED_EXPORT_SERVICE, "configuration
server disconnected", "", "Failed to (async)export service config: " +
interfaceName, e);
}
@@ -439,7 +457,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
checkAndUpdateSubConfigs();
}
- protected synchronized void doExport() {
+ protected synchronized void doExport(boolean register) {
if (unexported) {
throw new IllegalStateException("The service " +
interfaceClass.getName() + " has already unexported!");
}
@@ -450,12 +468,12 @@ public class ServiceConfig<T> extends
ServiceConfigBase<T> {
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
- doExportUrls();
+ doExportUrls(register);
exported();
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private void doExportUrls() {
+ private void doExportUrls(boolean register) {
ModuleServiceRepository repository =
getScopeModel().getServiceRepository();
ServiceDescriptor serviceDescriptor;
final boolean serverService = ref instanceof ServerService;
@@ -490,7 +508,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
// In case user specified path, register service one
more time to map it to path.
repository.registerService(pathKey, interfaceClass);
}
- doExportUrlsFor1Protocol(protocolConfig, registryURLs);
+ doExportUrlsFor1Protocol(protocolConfig, registryURLs,
register);
}
return null;
}
@@ -499,7 +517,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
providerModel.setServiceUrls(urls);
}
- private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig,
List<URL> registryURLs) {
+ private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig,
List<URL> registryURLs, boolean register) {
Map<String, String> map = buildAttributes(protocolConfig);
// remove null key and null value
@@ -511,7 +529,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
processServiceExecutor(url);
- exportUrl(url, registryURLs);
+ exportUrl(url, registryURLs, register);
}
private void processServiceExecutor(URL url) {
@@ -695,7 +713,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
return url;
}
- private void exportUrl(URL url, List<URL> registryURLs) {
+ private void exportUrl(URL url, List<URL> registryURLs, boolean register) {
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
@@ -719,7 +737,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
build();
}
- url = exportRemote(url, registryURLs);
+ url = exportRemote(url, registryURLs, register);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url,
providerModel.getServiceModel(), getApplicationModel());
}
@@ -734,7 +752,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
URL localUrl = URLBuilder.from(url).
setProtocol(protocol).
build();
- localUrl = exportRemote(localUrl, registryURLs);
+ localUrl = exportRemote(localUrl, registryURLs,
register);
if (!isGeneric(generic) &&
!getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(localUrl,
providerModel.getServiceModel(), getApplicationModel());
}
@@ -746,7 +764,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
this.urls.add(url);
}
- private URL exportRemote(URL url, List<URL> registryURLs) {
+ private URL exportRemote(URL url, List<URL> registryURLs, boolean
register) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
if
(SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
@@ -778,7 +796,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
}
}
- doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true);
+ doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true,
register);
}
} else {
@@ -787,7 +805,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
logger.info("Export dubbo service " + interfaceClass.getName()
+ " to url " + url);
}
- doExportUrl(url, true);
+ doExportUrl(url, true, register);
}
@@ -795,7 +813,10 @@ public class ServiceConfig<T> extends ServiceConfigBase<T>
{
}
@SuppressWarnings({"unchecked", "rawtypes"})
- private void doExportUrl(URL url, boolean withMetaData) {
+ private void doExportUrl(URL url, boolean withMetaData, boolean register) {
+ if (!register) {
+ url = url.addParameter(REGISTER_KEY, false);
+ }
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class)
interfaceClass, url);
if (withMetaData) {
invoker = new DelegateProviderMetaDataInvoker(invoker, this);
@@ -817,7 +838,7 @@ public class ServiceConfig<T> extends ServiceConfigBase<T> {
local = local.setScopeModel(getScopeModel())
.setServiceModel(providerModel);
local = local.addParameter(EXPORTER_LISTENER_KEY, LOCAL_PROTOCOL);
- doExportUrl(local, false);
+ doExportUrl(local, false, true);
logger.info("Export dubbo service " + interfaceClass.getName() + " to
local registry url : " + local);
}
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
index 06579caa9b..ca597f2ee9 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultApplicationDeployer.java
@@ -936,6 +936,17 @@ public class DefaultApplicationDeployer extends
AbstractDeployer<ApplicationMode
}
}
+ @Override
+ public void refreshServiceInstance() {
+ if (registered) {
+ try {
+
ServiceInstanceMetadataUtils.refreshMetadataAndInstance(applicationModel);
+ } catch (Exception e) {
+ logger.error(CONFIG_REFRESH_INSTANCE_ERROR, "", "", "Refresh
instance and metadata error.", e);
+ }
+ }
+ }
+
@Override
public void increaseServiceRefreshCount() {
serviceRefreshState.incrementAndGet();
diff --git
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
index 919a31d22a..3bfaf4ea02 100644
---
a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
+++
b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/deploy/DefaultModuleDeployer.java
@@ -174,7 +174,14 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
// if no async export/refer services, just set started
if (asyncExportingFutures.isEmpty() &&
asyncReferringFutures.isEmpty()) {
+ // publish module started event
onModuleStarted();
+
+ // register services to registry
+ registerServices();
+
+ // complete module start future after application state changed
+ completeStartFuture(true);
} else {
frameworkExecutorRepository.getSharedExecutor().submit(() -> {
try {
@@ -182,17 +189,27 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
waitExportFinish();
// wait for refer finish
waitReferFinish();
+
+ // publish module started event
+ onModuleStarted();
+
+ // register services to registry
+ registerServices();
} catch (Throwable e) {
logger.warn(CONFIG_FAILED_WAIT_EXPORT_REFER, "", "",
"wait for export/refer services occurred an exception", e);
+ onModuleFailed(getIdentifier() + " start failed: " +
e, e);
} finally {
- onModuleStarted();
+ // complete module start future after application
state changed
+ completeStartFuture(true);
}
});
}
+
} catch (Throwable e) {
onModuleFailed(getIdentifier() + " start failed: " + e, e);
throw e;
}
+
return startFuture;
}
@@ -300,16 +317,11 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
}
private void onModuleStarted() {
- try {
if (isStarting()) {
setStarted();
logger.info(getIdentifier() + " has started.");
applicationDeployer.notifyModuleChanged(moduleModel,
DeployState.STARTED);
}
- } finally {
- // complete module start future after application state changed
- completeStartFuture(true);
- }
}
private void onModuleFailed(String msg, Throwable ex) {
@@ -366,6 +378,15 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
}
}
+ private void registerServices() {
+ for (ServiceConfigBase sc : configManager.getServices()) {
+ if (!Boolean.FALSE.equals(sc.isRegister())) {
+ registerServiceInternal(sc);
+ }
+ }
+ applicationDeployer.refreshServiceInstance();
+ }
+
private void exportServiceInternal(ServiceConfigBase sc) {
ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
if (!serviceConfig.isRefreshed()) {
@@ -390,12 +411,23 @@ public class DefaultModuleDeployer extends
AbstractDeployer<ModuleModel> impleme
asyncExportingFutures.add(future);
} else {
if (!sc.isExported()) {
- sc.export();
+ sc.export(false);
exportedServices.add(sc);
}
}
}
+ private void registerServiceInternal(ServiceConfigBase sc) {
+ ServiceConfig<?> serviceConfig = (ServiceConfig<?>) sc;
+ if (!serviceConfig.isRefreshed()) {
+ serviceConfig.refresh();
+ }
+ if (!sc.isExported()) {
+ return;
+ }
+ sc.register();
+ }
+
private void unexportServices() {
exportedServices.forEach(sc -> {
try {
diff --git
a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/multiple/exportprovider/MultipleRegistryCenterExportProviderIntegrationTest.java
b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/multiple/exportprovider/MultipleRegistryCenterExportProviderIntegrationTest.java
index bbf5277ebc..7ed2e63358 100644
---
a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/multiple/exportprovider/MultipleRegistryCenterExportProviderIntegrationTest.java
+++
b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/integration/multiple/exportprovider/MultipleRegistryCenterExportProviderIntegrationTest.java
@@ -188,7 +188,7 @@ class MultipleRegistryCenterExportProviderIntegrationTest
implements Integration
// 1. InjvmExporter
// 2. DubboExporter with service-discovery-registry protocol
// 3. DubboExporter with registry protocol
-
Assertions.assertEquals(exporterListener.getExportedExporters().size(), 3);
+
Assertions.assertEquals(exporterListener.getExportedExporters().size(), 5);
// The exported exporter contains
MultipleRegistryCenterExportProviderFilter
Assertions.assertTrue(exporterListener.getFilters().contains(filter));
@@ -244,4 +244,4 @@ class MultipleRegistryCenterExportProviderIntegrationTest
implements Integration
logger.info(getClass().getSimpleName() + " testcase is ending...");
registryProtocolListener = null;
}
-}
\ No newline at end of file
+}
diff --git
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/reference/javaconfig/JavaConfigReferenceBeanTest.java
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/reference/javaconfig/JavaConfigReferenceBeanTest.java
index d951dd03df..2d020d8c8b 100644
---
a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/reference/javaconfig/JavaConfigReferenceBeanTest.java
+++
b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/reference/javaconfig/JavaConfigReferenceBeanTest.java
@@ -447,7 +447,7 @@ class JavaConfigReferenceBeanTest {
}
@Bean
- @Reference(group = "${myapp.group}", interfaceName =
"org.apache.dubbo.config.spring.api.LocalMissClass")
+ @DubboReference(group = "${myapp.group}", interfaceName =
"org.apache.dubbo.config.spring.api.LocalMissClass", scope = "local")
public ReferenceBean<GenericService> genericServiceWithoutInterface() {
return new ReferenceBean();
}
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index c0b3dc116d..33f872ac75 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -70,6 +70,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
import static
org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
@@ -162,8 +163,8 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
private final Map<String, ServiceConfigurationListener>
serviceConfigurationListeners = new ConcurrentHashMap<>();
//To solve the problem of RMI repeated exposure port conflicts, the
services that have been exposed are no longer exposed.
- //provider url <--> exporter
- private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds =
new ConcurrentHashMap<>();
+ //provider url key -> (registry url key -> exporter)
+ private final Map<String, Map<String, ExporterChangeableWrapper<?>>>
bounds = new ConcurrentHashMap<>();
protected Protocol protocol;
protected ProxyFactory proxyFactory;
@@ -217,7 +218,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
return map;
}
- private void register(Registry registry, URL registeredProviderUrl) {
+ private static void register(Registry registry, URL registeredProviderUrl)
{
ApplicationDeployer deployer =
registeredProviderUrl.getOrDefaultApplicationModel().getDeployer();
try {
deployer.increaseServiceRefreshCount();
@@ -272,6 +273,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
exporter.setNotifyListener(overrideSubscribeListener);
+ exporter.setRegistered(register);
ApplicationModel applicationModel =
getApplicationModel(providerUrl.getScopeModel());
if
(applicationModel.getModelEnvironment().getConfiguration().convert(Boolean.class,
ENABLE_26X_CONFIGURATION_LISTEN, true)) {
@@ -308,12 +310,14 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T>
originInvoker, URL providerUrl) {
- String key = getCacheKey(originInvoker);
-
- return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s ->
{
- Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker,
providerUrl);
- return new ExporterChangeableWrapper<>((Exporter<T>)
protocol.export(invokerDelegate), originInvoker);
- });
+ String providerUrlKey = getProviderUrlKey(originInvoker);
+ String registryUrlKey = getRegistryUrlKey(originInvoker);
+
+ return (ExporterChangeableWrapper<T>)
bounds.computeIfAbsent(providerUrlKey, _k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(registryUrlKey, s ->{
+ Invoker<?> invokerDelegate = new
InvokerDelegate<>(originInvoker, providerUrl);
+ return new ExporterChangeableWrapper<>((Exporter<T>)
protocol.export(invokerDelegate), originInvoker);
+ });
}
public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) {
@@ -333,8 +337,18 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
*/
@SuppressWarnings("unchecked")
public <T> void reExport(final Invoker<T> originInvoker, URL
newInvokerUrl) {
- String key = getCacheKey(originInvoker);
- ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>)
bounds.get(key);
+ String providerUrlKey = getProviderUrlKey(originInvoker);
+ String registryUrlKey = getRegistryUrlKey(originInvoker);
+ Map<String, ExporterChangeableWrapper<?>> registryMap =
bounds.get(providerUrlKey);
+ if (registryMap == null) {
+ logger.warn(INTERNAL_ERROR, "error state, exporterMap can not be
null", "", "error state, exporterMap can not be null", new
IllegalStateException("error state, exporterMap can not be null"));
+ return;
+ }
+ ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>)
registryMap.get(registryUrlKey);
+ if (exporter == null) {
+ logger.warn(INTERNAL_ERROR, "error state, exporterMap can not be
null", "", "error state, exporterMap can not be null", new
IllegalStateException("error state, exporterMap can not be null"));
+ return;
+ }
URL registeredUrl = exporter.getRegisterUrl();
URL registryUrl = getRegistryUrl(originInvoker);
@@ -369,7 +383,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
private <T> void doReExport(final Invoker<T> originInvoker,
ExporterChangeableWrapper<T> exporter,
URL registryUrl, URL oldProviderUrl, URL
newProviderUrl) {
- if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
+ if (exporter.isRegistered()) {
Registry registry;
try {
registry = getRegistry(getRegistryUrl(originInvoker));
@@ -480,12 +494,18 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
* @param originInvoker
* @return
*/
- private String getCacheKey(final Invoker<?> originInvoker) {
+ private String getProviderUrlKey(final Invoker<?> originInvoker) {
URL providerUrl = getProviderUrl(originInvoker);
String key = providerUrl.removeParameters(DYNAMIC_KEY,
ENABLED_KEY).toFullString();
return key;
}
+ private String getRegistryUrlKey(final Invoker<?> originInvoker) {
+ URL registryUrl = getRegistryUrl(originInvoker);
+ String key = registryUrl.removeParameters(DYNAMIC_KEY,
ENABLED_KEY).toFullString();
+ return key;
+ }
+
@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
@@ -653,7 +673,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
}
}
- List<Exporter<?>> exporters = new ArrayList<>(bounds.values());
+ List<Exporter<?>> exporters = bounds.values().stream().flatMap(e ->
e.values().stream()).collect(Collectors.toList());
for (Exporter<?> exporter : exporters) {
exporter.unexport();
}
@@ -712,6 +732,11 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
exporter.unexport();
}
+ @Override
+ public void register() {
+ exporter.register();
+ }
+
@Override
public void unregister() {
exporter.unregister();
@@ -777,8 +802,14 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
}
//The origin invoker
URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
- String key = getCacheKey(originInvoker);
- ExporterChangeableWrapper<?> exporter = bounds.get(key);
+ String providerUrlKey = getProviderUrlKey(originInvoker);
+ String registryUrlKey = getRegistryUrlKey(originInvoker);
+ Map<String, ExporterChangeableWrapper<?>> exporterMap =
bounds.get(providerUrlKey);
+ if (exporterMap == null) {
+ logger.warn(INTERNAL_ERROR, "error state, exporterMap can not
be null", "", "error state, exporterMap can not be null", new
IllegalStateException("error state, exporterMap can not be null"));
+ return;
+ }
+ ExporterChangeableWrapper<?> exporter =
exporterMap.get(registryUrlKey);
if (exporter == null) {
logger.warn(INTERNAL_ERROR, "unknown error in registry
module", "", "error state, exporter should not be null", new
IllegalStateException("error state, exporter should not be null"));
return;
@@ -920,7 +951,7 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
private URL registerUrl;
private NotifyListener notifyListener;
- private final AtomicBoolean unregistered = new AtomicBoolean(false);
+ private final AtomicBoolean registered = new AtomicBoolean(false);
public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T>
originInvoker) {
this.exporter = exporter;
@@ -943,9 +974,28 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
this.exporter = exporter;
}
+ @Override
+ public void register() {
+ if (registered.compareAndSet(false, true)) {
+ URL registryUrl = getRegistryUrl(originInvoker);
+ Registry registry = getRegistry(registryUrl);
+ RegistryProtocol.register(registry, getRegisterUrl());
+
+ ProviderModel providerModel =
frameworkModel.getServiceRepository()
+ .lookupExportedService(getRegisterUrl().getServiceKey());
+
+ List<ProviderModel.RegisterStatedURL> statedUrls =
providerModel.getStatedUrl();
+ statedUrls.stream()
+ .filter(u -> u.getRegistryUrl().equals(registryUrl)
+ &&
u.getProviderUrl().getProtocol().equals(getRegisterUrl().getProtocol()))
+ .forEach(u -> u.setRegistered(true));
+ logger.info("Registered dubbo service " +
getRegisterUrl().getServiceKey() + " url " + getRegisterUrl() + " to registry "
+ registryUrl);
+ }
+ }
+
@Override
public synchronized void unregister() {
- if (unregistered.compareAndSet(false, true)) {
+ if (registered.compareAndSet(true, false)) {
Registry registry =
RegistryProtocol.this.getRegistry(getRegistryUrl(originInvoker));
try {
registry.unregister(registerUrl);
@@ -987,13 +1037,25 @@ public class RegistryProtocol implements Protocol,
ScopeModelAware {
@Override
public synchronized void unexport() {
- String key = getCacheKey(this.originInvoker);
- bounds.remove(key);
+ String providerUrlKey = getProviderUrlKey(this.originInvoker);
+ String registryUrlKey = getRegistryUrlKey(this.originInvoker);
+ Map<String, ExporterChangeableWrapper<?>> exporterMap =
bounds.remove(providerUrlKey);
+ if (exporterMap != null) {
+ exporterMap.remove(registryUrlKey);
+ }
unregister();
doUnExport();
}
+ public void setRegistered(boolean registered) {
+ this.registered.set(registered);
+ }
+
+ public boolean isRegistered() {
+ return registered.get();
+ }
+
private void doUnExport() {
try {
exporter.unexport();
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Exporter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Exporter.java
index 9b87ddb5d9..fd3d58bc8e 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Exporter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Exporter.java
@@ -41,6 +41,11 @@ public interface Exporter<T> {
*/
void unexport();
+ /**
+ * register to registry
+ */
+ void register();
+
/**
* unregister from registry
*/
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ListenerExporterWrapper.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ListenerExporterWrapper.java
index 025459588d..b5eb3c52cb 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ListenerExporterWrapper.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/listener/ListenerExporterWrapper.java
@@ -62,6 +62,11 @@ public class ListenerExporterWrapper<T> implements
Exporter<T> {
}
}
+ @Override
+ public void register() {
+ exporter.register();
+ }
+
@Override
public void unregister() {
exporter.unregister();
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractExporter.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractExporter.java
index b7f6c03e30..a9fd653bfa 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractExporter.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractExporter.java
@@ -60,6 +60,11 @@ public abstract class AbstractExporter<T> implements
Exporter<T> {
afterUnExport();
}
+ @Override
+ public void register() {
+
+ }
+
@Override
public void unregister() {
diff --git
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
index 46fc957e66..9f361054f3 100644
---
a/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmProtocol.java
@@ -25,7 +25,6 @@ import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ScopeModel;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
-import org.apache.dubbo.rpc.support.ProtocolUtils;
import java.util.Map;
@@ -66,14 +65,7 @@ public class InjvmProtocol extends AbstractProtocol {
}
}
- if (result == null) {
- return null;
- } else if (ProtocolUtils.isGeneric(
- result.getInvoker().getUrl().getParameter(GENERIC_KEY))) {
- return null;
- } else {
- return result;
- }
+ return result;
}
@Override