This is an automated email from the ASF dual-hosted git repository. jkevan pushed a commit to branch async-save-export-config in repository https://gitbox.apache.org/repos/asf/unomi.git
The following commit(s) were added to refs/heads/async-save-export-config by this push: new 3077a8d27 small cleanup 3077a8d27 is described below commit 3077a8d27b64c5b652127e717a5b2029c1f24e1f Author: Kevan <ke...@jahia.com> AuthorDate: Wed Mar 29 12:12:48 2023 +0200 small cleanup --- .../main/java/org/apache/unomi/router/api/RouterConstants.java | 1 + .../src/main/java/org/apache/unomi/router/api/RouterUtils.java | 4 ++++ .../org/apache/unomi/router/core/bean/CollectProfileBean.java | 1 + .../org/apache/unomi/router/core/context/RouterCamelContext.java | 5 ++--- .../apache/unomi/router/core/event/UpdateCamelRouteEvent.java | 9 --------- .../router/core/processor/ExportRouteCompletionProcessor.java | 2 ++ .../router/core/route/ProfileExportCollectRouteBuilder.java | 3 ++- .../router/core/route/ProfileExportProducerRouteBuilder.java | 6 ++---- .../src/main/resources/OSGI-INF/blueprint/blueprint.xml | 1 + .../apache/unomi/router/services/ProfileExportServiceImpl.java | 1 + 10 files changed, 16 insertions(+), 17 deletions(-) diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java index ff3d4c2f7..5ef19fe44 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java @@ -44,6 +44,7 @@ public interface RouterConstants { String HEADER_CONFIG_TYPE = "configType"; + String HEADER_EXPORT_CONFIG = "exportConfig"; String HEADER_FAILED_MESSAGE = "failedMessage"; String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java index 464f90839..a535206c9 100644 --- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java +++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterUtils.java @@ -18,6 +18,7 @@ package org.apache.unomi.router.api; import org.apache.unomi.api.PropertyType; +import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -27,6 +28,9 @@ import java.util.Map; public class RouterUtils { public static ImportExportConfiguration addExecutionEntry(ImportExportConfiguration configuration, Map execution, int executionsHistorySize) { + if (configuration.getExecutions() == null) { + configuration.setExecutions(new ArrayList<>()); + } if (configuration.getExecutions().size() >= executionsHistorySize) { int oldestExecIndex = 0; long oldestExecDate = (Long) configuration.getExecutions().get(0).get(RouterConstants.KEY_EXECS_DATE); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java index 452501974..1ea03eb3a 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java @@ -29,6 +29,7 @@ public class CollectProfileBean { private PersistenceService persistenceService; public List<Profile> extractProfileBySegment(String segment) { + // TODO: UNOMI-759 avoid loading all profiles in RAM here return persistenceService.query("segments", segment,null, Profile.class); } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java index 642e94008..4ad24e2d1 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java @@ -43,6 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; +import java.util.Collections; import java.util.Map; import java.util.TimerTask; import java.util.concurrent.Executors; @@ -267,7 +268,6 @@ public class RouterCamelContext implements IRouterCamelContext { if (fireEvent) { UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_IMPORT); - event.setConfiguration(importConfiguration); clusterService.sendEvent(event); } } @@ -284,7 +284,7 @@ public class RouterCamelContext implements IRouterCamelContext { if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) { ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType); - profileExportCollectRouteBuilder.setExportConfigurationList(Arrays.asList(exportConfiguration)); + profileExportCollectRouteBuilder.setExportConfigurationList(Collections.singletonList(exportConfiguration)); profileExportCollectRouteBuilder.setPersistenceService(persistenceService); profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints); profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat); @@ -293,7 +293,6 @@ public class RouterCamelContext implements IRouterCamelContext { if (fireEvent) { UpdateCamelRouteEvent event = new UpdateCamelRouteEvent(EVENT_ID_EXPORT); - event.setConfiguration(exportConfiguration); clusterService.sendEvent(event); } } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java index 7e1dc81d2..2f3d2cb3f 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/event/UpdateCamelRouteEvent.java @@ -23,7 +23,6 @@ import org.apache.karaf.cellar.core.event.Event; */ public class UpdateCamelRouteEvent extends Event { private String routeId; - private Object configuration; public UpdateCamelRouteEvent(String id) { super(id); @@ -36,12 +35,4 @@ public class UpdateCamelRouteEvent extends Event { public void setRouteId(String routeId) { this.routeId = routeId; } - - public Object getConfiguration() { - return configuration; - } - - public void setConfiguration(Object configuration) { - this.configuration = configuration; - } } diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java index 6d16caa88..414e1c00f 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java @@ -40,9 +40,11 @@ public class ExportRouteCompletionProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { + // We load the conf from ES because we are going to increment the execution number ExportConfiguration exportConfiguration = exportConfigurationService.load(exchange.getFromRouteId()); if (exportConfiguration == null) { logger.warn("Unable to complete export, config cannot not found: {}", exchange.getFromRouteId()); + return; } Map execution = new HashMap(); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java index 1acf51b55..e87ec50be 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java @@ -73,8 +73,9 @@ public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder .autoStartup(exportConfiguration.isActive()) .bean(collectProfileBean, "extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") + ")") .split(body()) - .marshal(jacksonDataFormat) + .marshal(jacksonDataFormat) // TODO: UNOMI-759 avoid unnecessary marshalling .convertBodyTo(String.class) + .setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration)) .log(LoggingLevel.DEBUG, "BODY : ${body}"); if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java index 9f25c3bb9..86288019f 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java @@ -59,10 +59,8 @@ public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilde rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER)); } - LineBuildProcessor processor = new LineBuildProcessor(profileExportService); - - rtDef.unmarshal(jacksonDataFormat) - .process(processor) + rtDef.unmarshal(jacksonDataFormat) // TODO: UNOMI-759 avoid unnecessary marshalling + .process(new LineBuildProcessor(profileExportService)) .aggregate(constant(true), new StringLinesAggregationStrategy()) .completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize"))) .eagerCheckCompletion() diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index b366556be..d7b7a36c0 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -114,6 +114,7 @@ <property name="profileService" ref="profileService"/> <property name="clusterService" ref="clusterService" /> </bean> + <service id="camelContextOSGI" ref="camelContext" interface="org.apache.unomi.router.api.IRouterCamelContext"/> <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean"> <property name="persistenceService" ref="persistenceService"/> diff --git a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java index 95fd4a6a9..8b9471b54 100644 --- a/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java +++ b/extensions/router/router-service/src/main/java/org/apache/unomi/router/services/ProfileExportServiceImpl.java @@ -70,6 +70,7 @@ public class ProfileExportServiceImpl implements ProfileExportService { } public String convertProfileToCSVLine(Profile profile, ExportConfiguration exportConfiguration) { + // TODO: UNOMI-759 querying this everytimes Collection<PropertyType> propertiesDef = persistenceService.query("target", "profiles", null, PropertyType.class); Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping"); String lineToWrite = "";