Repository: storm Updated Branches: refs/heads/master e8e1a4e8f -> 21832ad40
STORM-2971: Replace storm-kafka with storm-kafka-client in Flux examples, fix Flux bug where setter argument types are not checked or coerced Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/288d97cb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/288d97cb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/288d97cb Branch: refs/heads/master Commit: 288d97cb5497f27b69b2fb126c2b0097cd02605a Parents: a38f0c5 Author: Stig Rohde Døssing <s...@apache.org> Authored: Sun Feb 25 16:10:35 2018 +0100 Committer: Stig Rohde Døssing <s...@apache.org> Committed: Thu Mar 1 08:16:21 2018 +0100 ---------------------------------------------------------------------- flux/README.md | 137 ++++++++----------- flux/flux-core/pom.xml | 2 +- .../java/org/apache/storm/flux/FluxBuilder.java | 27 ++-- .../flux/test/OnlyValueRecordTranslator.java | 37 +++++ .../storm/flux/test/TridentTopologySource.java | 1 - .../src/test/resources/configs/kafka_test.yaml | 61 ++++----- flux/flux-examples/README.md | 2 +- flux/flux-examples/pom.xml | 6 +- .../examples/OnlyValueRecordTranslator.java | 37 +++++ .../src/main/resources/kafka_spout.yaml | 71 ++++------ 10 files changed, 200 insertions(+), 181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/README.md ---------------------------------------------------------------------- diff --git a/flux/README.md b/flux/README.md index 5aa76ae..58ed25d 100644 --- a/flux/README.md +++ b/flux/README.md @@ -57,7 +57,7 @@ the layout and configuration of your topologies. in your topology code * Support for existing topology code (see below) * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL - * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.) + * YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.) 
* Convenient support for multi-lang components * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style `${variable.name}` substitution) @@ -354,19 +354,20 @@ storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux --local my_co With the following `dev.properties` file: ```properties -kafka.zookeeper.hosts: localhost:2181 +kafka.bootstrap.hosts: localhost:9092 ``` You would then be able to reference those properties by key in your `.yaml` file using `${}` syntax: ```yaml - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - "${kafka.zookeeper.hosts}" + - "${kafka.bootstrap.hosts}" + - ["myKafkaTopic"] ``` -In this case, Flux would replace `${kafka.zookeeper.hosts}` with `localhost:2181` before parsing the YAML contents. +In this case, Flux would replace `${kafka.bootstrap.hosts}` with `localhost:9092` before parsing the YAML contents. ### Environment Variable Substitution/Filtering Flux also allows environment variable substitution. For example, if an environment variable named `ZK_HOSTS` if defined, @@ -378,16 +379,16 @@ ${ENV-ZK_HOSTS} ## Components Components are essentially named object instances that are made available as configuration options for spouts and -bolts. If you are familiar with the Spring framework, components are roughly analagous to Spring beans. +bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans. Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example, -the following will make an instance of the `org.apache.storm.kafka.StringScheme` class available as a reference under the key -`"stringScheme"` . This assumes the `org.apache.storm.kafka.StringScheme` has a default constructor. 
+the following will make an instance of the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` class available as a reference under the key +`"recordTranslator"` . This assumes the `org.apache.storm.flux.examples.OnlyValueRecordTranslator` has a default constructor. ```yaml components: - - id: "stringScheme" - className: "org.apache.storm.kafka.StringScheme" + - id: "recordTranslator" + className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator" ``` ### Contructor Arguments, References, Properties and Configuration Methods @@ -395,32 +396,36 @@ components: ####Constructor Arguments Arguments to a class constructor can be configured by adding a `contructorArgs` element to a components. `constructorArgs` is a list of objects that will be passed to the class' constructor. The following example creates an -object by calling the constructor that takes a single string as an argument: +object by calling the constructor that takes a string and an array of strings as arguments: ```yaml - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - "localhost:2181" - - true + - "${kafka.bootstrap.hosts}" + - ["myKafkaTopic"] ``` ####References Each component instance is identified by a unique id that allows it to be used/reused by other components. To reference an existing component, you specify the id of the component with the `ref` tag. 
-In the following example, a component with the id `"stringScheme"` is created, and later referenced, as a an argument +In the following example, a component with the id `"recordTranslator"` is created, and later referenced, as an argument to another component's constructor: ```yaml components: - - id: "stringScheme" - className: "org.apache.storm.kafka.StringScheme" + - id: "recordTranslator" + className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator" - - id: "stringMultiScheme" - className: "org.apache.storm.spout.SchemeAsMultiScheme" + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - ref: "stringScheme" # component with id "stringScheme" must be declared above. + - "localhost:9092" + - ["myKafkaTopic"] + properties: + - name: "recordTranslator" + ref: "recordTranslator" #Component with id "recordTranslator" must be declared above ``` You can also reference existing components in list via specifying the id of the components with the `reflist` tag. @@ -448,27 +453,20 @@ In addition to calling constructors with different arguments, Flux also allows y JavaBean-like setter methods and fields declared as `public`: ```yaml - - id: "spoutConfig" - className: "org.apache.storm.kafka.SpoutConfig" + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - # brokerHosts - - ref: "zkHosts" - # topic - - "myKafkaTopic" - # zkRoot - - "/kafkaSpout" - # id - - "myId" + - "localhost:9092" + - ["myKafkaTopic"] properties: - - name: "ignoreZkOffsets" - value: true - - name: "scheme" - ref: "stringMultiScheme" + - name: "pollTimeoutMs" + value: 5000 ``` -In the example above, the `properties` declaration will cause Flux to look for a public method in the `SpoutConfig` with -the signature `setForceFromStart(boolean b)` and attempt to invoke it. 
If a setter method is not found, Flux will then -look for a public instance variable with the name `ignoreZkOffsets` and attempt to set its value. +In the example above, the `properties` declaration will cause Flux to look for a public method in the `KafkaSpoutConfig$Builder` with +the signature `setPollTimeoutMs(int i)` and attempt to invoke it. If a setter method is not found, Flux will then +look for a public instance variable with the name `pollTimeoutMs` and attempt to set its value. +Note that Flux will attempt to coerce actual parameter types to fit the setter parameter types, so e.g. calling `setMyFloat(float f)` with `value: 10` is possible. References may also be used as property values. @@ -623,46 +621,31 @@ Kafka spout example: ```yaml components: - - id: "stringScheme" - className: "org.apache.storm.kafka.StringScheme" - - - id: "stringMultiScheme" - className: "org.apache.storm.spout.SchemeAsMultiScheme" - constructorArgs: - - ref: "stringScheme" - - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" + - id: "onlyValueRecordTranslator" + className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator" + + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - "localhost:2181" - -# Alternative kafka config -# - id: "kafkaConfig" -# className: "org.apache.storm.kafka.KafkaConfig" -# constructorArgs: -# # brokerHosts -# - ref: "zkHosts" -# # topic -# - "myKafkaTopic" -# # clientId (optional) -# - "myKafkaClientId" - + - "localhost:9092" + - ["myKafkaTopic"] + properties: + - name: "firstPollOffsetStrategy" + value: EARLIEST + - name: "recordTranslator" + ref: "onlyValueRecordTranslator" + configMethods: + - name: "setProp" + args: + - { + "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer" + } + - id: "spoutConfig" - className: "org.apache.storm.kafka.SpoutConfig" + 
className: "org.apache.storm.kafka.spout.KafkaSpoutConfig" constructorArgs: - # brokerHosts - - ref: "zkHosts" - # topic - - "myKafkaTopic" - # zkRoot - - "/kafkaSpout" - # id - - "myId" - properties: - - name: "ignoreZkOffsets" - value: true - - name: "scheme" - ref: "stringMultiScheme" + - ref: "spoutConfigBuilder" config: topology.workers: 1 @@ -670,7 +653,7 @@ config: # spout definitions spouts: - id: "kafka-spout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.storm.kafka.spout.KafkaSpout" constructorArgs: - ref: "spoutConfig" http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/pom.xml ---------------------------------------------------------------------- diff --git a/flux/flux-core/pom.xml b/flux/flux-core/pom.xml index acdf805..d29e619 100644 --- a/flux/flux-core/pom.xml +++ b/flux/flux-core/pom.xml @@ -38,7 +38,7 @@ </dependency> <dependency> <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> + <artifactId>storm-kafka-client</artifactId> <version>${project.version}</version> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java ---------------------------------------------------------------------- diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 20bde18..e511add 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -277,9 +278,10 @@ public class FluxBuilder { Object value = prop.isReference() ? 
context.getComponent(prop.getRef()) : prop.getValue(); Method setter = findSetter(clazz, prop.getName(), value); if (setter != null) { - LOG.debug("found setter, attempting to invoke"); + Object[] methodArgs = getArgsWithListCoercion(Collections.singletonList(value), setter.getParameterTypes()); + LOG.debug("found setter, attempting to invoke with {}", methodArgs); // invoke setter - setter.invoke(instance, new Object[]{value}); + setter.invoke(instance, methodArgs); } else { // look for a public instance variable LOG.debug("no setter found. Looking for a public instance variable..."); @@ -299,15 +301,20 @@ public class FluxBuilder { private static Method findSetter(Class clazz, String property, Object arg) { String setterName = toSetterName(property); - Method retval = null; Method[] methods = clazz.getMethods(); + LOG.debug("Target setter: {}, arg: {}", setterName, arg); for (Method method : methods) { if (setterName.equals(method.getName())) { - LOG.debug("Found setter method: " + method.getName()); - retval = method; + Class<?>[] parameterTypes = method.getParameterTypes(); + LOG.debug("Found setter method: {}, parameter types: {}", method.getName(), parameterTypes); + boolean invokable = canInvokeWithArgs(Collections.singletonList(arg), method.getParameterTypes()); + LOG.debug("** invokable --> {}", invokable); + if (invokable) { + return method; + } } } - return retval; + return null; } private static String toSetterName(String name) { @@ -350,7 +357,7 @@ public class FluxBuilder { Constructor con = findCompatibleConstructor(constructorArgs, clazz); if (con != null) { LOG.debug("Found something seemingly compatible, attempting invocation..."); - obj = con.newInstance(getArgsWithListCoercian(constructorArgs, con.getParameterTypes())); + obj = con.newInstance(getArgsWithListCoercion(constructorArgs, con.getParameterTypes())); } else { String msg = String.format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.", clazz.getName(), @@ 
-368,7 +375,7 @@ public class FluxBuilder { } method = findCompatibleMethod(methodArgs, clazz, def.getFactory()); if (method != null) { - obj = method.invoke(null, getArgsWithListCoercian(methodArgs, method.getParameterTypes())); + obj = method.invoke(null, getArgsWithListCoercion(methodArgs, method.getParameterTypes())); } else { String msg = String.format("Couldn't find a suitable static method '%s' for class '%s' with arguments '%s'.", def.getFactory(), @@ -530,7 +537,7 @@ public class FluxBuilder { String methodName = methodDef.getName(); Method method = findCompatibleMethod(args, clazz, methodName); if (method != null) { - Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes()); + Object[] methodArgs = getArgsWithListCoercion(args, method.getParameterTypes()); method.invoke(instance, methodArgs); } else { String msg = String.format("Unable to find configuration method '%s' in class '%s' with arguments %s.", @@ -580,7 +587,7 @@ public class FluxBuilder { * list to an java.lang.Object array that can be used to invoke the constructor. If an argument needs * to be coerced from a List to an Array, do so. 
*/ - private static Object[] getArgsWithListCoercian(List<Object> args, Class[] parameterTypes) { + private static Object[] getArgsWithListCoercion(List<Object> args, Class[] parameterTypes) { if (parameterTypes.length != args.size()) { throw new IllegalArgumentException("Contructor parameter count does not egual argument size."); } http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java ---------------------------------------------------------------------- diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java new file mode 100644 index 0000000..1d48c22 --- /dev/null +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/OnlyValueRecordTranslator.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.flux.test; + +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> { + + @Override + public List<Object> apply(ConsumerRecord<K, V> record) { + return new Values(record.value()); + } + + @Override + public Fields getFieldsFor(String stream) { + return new Fields("value"); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java ---------------------------------------------------------------------- diff --git a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java index 36b272b..b39d771 100644 --- a/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java +++ b/flux/flux-core/src/test/java/org/apache/storm/flux/test/TridentTopologySource.java @@ -21,7 +21,6 @@ import org.apache.storm.Config; import org.apache.storm.generated.StormTopology; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import org.apache.storm.kafka.StringScheme; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-core/src/test/resources/configs/kafka_test.yaml ---------------------------------------------------------------------- diff --git a/flux/flux-core/src/test/resources/configs/kafka_test.yaml b/flux/flux-core/src/test/resources/configs/kafka_test.yaml index 1fb59ca..76d2ee8 100644 --- a/flux/flux-core/src/test/resources/configs/kafka_test.yaml +++ 
b/flux/flux-core/src/test/resources/configs/kafka_test.yaml @@ -25,46 +25,31 @@ name: "kafka-topology" # # for the time being, components must be declared in the order they are referenced components: - - id: "stringScheme" - className: "org.apache.storm.kafka.StringScheme" - - - id: "stringMultiScheme" - className: "org.apache.storm.spout.SchemeAsMultiScheme" - constructorArgs: - - ref: "stringScheme" - - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" + - id: "onlyValueRecordTranslator" + className: "org.apache.storm.flux.test.OnlyValueRecordTranslator" + + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - "localhost:2181" - -# Alternative kafka config -# - id: "kafkaConfig" -# className: "org.apache.storm.kafka.KafkaConfig" -# constructorArgs: -# # brokerHosts -# - ref: "zkHosts" -# # topic -# - "myKafkaTopic" -# # clientId (optional) -# - "myKafkaClientId" - + - "localhost:9092" + - ["myKafkaTopic"] + properties: + - name: "firstPollOffsetStrategy" + value: EARLIEST + - name: "recordTranslator" + ref: "onlyValueRecordTranslator" + configMethods: + - name: "setProp" + args: + - { + "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer" + } + - id: "spoutConfig" - className: "org.apache.storm.kafka.SpoutConfig" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig" constructorArgs: - # brokerHosts - - ref: "zkHosts" - # topic - - "myKafkaTopic" - # zkRoot - - "/kafkaSpout" - # id - - "myId" - properties: - - name: "ignoreZkOffsets" - value: true - - name: "scheme" - ref: "stringMultiScheme" + - ref: "spoutConfigBuilder" # topology configuration # this will be passed to the submitter as a map of config options @@ -76,7 +61,7 @@ config: # spout definitions spouts: - id: "kafka-spout" - className: "org.apache.storm.kafka.KafkaSpout" + className: 
"org.apache.storm.kafka.spout.KafkaSpout" constructorArgs: - ref: "spoutConfig" http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/README.md ---------------------------------------------------------------------- diff --git a/flux/flux-examples/README.md b/flux/flux-examples/README.md index 3d610b4..ff2b2ac 100644 --- a/flux/flux-examples/README.md +++ b/flux/flux-examples/README.md @@ -40,7 +40,7 @@ written in java. ### [kafka_spout.yaml](src/main/resources/kafka_spout.yaml) -This example illustrates how to configure Storm's `storm-kafka` spout using Flux YAML DSL `components`, `references`, +This example illustrates how to configure Storm's `storm-kafka-client` spout using Flux YAML DSL `components`, `references`, and `constructor arguments` constructs. ### [simple_hdfs.yaml](src/main/resources/simple_hdfs.yaml) http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/pom.xml ---------------------------------------------------------------------- diff --git a/flux/flux-examples/pom.xml b/flux/flux-examples/pom.xml index a9d9c1e..dca5a83 100644 --- a/flux/flux-examples/pom.xml +++ b/flux/flux-examples/pom.xml @@ -94,13 +94,9 @@ </dependency> <dependency> <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka</artifactId> + <artifactId>storm-kafka-client</artifactId> <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>${storm.kafka.artifact.id}</artifactId> - </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java ---------------------------------------------------------------------- diff --git a/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java new file mode 100644 index 
0000000..f35b6eb --- /dev/null +++ b/flux/flux-examples/src/main/java/org/apache/storm/flux/examples/OnlyValueRecordTranslator.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.flux.examples; + +import java.util.List; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.storm.kafka.spout.RecordTranslator; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +public class OnlyValueRecordTranslator<K, V> implements RecordTranslator<K, V> { + + @Override + public List<Object> apply(ConsumerRecord<K, V> record) { + return new Values(record.value()); + } + + @Override + public Fields getFieldsFor(String stream) { + return new Fields("value"); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/288d97cb/flux/flux-examples/src/main/resources/kafka_spout.yaml ---------------------------------------------------------------------- diff --git a/flux/flux-examples/src/main/resources/kafka_spout.yaml b/flux/flux-examples/src/main/resources/kafka_spout.yaml index 7533ce4..37f14f1 100644 --- a/flux/flux-examples/src/main/resources/kafka_spout.yaml +++ b/flux/flux-examples/src/main/resources/kafka_spout.yaml @@ -13,9 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- - -# Test ability to wire together shell spouts/bolts --- # topology definition @@ -28,51 +25,31 @@ name: "kafka-topology" # # for the time being, components must be declared in the order they are referenced components: - - id: "stringScheme" - className: "org.apache.storm.kafka.StringScheme" - - - id: "stringMultiScheme" - className: "org.apache.storm.spout.SchemeAsMultiScheme" + - id: "onlyValueRecordTranslator" + className: "org.apache.storm.flux.examples.OnlyValueRecordTranslator" + + - id: "spoutConfigBuilder" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder" constructorArgs: - - ref: "stringScheme" - - - id: "zkHosts" - className: "org.apache.storm.kafka.ZkHosts" - constructorArgs: - - "localhost:2181" - -# Alternative kafka config -# - id: "kafkaConfig" -# className: "org.apache.storm.kafka.KafkaConfig" -# constructorArgs: -# # brokerHosts -# - ref: "zkHosts" -# # topic -# - "myKafkaTopic" -# # clientId (optional) -# - "myKafkaClientId" - + - "localhost:9092" + - ["myKafkaTopic"] + properties: + - name: "firstPollOffsetStrategy" + value: EARLIEST + - name: "recordTranslator" + ref: "onlyValueRecordTranslator" + configMethods: + - name: "setProp" + args: + - { + "key.deserializer" : "org.apache.kafka.common.serialization.StringDeserializer", + "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer" + } + - id: "spoutConfig" - className: "org.apache.storm.kafka.SpoutConfig" + className: "org.apache.storm.kafka.spout.KafkaSpoutConfig" constructorArgs: - # brokerHosts - - ref: "zkHosts" - # topic - - "myKafkaTopic" - # zkRoot - - "/kafkaSpout" - # id - - "myId" - properties: - - name: "ignoreZkOffsets" - value: true - - name: "scheme" - ref: "stringMultiScheme" - - - -# NOTE: We may want to consider some level of spring integration. For example, allowing component references -# to a spring `ApplicationContext`. 
+ - ref: "spoutConfigBuilder" # topology configuration # this will be passed to the submitter as a map of config options @@ -84,7 +61,7 @@ config: # spout definitions spouts: - id: "kafka-spout" - className: "org.apache.storm.kafka.KafkaSpout" + className: "org.apache.storm.kafka.spout.KafkaSpout" constructorArgs: - ref: "spoutConfig" @@ -98,7 +75,6 @@ bolts: # output fields - ["word"] parallelism: 1 - # ... - id: "log" className: "org.apache.storm.flux.wrappers.bolts.LogInfoBolt" @@ -108,7 +84,6 @@ bolts: - id: "count" className: "org.apache.storm.testing.TestWordCounter" parallelism: 1 - # ... #stream definitions # stream definitions define connections between spouts and bolts.