UNOMI-101 : No broker config allowed
Project: http://git-wip-us.apache.org/repos/asf/incubator-unomi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-unomi/commit/8231d564 Tree: http://git-wip-us.apache.org/repos/asf/incubator-unomi/tree/8231d564 Diff: http://git-wip-us.apache.org/repos/asf/incubator-unomi/diff/8231d564 Branch: refs/heads/feature-UNOMI-5-KARAF4 Commit: 8231d5640d45f1f9ee8b20a8b239c54a4205ab7c Parents: f597a69 Author: Abdelkader Midani <[email protected]> Authored: Wed Jun 14 12:12:20 2017 +0200 Committer: Abdelkader Midani <[email protected]> Committed: Wed Jun 14 12:12:33 2017 +0200 ---------------------------------------------------------------------- .../unomi/router/core/RouterConstants.java | 35 ++++++ .../core/context/ProfileImportCamelContext.java | 17 ++- .../BadProfileDataFormatException.java | 35 ++++++ .../ImportConfigByFileNameProcessor.java | 3 +- .../core/processor/LineSplitFailureHandler.java | 34 ++++++ .../core/processor/LineSplitProcessor.java | 31 +++-- .../ProfileImportAbstractRouteBuilder.java | 83 +++++++++++++ .../ProfileImportFromSourceRouteBuilder.java | 102 ++++++++++++++++ .../ProfileImportKafkaToUnomiRouteBuilder.java | 77 ------------ .../route/ProfileImportOneShotRouteBuilder.java | 42 +++---- .../ProfileImportSourceToKafkaRouteBuilder.java | 120 ------------------- .../route/ProfileImportToUnomiRouteBuilder.java | 70 +++++++++++ .../resources/OSGI-INF/blueprint/blueprint.xml | 6 + .../main/resources/org.apache.unomi.router.cfg | 15 ++- 14 files changed, 422 insertions(+), 248 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java new file mode 100644 index 0000000..f09e993 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/RouterConstants.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core; + +/** + * Created by amidani on 13/06/2017. + */ +public interface RouterConstants { + + String CONFIG_TYPE_NOBROKER = "nobroker"; + String CONFIG_TYPE_KAFKA = "kafka"; + + String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer"; + + String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot"; + String HEADER_CONFIG_TYPE = "configType"; + String HEADER_PROFILES_COUNT = "profilesCount"; + + String DIRECTION_FROM = "from"; + String DIRECTION_TO = "to"; +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java index df734d3..9a4a004 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/ProfileImportCamelContext.java @@ -24,9 +24,9 @@ import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.services.ImportConfigurationService; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; import org.apache.unomi.router.core.processor.UnomiStorageProcessor; -import org.apache.unomi.router.core.route.ProfileImportKafkaToUnomiRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder; import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder; -import org.apache.unomi.router.core.route.ProfileImportSourceToKafkaRouteBuilder; +import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder; import org.osgi.framework.Bundle; import org.osgi.framework.BundleContext; import org.osgi.framework.BundleEvent; @@ -53,6 +53,7 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { private JacksonDataFormat jacksonDataFormat; private String uploadDir; private Map<String, String> kafkaProps; + private String configType; private final String IMPORT_CONFIG_TYPE_RECURRENT = "recurrent"; @@ -66,14 +67,14 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { logger.info("Initialize Camel Context..."); camelContext = new DefaultCamelContext(); List<ImportConfiguration> importConfigurationList = importConfigurationService.getImportConfigurations(); - ProfileImportSourceToKafkaRouteBuilder builderReader = new ProfileImportSourceToKafkaRouteBuilder(kafkaProps); + ProfileImportFromSourceRouteBuilder builderReader = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); builderReader.setImportConfigurationList(importConfigurationList); builderReader.setJacksonDataFormat(jacksonDataFormat); builderReader.setContext(camelContext); camelContext.addRoutes(builderReader); //One shot import route - ProfileImportOneShotRouteBuilder builderOneShot = new ProfileImportOneShotRouteBuilder(kafkaProps); + ProfileImportOneShotRouteBuilder builderOneShot = new ProfileImportOneShotRouteBuilder(kafkaProps, configType); builderOneShot.setImportConfigByFileNameProcessor(importConfigByFileNameProcessor); builderOneShot.setJacksonDataFormat(jacksonDataFormat); builderOneShot.setUploadDir(uploadDir); @@ -81,7 +82,7 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { camelContext.addRoutes(builderOneShot); - ProfileImportKafkaToUnomiRouteBuilder builderProcessor = new ProfileImportKafkaToUnomiRouteBuilder(kafkaProps); + ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType); builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor); builderProcessor.setJacksonDataFormat(jacksonDataFormat); builderProcessor.setContext(camelContext); @@ -113,7 +114,7 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { } //Handle transforming an import config oneshot <--> recurrent if(IMPORT_CONFIG_TYPE_RECURRENT.equals(importConfiguration.getConfigType())){ - ProfileImportSourceToKafkaRouteBuilder builder = new ProfileImportSourceToKafkaRouteBuilder(kafkaProps); + ProfileImportFromSourceRouteBuilder builder = new ProfileImportFromSourceRouteBuilder(kafkaProps, configType); builder.setImportConfigurationList(Arrays.asList(importConfiguration)); builder.setJacksonDataFormat(jacksonDataFormat); builder.setContext(camelContext); @@ -149,6 +150,10 @@ public class ProfileImportCamelContext implements SynchronousBundleListener { this.kafkaProps = kafkaProps; } + public void setConfigType(String configType) { + this.configType = configType; + } + public void preDestroy() throws Exception { bundleContext.removeBundleListener(this); //This is to shutdown Camel context http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java new file mode 100644 index 0000000..6c947ff --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/exception/BadProfileDataFormatException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.exception; + +/** + * Created by amidani on 13/06/2017. + */ +public class BadProfileDataFormatException extends Exception{ + + public BadProfileDataFormatException() { + super(); + } + + public BadProfileDataFormatException(String message) { + super(message); + } + + public BadProfileDataFormatException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java index 7fc7730..c2968cd 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportConfigByFileNameProcessor.java @@ -21,6 +21,7 @@ import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFile; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.services.ImportConfigurationService; +import org.apache.unomi.router.core.RouterConstants; /** * Created by amidani on 22/05/2017. @@ -35,7 +36,7 @@ public class ImportConfigByFileNameProcessor implements Processor{ String fileName = exchange.getIn().getBody(GenericFile.class).getFileName(); String importConfigId = fileName.substring(0, fileName.indexOf('.')); ImportConfiguration importConfiguration = importConfigurationService.load(importConfigId); - exchange.getIn().setHeader("importConfigOneShot", importConfiguration); + exchange.getIn().setHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT, importConfiguration); } public void setImportConfigurationService(ImportConfigurationService importConfigurationService) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java new file mode 100644 index 0000000..b1de82a --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitFailureHandler.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by amidani on 14/06/2017. + */ +public class LineSplitFailureHandler implements Processor { + + private static final Logger logger = LoggerFactory.getLogger(LineSplitFailureHandler.class.getName()); + + public void process(Exchange exchange) throws Exception { + logger.error("{}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java index 150ef6d..da14aa5 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineSplitProcessor.java @@ -22,11 +22,10 @@ import org.apache.camel.component.kafka.KafkaConstants; import org.apache.commons.lang3.StringUtils; import org.apache.unomi.router.api.ImportConfiguration; import org.apache.unomi.router.api.ProfileToImport; +import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.exception.BadProfileDataFormatException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; /** * Created by amidani on 29/12/2016. @@ -42,7 +41,8 @@ public class LineSplitProcessor implements Processor { @Override public void process(Exchange exchange) throws Exception { //In case of one shot import we check the header and overwrite import config - ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader("importConfigOneShot"); + ImportConfiguration importConfigOneShot = (ImportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_IMPORT_CONFIG_ONESHOT); + String configType = (String) exchange.getIn().getHeader(RouterConstants.HEADER_CONFIG_TYPE); if(importConfigOneShot!=null) { fieldsMapping = (Map<String, Integer>)importConfigOneShot.getProperties().get("mapping"); propertiesToOverwrite = importConfigOneShot.getPropertiesToOverwrite(); @@ -50,15 +50,18 @@ public class LineSplitProcessor implements Processor { overwriteExistingProfiles = importConfigOneShot.isOverwriteExistingProfiles(); columnSeparator = importConfigOneShot.getColumnSeparator(); } - String[] profileData = ((String)exchange.getIn().getBody()).split(columnSeparator); + String[] profileData = ((String)exchange.getIn().getBody()).split(columnSeparator, -1); ProfileToImport profileToImport = new ProfileToImport(); profileToImport.setItemId(UUID.randomUUID().toString()); profileToImport.setItemType("profile"); profileToImport.setScope("system"); - if(profileData.length > 0) { + if(profileData.length > 0 && StringUtils.isNotBlank(profileData[0])) { + if(fieldsMapping.size() != (profileData.length - 1)) { + throw new BadProfileDataFormatException("The index does not match the number of column : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("MAPPING_COLUMN_MATCH")); + } Map<String, Object> properties = new HashMap<>(); - for(String fieldMappingKey : fieldsMapping.keySet()) { - if(profileData.length > fieldsMapping.get(fieldMappingKey)) { + for (String fieldMappingKey : fieldsMapping.keySet()) { + if (profileData.length > fieldsMapping.get(fieldMappingKey)) { properties.put(fieldMappingKey, profileData[fieldsMapping.get(fieldMappingKey)].trim()); } } @@ -66,13 +69,17 @@ public class LineSplitProcessor implements Processor { profileToImport.setMergingProperty(mergingProperty); profileToImport.setPropertiesToOverwrite(propertiesToOverwrite); profileToImport.setOverwriteExistingProfiles(overwriteExistingProfiles); - if(StringUtils.isNotBlank(profileData[profileData.length - 1]) && Boolean.parseBoolean(profileData[profileData.length - 1].trim())) { + if (StringUtils.isNotBlank(profileData[profileData.length - 1]) && Boolean.parseBoolean(profileData[profileData.length - 1].trim())) { profileToImport.setProfileToDelete(true); } + } else { + throw new BadProfileDataFormatException("Empty line : line ["+((Integer)exchange.getProperty("CamelSplitIndex")+1)+"]", new Throwable("EMPTY_LINE")); } exchange.getIn().setBody(profileToImport, ProfileToImport.class); - exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); - exchange.getIn().setHeader(KafkaConstants.KEY, "1"); + if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); + exchange.getIn().setHeader(KafkaConstants.KEY, "1"); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java new file mode 100644 index 0000000..ca28baf --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.core.RouterConstants; + +import java.util.Map; + +/** + * Created by amidani on 13/06/2017. + */ +public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder { + + protected JacksonDataFormat jacksonDataFormat; + + protected String kafkaHost; + protected String kafkaPort; + protected String kafkaImportTopic; + protected String kafkaImportGroupId; + protected String kafkaImportConsumerCount; + protected String kafkaImportAutoCommit; + + protected String configType; + + public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) { + this.kafkaHost = kafkaProps.get("kafkaHost"); + this.kafkaPort = kafkaProps.get("kafkaPort"); + this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); + this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + this.kafkaImportConsumerCount = kafkaProps.get("kafkaImportConsumerCount"); + this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit"); + this.configType = configType; + } + + public Object getEndpointURI(String direction) { + Object endpoint; + if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + //Prepare Kafka Deposit + StringBuilder kafkaUri = new StringBuilder("kafka:"); + kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); + if (StringUtils.isNotBlank(kafkaImportGroupId)) { + kafkaUri.append("&groupId=" + kafkaImportGroupId); + } + if(RouterConstants.DIRECTION_TO.equals(direction)) { + kafkaUri.append("&autoCommitEnable="+kafkaImportAutoCommit+"&consumersCount="+kafkaImportConsumerCount); + } + KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); + kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort); + kafkaConfiguration.setTopic(kafkaImportTopic); + kafkaConfiguration.setGroupId(kafkaImportGroupId); + endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); + ((KafkaEndpoint)endpoint).setConfiguration(kafkaConfiguration); + } else { + endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER; + } + + return endpoint; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java new file mode 100644 index 0000000..9be6fb6 --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.LoggingLevel; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.api.ImportConfiguration; +import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.exception.BadProfileDataFormatException; +import org.apache.unomi.router.core.processor.LineSplitFailureHandler; +import org.apache.unomi.router.core.processor.LineSplitProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +/** + * Created by amidani on 26/04/2017. + */ + +public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRouteBuilder { + + private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName()); + + private List<ImportConfiguration> importConfigurationList; + + + public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) { + super(kafkaProps, configType); + } + + @Override + public void configure() throws Exception { + + logger.info("Configure Recurrent Route 'From Source'"); + + //Loop on multiple import configuration + for (ImportConfiguration importConfiguration : importConfigurationList) { + if (importConfiguration.getProperties().size() > 0 && + StringUtils.isNotEmpty((String) importConfiguration.getProperties().get("source"))) { + //Prepare Split Processor + LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); + lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping")); + lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles()); + lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite()); + lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty()); + lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator()); + + onException(BadProfileDataFormatException.class) + .log(LoggingLevel.ERROR, "Error processing record ${exchangeProperty.CamelSplitIndex}++ !") + .handled(true) + .process(new LineSplitFailureHandler()) + .to("direct:errors"); + + errorHandler(deadLetterChannel("direct:errors")); + + ProcessorDefinition prDef = from((String) importConfiguration.getProperties().get("source")) + .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop + .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active + .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) + .log(LoggingLevel.INFO, "Splitted into ${exchangeProperty.CamelSplitSize} records") + .setHeader(RouterConstants.HEADER_PROFILES_COUNT, exchangeProperty("CamelSplitSize}")) + .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType)) + .process(lineSplitProcessor) + .log(LoggingLevel.INFO, "Split IDX ${exchangeProperty.CamelSplitIndex} record") + .to("log:org.apache.unomi.router?level=INFO") + .marshal(jacksonDataFormat) + .convertBodyTo(String.class); + + if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) { + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } + + from("direct:errors").to("log:org.apache.unomi.router?level=ERROR"); + } + } + } + + public void setImportConfigurationList(List<ImportConfiguration> importConfigurationList) { + this.importConfigurationList = importConfigurationList; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java deleted file mode 100644 index 1b056fe..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportKafkaToUnomiRouteBuilder.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.route; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.component.kafka.KafkaComponent; -import org.apache.camel.component.kafka.KafkaConfiguration; -import org.apache.camel.component.kafka.KafkaEndpoint; -import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.router.core.processor.UnomiStorageProcessor; - -import java.util.Map; - -/** - * Created by amidani on 26/04/2017. - */ -public class ProfileImportKafkaToUnomiRouteBuilder extends RouteBuilder { - - private UnomiStorageProcessor unomiStorageProcessor; - private JacksonDataFormat jacksonDataFormat; - private String kafkaHost; - private String kafkaPort; - private String kafkaImportTopic; - private String kafkaImportGroupId; - - public ProfileImportKafkaToUnomiRouteBuilder(Map<String, String> kafkaProps) { - kafkaHost = kafkaProps.get("kafkaHost"); - kafkaPort = kafkaProps.get("kafkaPort"); - kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); - kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); - } - - @Override - public void configure() throws Exception { - - StringBuilder kafkaUri = new StringBuilder("kafka:"); - kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); - if(StringUtils.isNotBlank(kafkaImportGroupId)) { - kafkaUri.append("&groupId="+kafkaImportGroupId); - } - kafkaUri.append("&autoCommitEnable=true&consumersCount=10"); - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); - kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); - kafkaConfiguration.setTopic(kafkaImportTopic); - kafkaConfiguration.setGroupId(kafkaImportGroupId); - KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); - endpoint.setConfiguration(kafkaConfiguration); - from(endpoint) - .unmarshal(jacksonDataFormat) - .process(unomiStorageProcessor) - .to("log:org.apache.unomi.router?level=INFO"); - - } - - public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { - this.unomiStorageProcessor = unomiStorageProcessor; - } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; - } -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java index d095f3e..84b220c 100644 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java @@ -21,7 +21,9 @@ import org.apache.camel.component.jackson.JacksonDataFormat; import org.apache.camel.component.kafka.KafkaComponent; import org.apache.camel.component.kafka.KafkaConfiguration; import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.ProcessorDefinition; import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.core.RouterConstants; import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor; import org.apache.unomi.router.core.processor.LineSplitProcessor; import org.slf4j.Logger; @@ -32,57 +34,41 @@ import java.util.Map; /** * Created by amidani on 22/05/2017. */ -public class ProfileImportOneShotRouteBuilder extends RouteBuilder { +public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRouteBuilder { private Logger logger = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName()); private ImportConfigByFileNameProcessor importConfigByFileNameProcessor; - private JacksonDataFormat jacksonDataFormat; private String uploadDir; - private String kafkaHost; - private String kafkaPort; - private String kafkaImportTopic; - private String kafkaImportGroupId; private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE"; - public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps) { - kafkaHost = kafkaProps.get("kafkaHost"); - kafkaPort = kafkaProps.get("kafkaPort"); - kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); - kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); + public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, String configType) { + super(kafkaProps, configType); } @Override public void configure() throws Exception { - //Prepare Kafka Deposit - StringBuilder kafkaUri = new StringBuilder("kafka:"); - kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); - if(StringUtils.isNotBlank(kafkaImportGroupId)) { - kafkaUri.append("&groupId="+ kafkaImportGroupId); - } - - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); - kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); - kafkaConfiguration.setTopic(kafkaImportTopic); - kafkaConfiguration.setGroupId(kafkaImportGroupId); - KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); - endpoint.setConfiguration(kafkaConfiguration); + logger.info("Configure OneShot Route..."); LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); - - from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m") + ProcessorDefinition prDef = from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m") .routeId(IMPORT_ONESHOT_ROUTE_ID) .autoStartup(true) .process(importConfigByFileNameProcessor) .split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}")) + .setHeader("configType", constant(configType)) .process(lineSplitProcessor) .to("log:org.apache.unomi.router?level=INFO") .marshal(jacksonDataFormat) - .convertBodyTo(String.class) - .to(endpoint); + .convertBodyTo(String.class); + if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){ + prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } else { + prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM)); + } } public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) { http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java deleted file mode 100644 index 37ae59e..0000000 --- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportSourceToKafkaRouteBuilder.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.unomi.router.core.route; - -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.jackson.JacksonDataFormat; -import org.apache.camel.component.kafka.KafkaComponent; -import org.apache.camel.component.kafka.KafkaConfiguration; -import org.apache.camel.component.kafka.KafkaEndpoint; -import org.apache.commons.lang3.StringUtils; -import org.apache.unomi.router.api.ImportConfiguration; -import org.apache.unomi.router.core.processor.LineSplitProcessor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - -/** - * Created by amidani on 26/04/2017. - */ - -public class ProfileImportSourceToKafkaRouteBuilder extends RouteBuilder { - - private static final Logger logger = LoggerFactory.getLogger(ProfileImportSourceToKafkaRouteBuilder.class.getName()); - - private List<ImportConfiguration> importConfigurationList; - private JacksonDataFormat jacksonDataFormat; - private String kafkaHost; - private String kafkaPort; - private String kafkaImportTopic; - private String kafkaImportGroupId; - - public ProfileImportSourceToKafkaRouteBuilder(Map<String, String> kafkaProps) { - kafkaHost = kafkaProps.get("kafkaHost"); - kafkaPort = kafkaProps.get("kafkaPort"); - kafkaImportTopic = kafkaProps.get("kafkaImportTopic"); - kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId"); - } - - @Override - public void configure() throws Exception { - //Prepare Kafka Deposit - StringBuilder kafkaUri = new StringBuilder("kafka:"); - kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic); - if(StringUtils.isNotBlank(kafkaImportGroupId)) { - kafkaUri.append("&groupId="+ kafkaImportGroupId); - } - - KafkaConfiguration kafkaConfiguration = new KafkaConfiguration(); - kafkaConfiguration.setBrokers(kafkaHost+":"+kafkaPort); - kafkaConfiguration.setTopic(kafkaImportTopic); - kafkaConfiguration.setGroupId(kafkaImportGroupId); - KafkaEndpoint endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext())); - endpoint.setConfiguration(kafkaConfiguration); - - //Loop on multiple import configuration - for(ImportConfiguration importConfiguration : importConfigurationList) { - if(importConfiguration.getProperties().size() > 0 && - StringUtils.isNotEmpty((String) importConfiguration.getProperties().get("source"))) { - //Prepare Split Processor - LineSplitProcessor lineSplitProcessor = new LineSplitProcessor(); - lineSplitProcessor.setFieldsMapping((Map<String, Integer>) importConfiguration.getProperties().get("mapping")); - lineSplitProcessor.setOverwriteExistingProfiles(importConfiguration.isOverwriteExistingProfiles()); - lineSplitProcessor.setPropertiesToOverwrite(importConfiguration.getPropertiesToOverwrite()); - lineSplitProcessor.setMergingProperty(importConfiguration.getMergingProperty()); - lineSplitProcessor.setColumnSeparator(importConfiguration.getColumnSeparator()); - - from((String) importConfiguration.getProperties().get("source")) - .routeId(importConfiguration.getItemId())// This allow identification of the route for manual start/stop - .autoStartup(importConfiguration.isActive())// Auto-start if the import configuration is set active - .split(bodyAs(String.class).tokenize(importConfiguration.getLineSeparator())) - .process(lineSplitProcessor) - .to("log:org.apache.unomi.router?level=INFO") - .marshal(jacksonDataFormat) - .convertBodyTo(String.class) - .to(endpoint); - } - } - } - - public void setImportConfigurationList(List<ImportConfiguration> importConfigurationList) { - this.importConfigurationList = importConfigurationList; - } - - public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { - this.jacksonDataFormat = jacksonDataFormat; - } - - public void setKafkaHost(String kafkaHost) { - this.kafkaHost = kafkaHost; - } - - public void setKafkaPort(String kafkaPort) { - this.kafkaPort = kafkaPort; - } - - public void setKafkaImportTopic(String kafkaImportTopic) { - this.kafkaImportTopic = kafkaImportTopic; - } - - public void setKafkaImportGroupId(String kafkaImportGroupId) { - this.kafkaImportGroupId = kafkaImportGroupId; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java new file mode 100644 index 0000000..9bec24b --- /dev/null +++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.unomi.router.core.route; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.jackson.JacksonDataFormat; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.camel.component.kafka.KafkaEndpoint; +import org.apache.camel.model.RouteDefinition; +import org.apache.commons.lang3.StringUtils; +import org.apache.unomi.router.core.RouterConstants; +import org.apache.unomi.router.core.processor.UnomiStorageProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** + * Created by amidani on 26/04/2017. + */ +public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRouteBuilder { + + private Logger logger = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName()); + + private UnomiStorageProcessor unomiStorageProcessor; + + public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) { + super(kafkaProps, configType); + } + + @Override + public void configure() throws Exception { + + logger.info("Configure Recurrent Route 'To Target'"); + + RouteDefinition rtDef; + if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){ + rtDef=from((KafkaEndpoint)getEndpointURI(RouterConstants.DIRECTION_TO)); + } else { + rtDef=from((String)getEndpointURI(RouterConstants.DIRECTION_TO)); + } + rtDef.unmarshal(jacksonDataFormat) + .process(unomiStorageProcessor) + .to("log:org.apache.unomi.router?level=INFO"); + + } + + public void setUnomiStorageProcessor(UnomiStorageProcessor unomiStorageProcessor) { + this.unomiStorageProcessor = unomiStorageProcessor; + } + + public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) { + this.jacksonDataFormat = jacksonDataFormat; + } +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml index 4c36b9e..53efbf3 100644 --- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml +++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -25,10 +25,13 @@ <cm:property-placeholder persistent-id="org.apache.unomi.router" update-strategy="reload"> <cm:default-properties> + <cm:property name="import.config.type" value="nobroker"/> <cm:property name="kafka.host" value="localhost"/> <cm:property name="kafka.port" value="9092"/> <cm:property name="kafka.import.topic" value="camel-deposit"/> <cm:property name="kafka.import.groupId" value="unomi-import-group"/> + <cm:property name="kafka.import.consumerCount" value="10"/> + <cm:property name="kafka.import.autoCommit" value="true"/> <cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/> </cm:default-properties> </cm:property-placeholder> @@ -63,12 +66,15 @@ <bean id="camelContext" class="org.apache.unomi.router.core.context.ProfileImportCamelContext" init-method="initCamelContext" destroy-method="preDestroy"> + <property name="configType" value="${import.config.type}"/> <property name="kafkaProps"> <map> <entry key="kafkaHost" value="${kafka.host}"/> <entry key="kafkaPort" value="${kafka.port}"/> <entry key="kafkaImportTopic" value="${kafka.import.topic}"/> <entry key="kafkaImportGroupId" value="${kafka.import.groupId}"/> + <entry key="kafkaImportConsumerCount" value="${kafka.import.consumerCount}"/> + <entry key="kafkaImportAutoCommit" value="${kafka.import.autoCommit}"/> </map> </property> <property name="uploadDir" value="${import.oneshot.uploadDir}"/> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/8231d564/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg ---------------------------------------------------------------------- diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg index ff2c8ef..0ce4bb2 100644 --- a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg +++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg @@ -15,11 +15,18 @@ # limitations under the License. # +#Configuration Type values {'nobroker', 'kafka'} +import.config.type=nobroker + +#Uncomment and update Kafka settings to use Kafka as a broker + #Kafka - settingskafka.host=localhost -kafka.port=9092 -kafka.import.topic=camel-deposit -kafka.import.groupId=unomi-import-group +#kafka.host=localhost +#kafka.port=9092 +#kafka.import.topic=camel-deposit +#kafka.import.groupId=unomi-import-group +#kafka.import.consumerCount=10 +#kafka.import.autoCommit=true #Import One Shot upload directory import.oneshot.uploadDir=/tmp/unomi_oneshot_import_configs/ \ No newline at end of file
