This is an automated email from the ASF dual-hosted git repository. rgoers pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/flume.git
commit 37273bc677ca14692b48dc3c3460a6ef58318377 Author: Ralph Goers <[email protected]> AuthorDate: Mon Oct 3 09:28:17 2022 -0700 FLUME-3440 - Add Spring Boot support --- .../{Application.java => FlumeApplication.java} | 10 ++--- .../boot/config/AbstractFlumeConfiguration.java | 32 +++++++++++++- .../flume/spring/boot/config/FlumeInitializer.java | 51 ++++++++++++++++++++++ .../FlumePackageProvider.java} | 21 +++++---- .../PackageProvider.java} | 17 +++----- .../boot/controller/FlumeMetricsController.java | 48 ++++++++++++++++++++ ...apache.flume.spring.boot.config.PackageProvider | 15 +++++++ .../src/main/resources/META-INF/spring.factories | 16 +++++++ .../AppTest.java => app/TestSpringFlume.java} | 24 ++++++++-- .../spring/{boot => app}/config/AppConfig.java | 9 ++-- .../spring/app/config/AppPackageProvider.java} | 21 ++++----- ...apache.flume.spring.boot.config.PackageProvider | 1 + .../src/test/resources/application.yml | 7 +++ flume-spring-boot/src/test/resources/bootstrap.yml | 3 ++ .../src/test/resources/log4j2-test.xml | 2 +- pom.xml | 1 + 16 files changed, 228 insertions(+), 50 deletions(-) diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/FlumeApplication.java similarity index 78% copy from flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java copy to flume-spring-boot/src/main/java/org/apache/flume/spring/boot/FlumeApplication.java index 4edbad5f5..d4a5146ab 100644 --- a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/FlumeApplication.java @@ -19,15 +19,13 @@ package org.apache.flume.spring.boot; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; /** - * + * The application to run. */ -@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"}) -@EnableConfigurationProperties -public class Application { +@SpringBootApplication +public class FlumeApplication { public static void main(String[] args) { - SpringApplication.run(Application.class, args); + SpringApplication.run(FlumeApplication.class, args); } } diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java index 0846e46dd..79db4521b 100644 --- a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/AbstractFlumeConfiguration.java @@ -21,6 +21,8 @@ import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.FlumeException; import org.apache.flume.Sink; +import org.apache.flume.SinkProcessor; +import org.apache.flume.SinkRunner; import org.apache.flume.Source; import org.apache.flume.SourceRunner; import org.apache.flume.channel.ChannelProcessor; @@ -62,12 +64,38 @@ public abstract class AbstractFlumeConfiguration { return SourceRunner.forSource(source); } - protected <T extends Sink> Sink configureSink(final String name, final Class<T> clazz, + protected <T extends Source> SourceRunner configureSource(final T source, + final ChannelSelector selector, final Map<String, String> params) { + source.setChannelProcessor(new ChannelProcessor(selector)); + return SourceRunner.forSource(source); + } + + protected <T extends SinkProcessor> T configureSinkProcessor(final Map<String, String> params, + final Class<T> clazz, final List<Sink> sinks) { + T processor; + try { + processor = clazz.newInstance(); + } catch (Exception ex) { + throw new FlumeException("Unable to create SinkProcessor of type: " + clazz.getName(), ex); + } + processor.setSinks(sinks); + Configurables.configure(processor, createContext(params)); + return processor; + } + + + protected SinkRunner createSinkRunner(SinkProcessor sinkProcessor) { + SinkRunner runner = new SinkRunner(sinkProcessor); + runner.setSink(sinkProcessor); + return runner; + } + + protected <T extends Sink> Sink configureSink(final String name, final Class<T> sinkClazz, final Channel channel, final Map<String, String> params) { T sink; try { - sink = clazz.newInstance(); + sink = sinkClazz.newInstance(); } catch (Exception ex) { throw new FlumeException("Unable to create sink " + name, ex); } diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumeInitializer.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumeInitializer.java new file mode 100644 index 000000000..ad98c6b02 --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumeInitializer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.config; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigRegistry; + +/** + * Dynamically add to the ApplicationContext. + */ +public class FlumeInitializer implements ApplicationContextInitializer { + private Logger logger = LoggerFactory.getLogger(FlumeInitializer.class); + + @Override + public void initialize(ConfigurableApplicationContext applicationContext) { + if (applicationContext instanceof AnnotationConfigRegistry) { + AnnotationConfigRegistry registry = (AnnotationConfigRegistry) applicationContext; + ServiceLoader<PackageProvider> serviceLoader = + ServiceLoader.load(PackageProvider.class, FlumeInitializer.class.getClassLoader()); + List<String> basePackages = new ArrayList<>(); + for (PackageProvider provider : serviceLoader) { + basePackages.addAll(provider.getPackages()); + } + logger.debug("Adding packages {} for component scanning", basePackages); + registry.scan(basePackages.toArray(new String[0])); + } else { + logger.warn("ApplicationContext is not an AnnotationConfigRegistry. Application loading will likely fail"); + } + } +} diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumePackageProvider.java similarity index 63% copy from flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java copy to flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumePackageProvider.java index 4edbad5f5..8cb91f8bb 100644 --- a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/FlumePackageProvider.java @@ -14,20 +14,19 @@ * See the license for the specific language governing permissions and * limitations under the license. */ +package org.apache.flume.spring.boot.config; -package org.apache.flume.spring.boot; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; +import java.util.Collections; +import java.util.List; /** - * + * Defines the Flume Spring package. */ -@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"}) -@EnableConfigurationProperties -public class Application { - public static void main(String[] args) { - SpringApplication.run(Application.class, args); +public class FlumePackageProvider implements PackageProvider { + private static final String PACKAGE = "org.apache.flume.spring.boot"; + + @Override + public List<String> getPackages() { + return Collections.singletonList(PACKAGE); } } diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/PackageProvider.java similarity index 63% copy from flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java copy to flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/PackageProvider.java index 4edbad5f5..4157c6705 100644 --- a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/config/PackageProvider.java @@ -14,20 +14,13 @@ * See the license for the specific language governing permissions and * limitations under the license. */ +package org.apache.flume.spring.boot.config; -package org.apache.flume.spring.boot; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; +import java.util.List; /** - * + * Retrieves a List of packages to include. */ -@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"}) -@EnableConfigurationProperties -public class Application { - public static void main(String[] args) { - SpringApplication.run(Application.class, args); - } +public interface PackageProvider { + List<String> getPackages(); } diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/controller/FlumeMetricsController.java b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/controller/FlumeMetricsController.java new file mode 100644 index 000000000..3f4642cd7 --- /dev/null +++ b/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/controller/FlumeMetricsController.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache license, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the license for the specific language governing permissions and + * limitations under the license. + */ +package org.apache.flume.spring.boot.controller; + +import java.lang.reflect.Type; +import java.util.Map; + +import org.apache.flume.instrumentation.util.JMXPollUtil; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +/** + * Retrieves Flume Metrics. + */ +@RestController +@ConditionalOnProperty(prefix = "flume", name = "metrics", havingValue = "http") +public class FlumeMetricsController { + + private final Type mapType = new MapTypeToken().getType(); + private final Gson gson = new Gson(); + + @GetMapping(value = "/metrics", produces = "application/json;charset=utf-8") + public String metrics() { + Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans(); + return gson.toJson(metricsMap, mapType); + } + + private static class MapTypeToken extends TypeToken<Map<String, Map<String, String>>> { + } +} diff --git a/flume-spring-boot/src/main/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider b/flume-spring-boot/src/main/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider new file mode 100644 index 000000000..ccfbf561e --- /dev/null +++ b/flume-spring-boot/src/main/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.flume.spring.boot.config.FlumePackageProvider \ No newline at end of file diff --git a/flume-spring-boot/src/main/resources/META-INF/spring.factories b/flume-spring-boot/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..6f7899871 --- /dev/null +++ b/flume-spring-boot/src/main/resources/META-INF/spring.factories @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.springframework.context.ApplicationContextInitializer=org.apache.flume.spring.boot.config.FlumeInitializer \ No newline at end of file diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/TestSpringFlume.java similarity index 64% rename from flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java rename to flume-spring-boot/src/test/java/org/apache/flume/spring/app/TestSpringFlume.java index af2654348..b30af8857 100644 --- a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/AppTest.java +++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/TestSpringFlume.java @@ -14,22 +14,36 @@ * See the license for the specific language governing permissions and * limitations under the license. */ -package org.apache.flume.spring.boot; +package org.apache.flume.spring.app; + +import java.net.URI; import org.apache.flume.node.MaterializedConfiguration; +import org.apache.flume.spring.boot.FlumeApplication; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.test.context.junit.jupiter.SpringExtension; /** * */ @ExtendWith(SpringExtension.class) -@SpringBootTest -public class AppTest { +@SpringBootTest(classes = {FlumeApplication.class}, + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class TestSpringFlume { + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; @Autowired MaterializedConfiguration configuration; @@ -39,5 +53,9 @@ public class AppTest { Assertions.assertThat(configuration).isNotNull(); Assertions.assertThat(configuration.getSinkRunners()).isNotNull(); Assertions.assertThat(configuration.getSinkRunners().size()).isEqualTo(1); + String uri = "http://localhost:" + port + "/metrics"; + ResponseEntity<String> response = restTemplate.getForEntity(URI.create(uri), String.class); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); } } diff --git a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppConfig.java similarity index 91% rename from flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java rename to flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppConfig.java index cc9534376..ac57a9985 100644 --- a/flume-spring-boot/src/test/java/org/apache/flume/spring/boot/config/AppConfig.java +++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppConfig.java @@ -14,12 +14,11 @@ * See the license for the specific language governing permissions and * limitations under the license. */ -package org.apache.flume.spring.boot.config; +package org.apache.flume.spring.app.config; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; import org.apache.flume.Sink; -import org.apache.flume.SinkProcessor; import org.apache.flume.SinkRunner; import org.apache.flume.SourceRunner; import org.apache.flume.channel.MemoryChannel; @@ -27,6 +26,7 @@ import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.sink.DefaultSinkProcessor; import org.apache.flume.sink.NullSink; import org.apache.flume.source.SequenceGeneratorSource; +import org.apache.flume.spring.boot.config.AbstractFlumeConfiguration; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -67,9 +67,8 @@ public class AppConfig extends AbstractFlumeConfiguration { @Bean public SinkRunner nullSink(Channel memoryChannel) { - SinkProcessor sinkProcessor = new DefaultSinkProcessor(); Sink sink = configureSink("null", NullSink.class, memoryChannel,null); - sinkProcessor.setSinks(listOf(sink)); - return new SinkRunner(sinkProcessor); + return createSinkRunner(configureSinkProcessor(null, DefaultSinkProcessor.class, + listOf(sink))); } } diff --git a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppPackageProvider.java similarity index 63% rename from flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java rename to flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppPackageProvider.java index 4edbad5f5..262dc4b16 100644 --- a/flume-spring-boot/src/main/java/org/apache/flume/spring/boot/Application.java +++ b/flume-spring-boot/src/test/java/org/apache/flume/spring/app/config/AppPackageProvider.java @@ -14,20 +14,21 @@ * See the license for the specific language governing permissions and * limitations under the license. */ +package org.apache.flume.spring.app.config; -package org.apache.flume.spring.boot; +import java.util.Collections; +import java.util.List; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.apache.flume.spring.boot.config.PackageProvider; /** - * + * Defines the Flume Spring package. */ -@SpringBootApplication(scanBasePackages = {"org.apache.flume.spring.boot"}) -@EnableConfigurationProperties -public class Application { - public static void main(String[] args) { - SpringApplication.run(Application.class, args); +public class AppPackageProvider implements PackageProvider { + private static final String PACKAGE = "org.apache.flume.spring.app"; + + @Override + public List<String> getPackages() { + return Collections.singletonList(PACKAGE); } } diff --git a/flume-spring-boot/src/test/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider b/flume-spring-boot/src/test/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider new file mode 100644 index 000000000..5eb81f252 --- /dev/null +++ b/flume-spring-boot/src/test/resources/META-INF/services/org.apache.flume.spring.boot.config.PackageProvider @@ -0,0 +1 @@ +org.apache.flume.spring.app.config.AppPackageProvider \ No newline at end of file diff --git a/flume-spring-boot/src/test/resources/application.yml b/flume-spring-boot/src/test/resources/application.yml index 8e2e2e5ed..7b5230923 100644 --- a/flume-spring-boot/src/test/resources/application.yml +++ b/flume-spring-boot/src/test/resources/application.yml @@ -1,5 +1,12 @@ +spring: + application: + name: flume-test + +server: + port: 41414 flume: + metrics: http sources: source1: totalEvents: 10000 diff --git a/flume-spring-boot/src/test/resources/bootstrap.yml b/flume-spring-boot/src/test/resources/bootstrap.yml new file mode 100644 index 000000000..e2905906c --- /dev/null +++ b/flume-spring-boot/src/test/resources/bootstrap.yml @@ -0,0 +1,3 @@ +flume: + app: + packages: "org.apache.flume.spring.boot.app" \ No newline at end of file diff --git a/flume-spring-boot/src/test/resources/log4j2-test.xml b/flume-spring-boot/src/test/resources/log4j2-test.xml index d912f3105..a98e44860 100644 --- a/flume-spring-boot/src/test/resources/log4j2-test.xml +++ b/flume-spring-boot/src/test/resources/log4j2-test.xml @@ -24,7 +24,7 @@ </Appenders> <Loggers> <Logger name="org.apache.flume" level="INFO" /> - <Root level="WARN"> + <Root level="INFO"> <AppenderRef ref="Console" /> </Root> </Loggers> diff --git a/pom.xml b/pom.xml index 416035d2a..aa659f5f7 100644 --- a/pom.xml +++ b/pom.xml @@ -557,6 +557,7 @@ limitations under the License. <excludes> <exclude>**/.idea/</exclude> <exclude>**/*.iml</exclude> + <exclude>src/main/resources/META-INF/services/**/*</exclude> <exclude>**/nb-configuration.xml</exclude> <exclude>.git/</exclude> <exclude>patchprocess/</exclude>
