This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/flume.git
commit 1521f5c32e73b01faafec214a628bbf477deb8a4 Author: rgoers <[email protected]> AuthorDate: Thu Sep 22 12:00:48 2022 -0700 Spring Boot support --- flume-spring-boot/pom.xml | 94 ++++++++++++++++++ .../org/apache/flume/spring/boot/Application.java | 33 +++++++ .../boot/config/AbstractFlumeConfiguration.java | 107 +++++++++++++++++++++ .../spring/boot/config/SpringConfiguration.java | 59 ++++++++++++ .../flume/spring/boot/runner/SpringFlume.java | 59 ++++++++++++ .../java/org/apache/flume/spring/boot/AppTest.java | 43 +++++++++ .../apache/flume/spring/boot/config/AppConfig.java | 75 +++++++++++++++ .../src/test/resources/application.yml | 8 ++ .../src/test/resources/log4j2-test.xml | 31 ++++++ pom.xml | 3 +- 10 files changed, 511 insertions(+), 1 deletion(-) diff --git a/flume-spring-boot/pom.xml b/flume-spring-boot/pom.xml new file mode 100644 index 000000000..bfd058fcc --- /dev/null +++ b/flume-spring-boot/pom.xml @@ -0,0 +1,94 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flume-parent</artifactId> + <groupId>org.apache.flume</groupId> + <version>1.11.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flume-spring-boot</artifactId> + <name>Flume Spring Boot Support</name> + + <properties> + <module.name>org.apache.flume.spring.boot</module.name> + <spring-boot.version>2.7.3</spring-boot.version> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-dependencies</artifactId> + <version>${spring-boot.version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + + <dependencies> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-configuration</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.flume</groupId> + <artifactId>flume-ng-node</artifactId> + </dependency> + <!-- Spring Boot dependencies --> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-actuator</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter</artifactId> + <exclusions> + <exclusion> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> + <!-- log dependencies --> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-log4j2</artifactId> + </dependency> + </dependencies> + <build> + + </build> + +</project> \ No newline at end of file diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java new file mode 100644 index 000000000..4edbad5f5 --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ + +package org.apache.flume.spring.boot; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; + +/** + * + */ +@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"}) +@EnableConfigurationProperties +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java new file mode 100644 index 000000000..0846e46dd --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.config; + +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.FlumeException; +import org.apache.flume.Sink; +import org.apache.flume.Source; +import org.apache.flume.SourceRunner; +import org.apache.flume.channel.ChannelProcessor; +import org.apache.flume.conf.Configurables; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * + */ +public abstract class AbstractFlumeConfiguration { + + protected <T extends Channel> T configureChannel(final String name, final Class<T> clazz, + final Map<String, String> params) { + T channel; + try { + channel = clazz.newInstance(); + } catch (Exception ex) { + throw new FlumeException("Unable to create channel " + name, ex); + } + channel.setName(name); + Configurables.configure(channel, createContext(params)); + return channel; + } + + protected <T extends Source> SourceRunner configureSource(final String name, final Class<T> clazz, + final ChannelSelector selector, final Map<String, String> params) { + T source; + try { + source = clazz.newInstance(); + } catch (Exception ex) { + throw new FlumeException("Unable to create source " + name, ex); + } + source.setName(name); + Configurables.configure(source, createContext(params)); + source.setChannelProcessor(new ChannelProcessor(selector)); + return SourceRunner.forSource(source); + } + + protected <T extends Sink> Sink configureSink(final String name, final Class<T> clazz, + final Channel channel, + final Map<String, String> params) { + T sink; + try { + sink = clazz.newInstance(); + } catch (Exception ex) { + throw new FlumeException("Unable to create sink " + name, ex); + } + sink.setName(name); + Configurables.configure(sink, createContext(params)); + sink.setChannel(channel); + return sink; + } + + protected ChannelSelector createChannelSelector(Class<? extends ChannelSelector> clazz, + Map<String, String> params) { + ChannelSelector selector; + try { + selector = clazz.newInstance(); + } catch (Exception ex) { + throw new FlumeException("Unable to create channel selector " + clazz.getName(), ex); + } + Configurables.configure(selector, createContext(params)); + return selector; + } + + /** + * Creates a List from a Varargs array. + * + * @param items The items to add to the list. + * @param <T> The type of objects in the List. + * @return a List containing the supplied items. + */ + @SafeVarargs + protected final <T> List<T> listOf(T... items) { + return Arrays.asList(items); + } + + private static Context createContext(Map<String, String> map) { + return map != null ? new Context(map) : new Context(); + } +} diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java new file mode 100644 index 000000000..eac9a765a --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/SpringConfiguration.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.config; + +import org.apache.flume.Channel; +import org.apache.flume.SinkRunner; +import org.apache.flume.SourceRunner; +import org.apache.flume.node.MaterializedConfiguration; +import org.apache.flume.node.SimpleMaterializedConfiguration; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; + +/** + * + */ +@Configuration +public class SpringConfiguration { + + @Autowired + public Map<String, Channel> channels; + + @Autowired + public Map<String, SinkRunner> sinkRunners; + + @Autowired + public Map<String, SourceRunner> sourceRunners; + + @Bean + public MaterializedConfiguration configuration() { + MaterializedConfiguration config = new SimpleMaterializedConfiguration(); + for (Map.Entry<String, Channel> entry : channels.entrySet()) { + config.addChannel(entry.getKey(), entry.getValue()); + } + for (Map.Entry<String, SinkRunner> entry : sinkRunners.entrySet()) { + config.addSinkRunner(entry.getKey(), entry.getValue()); + } + for (Map.Entry<String, SourceRunner> entry : sourceRunners.entrySet()) { + config.addSourceRunner(entry.getKey(), entry.getValue()); + } + return config; + } +} diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java new file mode 100644 index 000000000..2ddecad29 --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/runner/SpringFlume.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.runner; + +import com.google.common.collect.Lists; +import org.apache.flume.lifecycle.LifecycleAware; +import org.apache.flume.node.Application; +import org.apache.flume.node.MaterializedConfiguration; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.List; + +/** + * + */ +@Component +public class SpringFlume { + private MaterializedConfiguration materializedConfiguration; + + private final Application application; + + @Autowired + public SpringFlume(MaterializedConfiguration configuration) { + this.materializedConfiguration = configuration; + List<LifecycleAware> components = Lists.newArrayList(); + application = new Application(components); + } + + @PostConstruct + public void startUp() { + application.start(); + application.handleConfigurationEvent(materializedConfiguration); + } + + @PreDestroy + public void shutdown() { + application.stop(); + } + + +} diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java new file mode 100644 index 000000000..af2654348 --- /dev/null +++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot; + +import org.apache.flume.node.MaterializedConfiguration; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +/** + * + */ +@ExtendWith(SpringExtension.class) +@SpringBootTest +public class AppTest { + + @Autowired + MaterializedConfiguration configuration; + + @Test + public void contextLoads() { + Assertions.assertThat(configuration).isNotNull(); + Assertions.assertThat(configuration.getSinkRunners()).isNotNull(); + Assertions.assertThat(configuration.getSinkRunners().size()).isEqualTo(1); + } +} diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java new file mode 100644 index 000000000..cc9534376 --- /dev/null +++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.config; + +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Sink; +import org.apache.flume.SinkProcessor; +import org.apache.flume.SinkRunner; +import org.apache.flume.SourceRunner; +import org.apache.flume.channel.MemoryChannel; +import org.apache.flume.channel.ReplicatingChannelSelector; +import org.apache.flume.sink.DefaultSinkProcessor; +import org.apache.flume.sink.NullSink; +import org.apache.flume.source.SequenceGeneratorSource; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +@Configuration +public class AppConfig extends AbstractFlumeConfiguration { + + @Bean + @ConfigurationProperties(prefix = "flume.sources.source1") + public Map<String, String> source1Properties() { + return new HashMap<>(); + } + + @Bean + @ConfigurationProperties(prefix = "flume.channels.channel1") + public Map<String, String> channel1Properties() { + return new HashMap<>(); + } + + @Bean + public Channel memoryChannel(Map<String, String> channel1Properties) { + return configureChannel("channel1", MemoryChannel.class, channel1Properties); + } + + @Bean + public SourceRunner seqSource(Channel memoryChannel, Map<String, String> source1Properties) { + ChannelSelector selector = new ReplicatingChannelSelector(); + selector.setChannels(listOf(memoryChannel)); + return configureSource("source1", SequenceGeneratorSource.class, selector, + source1Properties); + } + + @Bean + public SinkRunner nullSink(Channel memoryChannel) { + SinkProcessor sinkProcessor = new DefaultSinkProcessor(); + Sink sink = configureSink("null", NullSink.class, memoryChannel,null); + sinkProcessor.setSinks(listOf(sink)); + return new SinkRunner(sinkProcessor); + } +} diff --git a/flume-spring-boot/src/test/resources/application.yml b/flume-spring-boot/src/test/resources/application.yml new file mode 100644 index 000000000..8e2e2e5ed --- /dev/null +++ b/flume-spring-boot/src/test/resources/application.yml @@ -0,0 +1,8 @@ + +flume: + sources: + source1: + totalEvents: 10000 + channels: + channel1: + capacity: 10000 diff --git a/flume-spring-boot/src/test/resources/log4j2-test.xml b/flume-spring-boot/src/test/resources/log4j2-test.xml new file mode 100644 index 000000000..d912f3105 --- /dev/null +++ b/flume-spring-boot/src/test/resources/log4j2-test.xml @@ -0,0 +1,31 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<Configuration status="ERROR"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d [%t] %-5level: %msg%n%throwable" /> + </Console> + </Appenders> + <Loggers> + <Logger name="org.apache.flume" level="INFO" /> + <Root level="WARN"> + <AppenderRef ref="Console" /> + </Root> + </Loggers> +</Configuration> \ No newline at end of file diff --git a/pom.xml b/pom.xml index f81743465..416035d2a 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ limitations under the License. <fasterxml.jackson.databind.version>2.13.2.1</fasterxml.jackson.databind.version> <fest-reflect.version>1.4</fest-reflect.version> <geronimo-jms.version>1.1.1</geronimo-jms.version> - <gson.version>2.2.2</gson.version> + <gson.version>2.9.1</gson.version> <guava.version>18.0</guava.version> <guava-old.version>11.0.2</guava-old.version> <hadoop.version>2.10.1</hadoop.version> @@ -149,6 +149,7 @@ limitations under the License. <module>flume-shared</module> <module>flume-ng-configfilters</module> <module>build-support</module> + <module>flume-spring-boot</module> </modules> <profiles>
