This is an automated email from the ASF dual-hosted git repository.
jkevan pushed a commit to branch async-save-export-config
in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/async-save-export-config by
this push:
new d6840c8eb small cleanup
d6840c8eb is described below
commit d6840c8ebc25cecd2caec9b15ff23d1d3b166213
Author: Kevan <[email protected]>
AuthorDate: Tue Mar 28 17:33:35 2023 +0200
small cleanup
---
.../unomi/router/api/IRouterCamelContext.java | 4 +-
.../apache/unomi/router/api/RouterConstants.java | 5 +-
.../services/ImportExportConfigurationService.java | 16 +--
.../router/core/context/RouterCamelContext.java | 117 ++++++++++++++++-----
.../core/event/UpdateCamelRouteEventHandler.java | 6 +-
.../processor/ExportRouteCompletionProcessor.java | 7 +-
.../route/ProfileExportCollectRouteBuilder.java | 17 +--
.../resources/OSGI-INF/blueprint/blueprint.xml | 2 +-
.../services/ExportConfigurationServiceImpl.java | 56 ++++------
.../services/ImportConfigurationServiceImpl.java | 51 +++------
.../router/services/ProfileExportServiceImpl.java | 14 ++-
.../router/services/ProfileImportServiceImpl.java | 9 +-
.../resources/OSGI-INF/blueprint/blueprint.xml | 68 ++++++++++++
.../test/java/org/apache/unomi/itests/BaseIT.java | 4 +-
.../unomi/itests/ProfileImportRankingIT.java | 2 +-
15 files changed, 232 insertions(+), 146 deletions(-)
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
index d35c3d4a0..5ec1adb57 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/IRouterCamelContext.java
@@ -23,7 +23,9 @@ public interface IRouterCamelContext {
void killExistingRoute(String routeId, boolean fireEvent) throws Exception;
- void updateProfileReaderRoute(Object configuration, boolean fireEvent)
throws Exception;
+ void updateProfileImportReaderRoute(String configId, boolean fireEvent)
throws Exception;
+
+ void updateProfileExportReaderRoute(String configId, boolean fireEvent)
throws Exception;
void setTracing(boolean tracing);
}
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
index 3b04703ca..ff3d4c2f7 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
@@ -20,6 +20,10 @@ package org.apache.unomi.router.api;
* Created by amidani on 13/06/2017.
*/
public interface RouterConstants {
+ enum CONFIG_CAMEL_REFRESH {
+ UPDATED,
+ REMOVED
+ }
String CONFIG_TYPE_NOBROKER = "nobroker";
String CONFIG_TYPE_KAFKA = "kafka";
@@ -40,7 +44,6 @@ public interface RouterConstants {
String HEADER_CONFIG_TYPE = "configType";
- String HEADER_EXPORT_CONFIG = "exportConfig";
String HEADER_FAILED_MESSAGE = "failedMessage";
String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot";
diff --git
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
index dd561e745..edb103cc5 100644
---
a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
+++
b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/services/ImportExportConfigurationService.java
@@ -17,10 +17,11 @@
package org.apache.unomi.router.api.services;
import org.apache.unomi.router.api.ExportConfiguration;
-import org.apache.unomi.router.api.IRouterCamelContext;
import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
import java.util.List;
+import java.util.Map;
/**
* A service to access and operate on {@link ImportConfiguration}s / {@link
ExportConfiguration}s.
@@ -60,15 +61,8 @@ public interface ImportExportConfigurationService<T> {
void delete(String configId);
/**
- * Set the router camel context to share
- *
- * @param routerCamelContext the router Camel context to use for all
operations
- */
- void setRouterCamelContext(IRouterCamelContext routerCamelContext);
-
- /**
- * Retrieve the configured router camel context
- * @return the configured instance, or null if not configured
+ * Used by camel route system to get the latest changes on configs and
reflect changes on camel routes if necessary
+ * @return map of configId per operation to be done in camel
*/
- IRouterCamelContext getRouterCamelContext();
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
consumeConfigsToBeRefresh();
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 524a9efbf..642e94008 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -24,6 +24,7 @@ import org.apache.camel.model.RouteDefinition;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.api.services.ConfigSharingService;
import org.apache.unomi.api.services.ProfileService;
+import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.IRouterCamelContext;
@@ -43,6 +44,11 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* Created by amidani on 04/05/2017.
@@ -71,6 +77,11 @@ public class RouterCamelContext implements
IRouterCamelContext {
private ConfigSharingService configSharingService;
private ClusterService clusterService;
+ // TODO UNOMI-572: when fixing UNOMI-572 please remove the usage of the
custom ScheduledExecutorService and re-introduce the Unomi Scheduler Service
+ private ScheduledExecutorService scheduler;
+ private Integer configsRefreshInterval = 1000;
+ private ScheduledFuture<?> scheduledFuture;
+
public static String EVENT_ID_REMOVE =
"org.apache.unomi.router.event.remove";
public static String EVENT_ID_IMPORT =
"org.apache.unomi.router.event.import";
public static String EVENT_ID_EXPORT =
"org.apache.unomi.router.event.export";
@@ -99,12 +110,71 @@ public class RouterCamelContext implements
IRouterCamelContext {
camelContext.setTracing(true);
}
- public void initCamelContext() throws Exception {
+ public void init() throws Exception {
logger.info("Initialize Camel Context...");
+ scheduler = Executors.newSingleThreadScheduledExecutor();
configSharingService.setProperty(RouterConstants.IMPORT_ONESHOT_UPLOAD_DIR,
uploadDir);
configSharingService.setProperty(RouterConstants.KEY_HISTORY_SIZE,
execHistorySize);
+ initCamel();
+
+ initTimers();
+ logger.info("Camel Context initialized successfully.");
+ }
+
+ public void destroy() throws Exception {
+ scheduledFuture.cancel(true);
+ if (scheduler != null) {
+ scheduler.shutdown();
+ }
+ //This is to shutdown Camel context
+ //(will stop all routes/components/endpoints etc and clear internal
state/cache)
+ this.camelContext.stop();
+ logger.info("Camel context for profile import is shutdown.");
+ }
+
+ private void initTimers() {
+ TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
importConfigsToRefresh = importConfigurationService.consumeConfigsToBeRefresh();
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
exportConfigsToRefresh = exportConfigurationService.consumeConfigsToBeRefresh();
+
+ for (Map.Entry<String,
RouterConstants.CONFIG_CAMEL_REFRESH> importConfigToRefresh :
importConfigsToRefresh.entrySet()) {
+ try {
+ if
(importConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED))
{
+
updateProfileImportReaderRoute(importConfigToRefresh.getKey(), true);
+ } else if
(importConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED))
{
+
killExistingRoute(importConfigToRefresh.getKey(), true);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing("
+ importConfigToRefresh.getValue() + ") camel route: " +
importConfigToRefresh.getKey(), e);
+ }
+ }
+
+
+ for (Map.Entry<String,
RouterConstants.CONFIG_CAMEL_REFRESH> exportConfigToRefresh :
exportConfigsToRefresh.entrySet()) {
+ try {
+ if
(exportConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED))
{
+
updateProfileExportReaderRoute(exportConfigToRefresh.getKey(), true);
+ } else if
(exportConfigToRefresh.getValue().equals(RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED))
{
+
killExistingRoute(exportConfigToRefresh.getKey(), true);
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing("
+ exportConfigToRefresh.getValue() + ") camel route: " +
exportConfigToRefresh.getKey(), e);
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Unexpected error while refreshing
import/export camel routes", e);
+ }
+ }
+ };
+ scheduledFuture = scheduler.scheduleWithFixedDelay(task, 0,
configsRefreshInterval, TimeUnit.MILLISECONDS);
+ }
+
+ private void initCamel() throws Exception {
camelContext = new OsgiDefaultCamelContext(bundleContext);
//--IMPORT ROUTES
@@ -139,7 +209,7 @@ public class RouterCamelContext implements
IRouterCamelContext {
//Profiles collect
ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder =
new ProfileExportCollectRouteBuilder(kafkaProps, configType);
-
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
+
profileExportCollectRouteBuilder.setExportConfigurationList(exportConfigurationService.getAll());
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
@@ -157,13 +227,6 @@ public class RouterCamelContext implements
IRouterCamelContext {
camelContext.addRoutes(profileExportProducerRouteBuilder);
camelContext.start();
-
- logger.debug("postConstruct {" + bundleContext.getBundle() + "}");
-
- importConfigurationService.setRouterCamelContext(this);
- exportConfigurationService.setRouterCamelContext(this);
-
- logger.info("Camel Context {} initialized successfully.");
}
public void killExistingRoute(String routeId, boolean fireEvent) throws
Exception {
@@ -183,17 +246,15 @@ public class RouterCamelContext implements
IRouterCamelContext {
}
}
- public void updateProfileReaderRoute(Object configuration, boolean
fireEvent) throws Exception {
- if (configuration instanceof ImportConfiguration) {
- updateProfileImportReaderRoute((ImportConfiguration)
configuration, fireEvent);
- } else {
- updateProfileExportReaderRoute((ExportConfiguration)
configuration, fireEvent);
+ public void updateProfileImportReaderRoute(String configId, boolean
fireEvent) throws Exception {
+ killExistingRoute(configId, false);
+
+ ImportConfiguration importConfiguration =
importConfigurationService.load(configId);
+ if (importConfiguration == null) {
+ logger.warn("Cannot update profile import reader route, config: {}
not found", configId);
+ return;
}
- }
- private void updateProfileImportReaderRoute(ImportConfiguration
importConfiguration, boolean fireEvent) throws Exception {
- killExistingRoute(importConfiguration.getItemId(), false);
- //Handle transforming an import config oneshot <--> recurrent
if
(RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType()))
{
ProfileImportFromSourceRouteBuilder builder = new
ProfileImportFromSourceRouteBuilder(kafkaProps, configType);
builder.setImportConfigurationList(Arrays.asList(importConfiguration));
@@ -212,13 +273,18 @@ public class RouterCamelContext implements
IRouterCamelContext {
}
}
- private void updateProfileExportReaderRoute(ExportConfiguration
exportConfiguration, boolean fireEvent) throws Exception {
- killExistingRoute(exportConfiguration.getItemId(), false);
- //Handle transforming an import config oneshot <--> recurrent
+ public void updateProfileExportReaderRoute(String configId, boolean
fireEvent) throws Exception {
+ killExistingRoute(configId, false);
+
+ ExportConfiguration exportConfiguration =
exportConfigurationService.load(configId);
+ if (exportConfiguration == null) {
+ logger.warn("Cannot update profile export reader route, config: {}
not found", configId);
+ return;
+ }
+
if
(RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType()))
{
ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder
= new ProfileExportCollectRouteBuilder(kafkaProps, configType);
profileExportCollectRouteBuilder.setExportConfigurationList(Arrays.asList(exportConfiguration));
-
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
@@ -292,11 +358,4 @@ public class RouterCamelContext implements
IRouterCamelContext {
public void setAllowedEndpoints(String allowedEndpoints) {
this.allowedEndpoints = allowedEndpoints;
}
-
- public void preDestroy() throws Exception {
- //This is to shutdown Camel context
- //(will stop all routes/components/endpoints etc and clear internal
state/cache)
- this.camelContext.stop();
- logger.info("Camel context for profile import is shutdown.");
- }
}
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
index c75207273..91cd09df1 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEventHandler.java
@@ -49,8 +49,10 @@ public class UpdateCamelRouteEventHandler extends
CellarSupport implements Event
logger.debug("Event id is {}", event.getId());
if (event.getId().equals(RouterCamelContext.EVENT_ID_REMOVE)
&& StringUtils.isNotBlank(event.getRouteId())) {
routerCamelContext.killExistingRoute(event.getRouteId(),
false);
- } else if
((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT) ||
event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) &&
event.getConfiguration() != null) {
-
routerCamelContext.updateProfileReaderRoute(event.getConfiguration(), false);
+ } else if
((event.getId().equals(RouterCamelContext.EVENT_ID_IMPORT))) {
+
routerCamelContext.updateProfileImportReaderRoute(event.getRouteId(), false);
+ } else if
(event.getId().equals(RouterCamelContext.EVENT_ID_EXPORT)) {
+
routerCamelContext.updateProfileExportReaderRoute(event.getRouteId(), false);
}
} catch (Exception e) {
logger.error("Error when executing event", e);
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
index 309c7c2a2..6d16caa88 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -40,14 +40,15 @@ public class ExportRouteCompletionProcessor implements
Processor {
@Override
public void process(Exchange exchange) throws Exception {
- ExportConfiguration exportConfig = (ExportConfiguration)
exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG);
+ ExportConfiguration exportConfiguration =
exportConfigurationService.load(exchange.getFromRouteId());
+ if (exportConfiguration == null) {
+ logger.warn("Unable to complete export, config cannot not found:
{}", exchange.getFromRouteId());
+ }
Map execution = new HashMap();
execution.put(RouterConstants.KEY_EXECS_DATE, ((Date)
exchange.getProperty("CamelCreatedTimestamp")).getTime());
execution.put(RouterConstants.KEY_EXECS_EXTRACTED,
exchange.getProperty("CamelSplitSize"));
- ExportConfiguration exportConfiguration =
exportConfigurationService.load(exportConfig.getItemId());
-
exportConfiguration = (ExportConfiguration)
RouterUtils.addExecutionEntry(exportConfiguration, execution,
executionsHistorySize);
exportConfiguration.setStatus(RouterConstants.CONFIG_STATUS_COMPLETE_SUCCESS);
diff --git
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index bff0d6d46..1acf51b55 100644
---
a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++
b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -39,7 +39,6 @@ public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder
private static final Logger logger =
LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
private List<ExportConfiguration> exportConfigurationList;
- private ImportExportConfigurationService<ExportConfiguration>
exportConfigurationService;
private PersistenceService persistenceService;
public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps,
String configType) {
@@ -48,16 +47,16 @@ public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder
@Override
public void configure() throws Exception {
- logger.info("Configure Recurrent Route 'Export :: Collect Data'");
-
- if (exportConfigurationList == null) {
- exportConfigurationList = exportConfigurationService.getAll();
+ if (exportConfigurationList == null ||
exportConfigurationList.isEmpty()) {
+ // Nothing to configure
+ return;
}
+ logger.info("Configure Recurrent Route 'Export :: Collect Data'");
+
CollectProfileBean collectProfileBean = new CollectProfileBean();
collectProfileBean.setPersistenceService(persistenceService);
-
//Loop on multiple export configuration
for (final ExportConfiguration exportConfiguration :
exportConfigurationList) {
if
(RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())
&&
@@ -76,7 +75,6 @@ public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder
.split(body())
.marshal(jacksonDataFormat)
.convertBodyTo(String.class)
-
.setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration))
.log(LoggingLevel.DEBUG, "BODY : ${body}");
if
(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
prDef.to((KafkaEndpoint)
getEndpointURI(RouterConstants.DIRECTION_FROM,
RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
@@ -99,12 +97,7 @@ public class ProfileExportCollectRouteBuilder extends
RouterAbstractRouteBuilder
this.exportConfigurationList = exportConfigurationList;
}
- public void
setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration>
exportConfigurationService) {
- this.exportConfigurationService = exportConfigurationService;
- }
-
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
-
}
diff --git
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index d61d64f2d..b366556be 100644
---
a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++
b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -82,7 +82,7 @@
<bean id="camelContext"
class="org.apache.unomi.router.core.context.RouterCamelContext"
- init-method="initCamelContext" destroy-method="preDestroy">
+ init-method="init" destroy-method="destroy">
<property name="configType" value="${router.config.type}"/>
<property name="allowedEndpoints" value="${config.allowedEndpoints}"/>
<property name="uploadDir" value="${import.oneshot.uploadDir}"/>
diff --git
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
index bb9e21f23..d1b8d8f75 100644
---
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
+++
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ExportConfigurationServiceImpl.java
@@ -16,36 +16,32 @@
*/
package org.apache.unomi.router.services;
-import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
-import org.apache.unomi.router.api.IRouterCamelContext;
+import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Service to manage Configuration of Item to export
* Created by amidani on 28/04/2017.
*/
-@Component(immediate = true, property = "configDiscriminator=EXPORT", service
= ImportExportConfigurationService.class)
public class ExportConfigurationServiceImpl implements
ImportExportConfigurationService<ExportConfiguration> {
private static final Logger logger =
LoggerFactory.getLogger(ExportConfigurationServiceImpl.class.getName());
- @Reference
+
private PersistenceService persistenceService;
- @Reference
- private SchedulerService schedulerService;
- private IRouterCamelContext routerCamelContext;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ private final Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
camelConfigsToRefresh = new ConcurrentHashMap<>();
public ExportConfigurationServiceImpl() {
logger.info("Initializing export configuration service...");
@@ -66,41 +62,25 @@ public class ExportConfigurationServiceImpl implements
ImportExportConfiguration
if (exportConfiguration.getItemId() == null) {
exportConfiguration.setItemId(UUID.randomUUID().toString());
}
+ persistenceService.save(exportConfiguration);
+
if (updateRunningRoute) {
- TimerTask updateRoute = new TimerTask() {
- @Override
- public void run() {
- try {
-
routerCamelContext.updateProfileReaderRoute(exportConfiguration, true);
- } catch (Exception e) {
- logger.error("Error when trying to save/update running
Apache Camel Route: {}", exportConfiguration.getItemId());
- }
- }
- };
- // Defer config update.
-
schedulerService.getScheduleExecutorService().schedule(updateRoute, 0,
TimeUnit.MILLISECONDS);
+ camelConfigsToRefresh.put(exportConfiguration.getItemId(),
RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED);
}
- persistenceService.save(exportConfiguration);
+
return persistenceService.load(exportConfiguration.getItemId(),
ExportConfiguration.class);
}
@Override
public void delete(String configId) {
- try {
- routerCamelContext.killExistingRoute(configId, true);
- } catch (Exception e) {
- logger.error("Error when trying to delete running Apache Camel
Route: {}", configId);
- }
persistenceService.remove(configId, ExportConfiguration.class);
+ camelConfigsToRefresh.put(configId,
RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED);
}
@Override
- public void setRouterCamelContext(IRouterCamelContext routerCamelContext) {
- this.routerCamelContext = routerCamelContext;
- }
-
- @Override
- public IRouterCamelContext getRouterCamelContext() {
- return routerCamelContext;
+ public Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
consumeConfigsToBeRefresh() {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> result = new
HashMap<>(camelConfigsToRefresh);
+ camelConfigsToRefresh.clear();
+ return result;
}
}
diff --git
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
index 28c083412..a12d2991f 100644
---
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
+++
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ImportConfigurationServiceImpl.java
@@ -16,36 +16,31 @@
*/
package org.apache.unomi.router.services;
-import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
-import org.apache.unomi.router.api.IRouterCamelContext;
import org.apache.unomi.router.api.ImportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Service to manage Configuration of object to import
* Created by amidani on 28/04/2017.
*/
-@Component(immediate = true, property = "configDiscriminator=IMPORT", service
= ImportExportConfigurationService.class)
public class ImportConfigurationServiceImpl implements
ImportExportConfigurationService<ImportConfiguration> {
private static final Logger logger =
LoggerFactory.getLogger(ImportConfigurationServiceImpl.class.getName());
- @Reference
private PersistenceService persistenceService;
- @Reference
- private SchedulerService schedulerService;
- private IRouterCamelContext routerCamelContext;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ private final Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
camelConfigsToRefresh = new ConcurrentHashMap<>();
public ImportConfigurationServiceImpl() {
logger.info("Initializing import configuration service...");
@@ -67,18 +62,7 @@ public class ImportConfigurationServiceImpl implements
ImportExportConfiguration
importConfiguration.setItemId(UUID.randomUUID().toString());
}
if (updateRunningRoute) {
- TimerTask updateRoute = new TimerTask() {
- @Override
- public void run() {
- try {
-
routerCamelContext.updateProfileReaderRoute(importConfiguration, true);
- } catch (Exception e) {
- logger.error("Error when trying to save/update running
Apache Camel Route: {}", importConfiguration.getItemId());
- }
- }
- };
- // Defer config update.
-
schedulerService.getScheduleExecutorService().schedule(updateRoute, 0,
TimeUnit.MILLISECONDS);
+ camelConfigsToRefresh.put(importConfiguration.getItemId(),
RouterConstants.CONFIG_CAMEL_REFRESH.UPDATED);
}
persistenceService.save(importConfiguration);
return persistenceService.load(importConfiguration.getItemId(),
ImportConfiguration.class);
@@ -86,21 +70,14 @@ public class ImportConfigurationServiceImpl implements
ImportExportConfiguration
@Override
public void delete(String configId) {
- try {
- routerCamelContext.killExistingRoute(configId, true);
- } catch (Exception e) {
- logger.error("Error when trying to delete running Apache Camel
Route: {}", configId);
- }
persistenceService.remove(configId, ImportConfiguration.class);
+ camelConfigsToRefresh.put(configId,
RouterConstants.CONFIG_CAMEL_REFRESH.REMOVED);
}
@Override
- public void setRouterCamelContext(IRouterCamelContext routerCamelContext) {
- this.routerCamelContext = routerCamelContext;
- }
-
- @Override
- public IRouterCamelContext getRouterCamelContext() {
- return routerCamelContext;
+ public Map<String, RouterConstants.CONFIG_CAMEL_REFRESH>
consumeConfigsToBeRefresh() {
+ Map<String, RouterConstants.CONFIG_CAMEL_REFRESH> result = new
HashMap<>(camelConfigsToRefresh);
+ camelConfigsToRefresh.clear();
+ return result;
}
}
diff --git
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
index 971ccb7e6..95fd4a6a9 100644
---
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
+++
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java
@@ -26,8 +26,6 @@ import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.RouterUtils;
import org.apache.unomi.router.api.services.ProfileExportService;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +34,22 @@ import java.util.*;
/**
* Created by amidani on 30/06/2017.
*/
-@Component(immediate = true, service = ProfileExportService.class)
public class ProfileExportServiceImpl implements ProfileExportService {
private static final Logger logger =
LoggerFactory.getLogger(ProfileExportServiceImpl.class.getName());
- @Reference
+
private PersistenceService persistenceService;
- @Reference
private ConfigSharingService configSharingService;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
+ public void setConfigSharingService(ConfigSharingService
configSharingService) {
+ this.configSharingService = configSharingService;
+ }
+
public String extractProfilesBySegment(ExportConfiguration
exportConfiguration) {
List<Profile> profileList = persistenceService.query("segments",
(String) exportConfiguration.getProperty("segment"), null, Profile.class);
StringBuilder csvContent = new StringBuilder();
diff --git
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
index 2e3acc5a7..578aee22c 100644
---
a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
+++
b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileImportServiceImpl.java
@@ -21,8 +21,6 @@ import org.apache.unomi.api.Profile;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ProfileToImport;
import org.apache.unomi.router.api.services.ProfileImportService;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +30,16 @@ import java.util.List;
/**
* Created by amidani on 18/05/2017.
*/
-@Component(immediate = true, service = ProfileImportService.class)
public class ProfileImportServiceImpl implements ProfileImportService {
private static final Logger logger =
LoggerFactory.getLogger(ProfileImportServiceImpl.class.getName());
- @Reference
+
private PersistenceService persistenceService;
+ public void setPersistenceService(PersistenceService persistenceService) {
+ this.persistenceService = persistenceService;
+ }
+
public boolean saveMergeDeleteImportedProfile(ProfileToImport
profileToImport) throws InvocationTargetException, IllegalAccessException {
logger.debug("Importing profile with ID : {}",
profileToImport.getItemId());
Profile existingProfile = new Profile();
diff --git
a/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
b/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644
index 000000000..74e67907a
--- /dev/null
+++
b/extensions/router/router-service/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+ xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0
http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
+
+ <reference id="persistenceService"
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
+ <reference id="configSharingService"
interface="org.apache.unomi.api.services.ConfigSharingService"/>
+
+ <bean id="importConfigurationServiceImpl"
class="org.apache.unomi.router.services.ImportConfigurationServiceImpl">
+ <property name="persistenceService" ref="persistenceService"/>
+ </bean>
+ <service id="importConfigurationService"
ref="importConfigurationServiceImpl">
+ <interfaces>
+
<value>org.apache.unomi.router.api.services.ImportExportConfigurationService</value>
+ </interfaces>
+ <service-properties>
+ <entry key="configDiscriminator" value="IMPORT"/>
+ </service-properties>
+ </service>
+
+ <bean id="exportConfigurationServiceImpl"
class="org.apache.unomi.router.services.ExportConfigurationServiceImpl">
+ <property name="persistenceService" ref="persistenceService"/>
+ </bean>
+ <service id="exportConfigurationService"
ref="exportConfigurationServiceImpl">
+ <interfaces>
+
<value>org.apache.unomi.router.api.services.ImportExportConfigurationService</value>
+ </interfaces>
+ <service-properties>
+ <entry key="configDiscriminator" value="EXPORT"/>
+ </service-properties>
+ </service>
+
+ <bean id="profileImportServiceImpl"
class="org.apache.unomi.router.services.ProfileImportServiceImpl">
+ <property name="persistenceService" ref="persistenceService"/>
+ </bean>
+ <service id="profileImportService" ref="profileImportServiceImpl">
+ <interfaces>
+
<value>org.apache.unomi.router.api.services.ProfileImportService</value>
+ </interfaces>
+ </service>
+
+ <bean id="profileExportServiceImpl"
class="org.apache.unomi.router.services.ProfileExportServiceImpl">
+ <property name="persistenceService" ref="persistenceService"/>
+ <property name="configSharingService" ref="configSharingService" />
+ </bean>
+ <service id="profileExportService" ref="profileExportServiceImpl">
+ <interfaces>
+
<value>org.apache.unomi.router.api.services.ProfileExportService</value>
+ </interfaces>
+ </service>
+</blueprint>
diff --git a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
index f62d32d8a..7e70e40ab 100644
--- a/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/BaseIT.java
@@ -52,6 +52,7 @@ import org.apache.unomi.lifecycle.BundleWatcher;
import org.apache.unomi.persistence.spi.CustomObjectMapper;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.IRouterCamelContext;
import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.schema.api.SchemaService;
@@ -150,6 +151,7 @@ public abstract class BaseIT extends KarafTestSupport {
protected PatchService patchService;
protected ImportExportConfigurationService<ImportConfiguration>
importConfigurationService;
protected ImportExportConfigurationService<ExportConfiguration>
exportConfigurationService;
+ protected IRouterCamelContext routerCamelContext;
protected UserListService userListService;
protected TopicService topicService;
@@ -194,9 +196,9 @@ public abstract class BaseIT extends KarafTestSupport {
patchService = getOsgiService(PatchService.class, 600000);
userListService = getOsgiService(UserListService.class, 600000);
topicService = getOsgiService(TopicService.class, 600000);
- patchService = getOsgiService(PatchService.class, 600000);
importConfigurationService =
getOsgiService(ImportExportConfigurationService.class,
"(configDiscriminator=IMPORT)", 600000);
exportConfigurationService =
getOsgiService(ImportExportConfigurationService.class,
"(configDiscriminator=EXPORT)", 600000);
+ routerCamelContext = getOsgiService(IRouterCamelContext.class, 600000);
// init httpClient
httpClient = initHttpClient();
diff --git
a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
index 2b321b26e..18c7e8128 100644
--- a/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
+++ b/itests/src/test/java/org/apache/unomi/itests/ProfileImportRankingIT.java
@@ -50,7 +50,7 @@ public class ProfileImportRankingIT extends BaseIT {
@Test
public void testImportRanking() throws InterruptedException {
- importConfigurationService.getRouterCamelContext().setTracing(true);
+ routerCamelContext.setTracing(true);
/*** Create Missing Properties ***/
PropertyType propertyTypeUciId = new PropertyType(new
Metadata("integration", "uciId", "UCI ID", "UCI ID"));