This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch 1423-update-archetypes in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit dfeaa1298cc5d26453e5f5765de0eb8a1e3eea1f Author: Dominik Riemer <[email protected]> AuthorDate: Sat Mar 18 15:10:51 2023 +0100 Update maven archetypes (#1423) --- .../src/main/resources/archetype-resources/pom.xml | 12 ++-- .../archetype-resources/src/main/java/Init.java | 6 +- .../src/main/resources/archetype-resources/pom.xml | 61 ++--------------- .../archetype-resources/src/main/java/Init.java | 10 ++- .../__classNamePrefix__Controller.java | 2 +- .../__classNamePrefix__Program.java | 2 +- .../src/main/resources/archetype-resources/pom.xml | 56 +--------------- .../archetype-resources/src/main/java/Init.java | 52 +++++++++------ .../src/main/java/config/Config.java | 76 ---------------------- .../src/main/java/config/ConfigKeys.java | 15 +++-- .../__classNamePrefix__Controller.java | 11 ++-- .../__classNamePrefix__Program.java | 23 +++++-- 12 files changed, 88 insertions(+), 238 deletions(-) diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml index 95139da33..3eefc3820 100644 --- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/pom.xml @@ -44,6 +44,11 @@ <artifactId>streampipes-sources</artifactId> <version>${sp.version}</version> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>2.0.6</version> + </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> @@ -62,7 +67,7 @@ <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> - <version>2.6.2</version> + <version>3.0.1</version> <executions> <execution> <goals> @@ -77,10 +82,9 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> + <version>3.10.1</version> <configuration> - <source>1.8</source> - <target>1.8</target> + <release>17</release> <encoding>UTF-8</encoding> </configuration> </plugin> diff --git a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java index 4863966d1..777e6dc31 100644 --- a/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-extensions-jvm/src/main/resources/archetype-resources/src/main/java/Init.java @@ -21,16 +21,16 @@ #set( $symbol_escape = '\' ) package ${package}; -import org.apache.streampipes.container.extensions.ExtensionsModelSubmitter; -import org.apache.streampipes.container.model.SpServiceDefinition; -import org.apache.streampipes.container.model.SpServiceDefinitionBuilder; import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; +import org.apache.streampipes.extensions.management.model.SpServiceDefinition; +import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory; import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory; +import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter; import ${package}.pe.${packageName}.${classNamePrefix}DataProcessor; import ${package}.pe.${packageName}.${classNamePrefix}DataSink; diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml index 1a82a1561..0efd4fd92 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/pom.xml @@ -31,22 +31,7 @@ <dependencies> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-container-standalone</artifactId> - <version>${sp.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-commons</artifactId> + <artifactId>streampipes-service-extensions</artifactId> <version>${sp.version}</version> <exclusions> <exclusion> @@ -92,48 +77,13 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> - <version>1.7.24</version> + <version>2.0.6</version> </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-config</artifactId> <version>${sp.version}</version> </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-json</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-cbor</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-smile</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-fst</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-jms</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-kafka</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-mqtt</artifactId> - <version>${sp.version}</version> - </dependency> </dependencies> <build> @@ -146,7 +96,7 @@ <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> - <version>2.4.1</version> + <version>3.0.1</version> </dependency> </dependencies> <executions> @@ -202,10 +152,9 @@ <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> - <version>3.1</version> + <version>3.10.1</version> <configuration> - <source>1.8</source> - <target>1.8</target> + <release>17</release> <encoding>UTF-8</encoding> </configuration> </plugin> diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java index 8b05fdfe3..dc52194fb 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/Init.java @@ -21,11 +21,6 @@ package ${package}; -import org.apache.streampipes.container.init.DeclarersSingleton; -import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter; -import org.apache.streampipes.container.model.SpServiceDefinition; -import org.apache.streampipes.container.model.SpServiceDefinitionBuilder; - import ${package}.config.ConfigKeys; import ${package}.pe.processor.${packageName}.${classNamePrefix}Controller; @@ -34,11 +29,14 @@ import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; +import org.apache.streampipes.extensions.management.model.SpServiceDefinition; +import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory; import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory; +import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter; -public class Init extends StandaloneModelSubmitter { +public class Init extends ExtensionsModelSubmitter { public static void main(String[] args) throws Exception { new Init().init(); diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java index 981a6a8c9..dc810a0ce 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Controller.java @@ -21,10 +21,10 @@ package ${package}.pe.processor.${packageName}; +import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder; import org.apache.streampipes.sdk.utils.Datatypes; import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.container.config.ConfigExtractor; import org.apache.streampipes.model.DataProcessorType; import org.apache.streampipes.model.graph.DataProcessorDescription; import org.apache.streampipes.model.graph.DataProcessorInvocation; diff --git a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java index 929e3a87a..fcad195dc 100644 --- a/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java +++ b/archetypes/streampipes-archetype-pe-processors-flink/src/main/resources/archetype-resources/src/main/java/pe/processor/__packageName__/__classNamePrefix__Program.java @@ -26,7 +26,7 @@ import ${package}.config.ConfigKeys; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.streampipes.client.StreamPipesClient; -import org.apache.streampipes.container.config.ConfigExtractor; +import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.model.graph.DataProcessorInvocation; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor; diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml index 0c29e2aa1..7890d8f4e 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/pom.xml @@ -31,22 +31,7 @@ <dependencies> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-container-standalone</artifactId> - <version>${sp.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-commons</artifactId> + <artifactId>streampipes-service-extensions</artifactId> <version>${sp.version}</version> <exclusions> <exclusion> @@ -81,7 +66,7 @@ </dependency> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-sdk</artifactId> + <artifactId>streampipes-sdk-bundle</artifactId> <version>${sp.version}</version> </dependency> <dependency> @@ -92,48 +77,13 @@ <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> - <version>1.7.24</version> + <version>2.0.6</version> </dependency> <dependency> <groupId>org.apache.streampipes</groupId> <artifactId>streampipes-config</artifactId> <version>${sp.version}</version> </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-json</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-cbor</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-smile</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-dataformat-fst</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-jms</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-kafka</artifactId> - <version>${sp.version}</version> - </dependency> - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-messaging-mqtt</artifactId> - <version>${sp.version}</version> - </dependency> </dependencies> <build> diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java index 39235cb3d..ef8c40914 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/Init.java @@ -21,36 +21,48 @@ #set( $symbol_escape = '\' ) package ${package}; -import org.apache.streampipes.container.init.DeclarersSingleton; -import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter; -import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; -import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory; import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory; import org.apache.streampipes.dataformat.fst.FstDataFormatFactory; +import org.apache.streampipes.dataformat.json.JsonDataFormatFactory; import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory; +import org.apache.streampipes.extensions.management.model.SpServiceDefinition; +import org.apache.streampipes.extensions.management.model.SpServiceDefinitionBuilder; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; +import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory; import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory; +import org.apache.streampipes.service.extensions.ExtensionsModelSubmitter; -import ${package}.config.Config; +import ${package}.config.ConfigKeys; import ${package}.pe.sink.${packageName}.${classNamePrefix}Controller; -public class Init extends StandaloneModelSubmitter { +public class Init extends ExtensionsModelSubmitter { - public static void main(String[] args) { - DeclarersSingleton.getInstance() - .add(new ${classNamePrefix}Controller()); - - DeclarersSingleton.getInstance().registerDataFormats( - new JsonDataFormatFactory(), - new CborDataFormatFactory(), - new SmileDataFormatFactory(), - new FstDataFormatFactory()); + public static void main(String[] args) throws Exception { + new Init().init(); + } - DeclarersSingleton.getInstance().registerProtocols( + @Override + public SpServiceDefinition provideServiceDefinition() { + return SpServiceDefinitionBuilder.create("${package}", + "Apache Flink sink", + "", + 8090) + .registerPipelineElement(new ${classNamePrefix}Controller()) + .registerMessagingFormats( + new JsonDataFormatFactory(), + new CborDataFormatFactory(), + new SmileDataFormatFactory(), + new FstDataFormatFactory()) + .registerMessagingProtocols( new SpKafkaProtocolFactory(), - new SpMqttProtocolFactory(), - new SpJmsProtocolFactory()); - - new Init().init(Config.INSTANCE); + new SpJmsProtocolFactory(), + new SpMqttProtocolFactory()) + .addConfig(ConfigKeys.FLINK_HOST, "jobmanager", "Hostname of the Flink Jobmanager") + .addConfig(ConfigKeys.FLINK_PORT, 8081, "Port of the Flink Jobmanager") + .addConfig(ConfigKeys.DEBUG, false, "Debug/Mini cluster mode of Flink program") + .addConfig(ConfigKeys.FLINK_JAR_FILE_LOC, "./streampipes-processing-element-container.jar", "Jar file location") + .addConfig(ConfigKeys.SERVICE_NAME, "sp fft stream analytics metrics", "Data processor service name") + .addConfig(ConfigKeys.HOST, "${artifactId}", "Data processor host") + .build(); } } diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java deleted file mode 100644 index 16e02a446..000000000 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/Config.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#set( $symbol_pound = '#' ) -#set( $symbol_dollar = '$' ) -#set( $symbol_escape = '\' ) -#set( $svc_name = $package.getClass().forName("org.apache.velocity.util.StringUtils").sub("$artifactId", "-", " ") ) -package ${package}.config; - -import org.apache.streampipes.config.SpConfig; -import org.apache.streampipes.container.model.PeConfig; - -public enum Config implements PeConfig { - INSTANCE; - - private SpConfig config; - public static final String JAR_FILE = "./streampipes-processing-element-container.jar"; - private final static String SERVICE_ID = "pe/${package}.sink.flink"; - - Config() { - config = SpConfig.getSpConfig(SERVICE_ID); - config.register(ConfigKeys.HOST, "${artifactId}", "Data sink host"); - config.register(ConfigKeys.PORT, 8090, "Data sink port"); - config.register(ConfigKeys.SERVICE_NAME, "${svc_name}", "Data sink service name"); - config.register(ConfigKeys.FLINK_HOST, "jobmanager", "Flink jobmanager host"); - config.register(ConfigKeys.FLINK_PORT, 8081, "Flink jobmanager port"); - config.register(ConfigKeys.FLINK_DEBUG, false, "When set to true programs are not deployed to cluster, but executed locally"); - } - - public String getFlinkHost() { - return config.getString(ConfigKeys.FLINK_HOST); - } - - public int getFlinkPort() { - return config.getInteger(ConfigKeys.FLINK_PORT); - } - - public boolean getFlinkDebug() { - return config.getBoolean(ConfigKeys.FLINK_DEBUG); - } - - @Override - public String getHost() { - return config.getString(ConfigKeys.HOST); - } - - @Override - public int getPort() { - return config.getInteger(ConfigKeys.PORT); - } - - @Override - public String getId() { - return SERVICE_ID; - } - - @Override - public String getName() { - return config.getString(ConfigKeys.SERVICE_NAME); - } - -} diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java index e307f0da9..a828f4810 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/config/ConfigKeys.java @@ -22,10 +22,11 @@ package ${package}.config; public class ConfigKeys { - final static String HOST = "SP_HOST"; - final static String PORT = "SP_PORT"; - final static String SERVICE_NAME = "SP_SERVICE_NAME"; - final static String FLINK_HOST = "SP_FLINK_HOST"; - final static String FLINK_PORT = "SP_FLINK_PORT"; - final static String FLINK_DEBUG = "SP_FLINK_DEBUG"; -} \ No newline at end of file + public final static String HOST = "SP_HOST"; + public final static String PORT = "SP_PORT"; + public final static String SERVICE_NAME = "SP_SERVICE_NAME"; + public final static String FLINK_HOST = "SP_FLINK_HOST"; + public final static String FLINK_PORT = "SP_FLINK_PORT"; + public static final String DEBUG = "SP_FLINK_DEBUG"; + public static final String FLINK_JAR_FILE_LOC = "SP_FLINK_JAR_FILE_LOC"; +} diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java index b0bf3ee6b..eaae81d25 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Controller.java @@ -21,6 +21,8 @@ package ${package}.pe.sink.${packageName}; import ${package}.config.Config; +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.model.DataSinkType; import org.apache.streampipes.model.graph.DataSinkDescription; import org.apache.streampipes.model.graph.DataSinkInvocation; @@ -29,8 +31,6 @@ import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder; import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor; import org.apache.streampipes.sdk.helpers.EpRequirements; import org.apache.streampipes.sdk.helpers.Labels; -import org.apache.streampipes.sdk.helpers.SupportedFormats; -import org.apache.streampipes.sdk.helpers.SupportedProtocols; import org.apache.streampipes.wrapper.flink.FlinkDataSinkDeclarer; import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime; import org.apache.streampipes.sdk.helpers.*; @@ -59,13 +59,16 @@ public class ${classNamePrefix}Controller extends FlinkDataSinkDeclarer<${classN } @Override - public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph, DataSinkParameterExtractor extractor) { + public FlinkDataSinkRuntime<${classNamePrefix}Parameters> getRuntime(DataSinkInvocation graph, + DataSinkParameterExtractor extractor, + ConfigExtractor configExtractor, + StreamPipesClient streamPipesClient) { String host = extractor.singleValueParameter(HOST_KEY, String.class); int port = extractor.singleValueParameter(PORT_KEY, Integer.class); String password = extractor.secretValue(PASSWORD_KEY); ${classNamePrefix}Parameters params = new ${classNamePrefix}Parameters(graph, host, port, password); - return new ${classNamePrefix}Program(params, Config.INSTANCE.getFlinkDebug()); + return new ${classNamePrefix}Program(params, configExtractor, streamPipesClient); } } diff --git a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java index 3fedc44d7..621a810b6 100644 --- a/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java +++ b/archetypes/streampipes-archetype-pe-sinks-flink/src/main/resources/archetype-resources/src/main/java/pe/sink/__packageName__/__classNamePrefix__Program.java @@ -21,13 +21,17 @@ #set( $symbol_escape = '\' ) package ${package}.pe.sink.${packageName}; -import ${package}.config.Config; +import ${package}.config.ConfigKeys; -import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.extensions.management.config.ConfigExtractor; import org.apache.streampipes.model.runtime.Event; +import org.apache.streampipes.svcdiscovery.api.SpConfig; import org.apache.streampipes.wrapper.flink.FlinkDataSinkRuntime; import org.apache.streampipes.wrapper.flink.FlinkDeploymentConfig; +import org.apache.flink.streaming.api.datastream.DataStream; + import java.io.Serializable; public class ${classNamePrefix}Program extends FlinkDataSinkRuntime<${classNamePrefix}Parameters> @@ -36,15 +40,20 @@ implements Serializable { private static final long serialVersionUID = 1L; private final ${classNamePrefix}Parameters params; - public ${classNamePrefix}Program(${classNamePrefix}Parameters params, boolean debug) { - super(params, debug); + public ${classNamePrefix}Program(${classNamePrefix}Parameters params, + ConfigExtractor configExtractor, + StreamPipesClient streamPipesClient) { + super(params, configExtractor, streamPipesClient); this.params = params; } @Override - protected FlinkDeploymentConfig getDeploymentConfig() { - return new FlinkDeploymentConfig(Config.JAR_FILE, - Config.INSTANCE.getFlinkHost(), Config.INSTANCE.getFlinkPort()); + protected FlinkDeploymentConfig getDeploymentConfig(ConfigExtractor configExtractor) { + SpConfig config = configExtractor.getConfig(); + return new FlinkDeploymentConfig(config.getString(ConfigKeys.FLINK_JAR_FILE_LOC), + config.getString(ConfigKeys.FLINK_HOST), + config.getInteger(ConfigKeys.FLINK_PORT), + config.getBoolean(ConfigKeys.DEBUG)); } @Override
