[FLINK-6695] Activate strict checkstyle for flink-storm
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40cb093c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40cb093c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40cb093c Branch: refs/heads/master Commit: 40cb093cc02783fe1066241c9b8b9fa87414d8c2 Parents: 60721e0 Author: zentol <[email protected]> Authored: Tue May 23 22:41:09 2017 +0200 Committer: zentol <[email protected]> Committed: Thu Jun 1 11:14:11 2017 +0200 ---------------------------------------------------------------------- flink-contrib/flink-storm/pom.xml | 39 +++++++++++++ .../org/apache/flink/storm/api/FlinkClient.java | 60 ++++++++++---------- .../flink/storm/api/FlinkLocalCluster.java | 28 ++++----- .../storm/api/FlinkOutputFieldsDeclarer.java | 17 +++--- .../apache/flink/storm/api/FlinkSubmitter.java | 27 ++++----- .../apache/flink/storm/api/FlinkTopology.java | 37 ++++++------ .../flink/storm/api/StormFlinkStreamMerger.java | 5 +- .../flink/storm/api/TwoFlinkStreamsMerger.java | 5 +- .../apache/flink/storm/util/FiniteSpout.java | 2 +- .../flink/storm/util/NullTerminatingSpout.java | 11 ++-- .../flink/storm/util/SplitStreamMapper.java | 3 +- .../flink/storm/util/SplitStreamType.java | 1 + .../util/SpoutOutputCollectorObserver.java | 9 +-- .../apache/flink/storm/util/StormConfig.java | 7 ++- .../flink/storm/util/StormStreamSelector.java | 3 +- .../storm/wrappers/AbstractStormCollector.java | 7 ++- .../flink/storm/wrappers/BoltCollector.java | 11 ++-- .../flink/storm/wrappers/BoltWrapper.java | 44 +++++++------- .../storm/wrappers/FlinkTopologyContext.java | 2 +- .../storm/wrappers/MergedInputsBoltWrapper.java | 12 ++-- .../flink/storm/wrappers/SpoutCollector.java | 9 ++- .../flink/storm/wrappers/SpoutWrapper.java | 36 ++++++------ .../apache/flink/storm/wrappers/StormTuple.java | 2 +- .../storm/wrappers/WrapperSetupHelper.java | 22 +++---- .../api/FlinkOutputFieldsDeclarerTest.java | 9 ++- .../flink/storm/api/FlinkTopologyTest.java | 12 ++-- .../org/apache/flink/storm/api/TestBolt.java | 4 ++ .../org/apache/flink/storm/api/TestSpout.java | 4 ++ .../apache/flink/storm/util/AbstractTest.java | 3 + .../flink/storm/util/FiniteTestSpout.java | 3 + .../storm/util/NullTerminatingSpoutTest.java | 17 +++--- .../util/SpoutOutputCollectorObserverTest.java | 13 +++-- .../storm/util/StormStreamSelectorTest.java | 4 ++ .../apache/flink/storm/util/TestDummyBolt.java | 16 ++++-- .../apache/flink/storm/util/TestDummySpout.java | 8 ++- .../org/apache/flink/storm/util/TestSink.java | 12 ++-- .../flink/storm/wrappers/BoltCollectorTest.java | 7 ++- .../flink/storm/wrappers/BoltWrapperTest.java | 21 ++++--- .../wrappers/FlinkTopologyContextTest.java | 6 +- .../wrappers/SetupOutputFieldsDeclarerTest.java | 6 +- .../storm/wrappers/SpoutCollectorTest.java | 7 ++- .../flink/storm/wrappers/SpoutWrapperTest.java | 12 ++-- .../flink/storm/wrappers/StormTupleTest.java | 19 ++++--- .../storm/wrappers/WrapperSetupHelperTest.java | 4 ++ .../WrapperSetupInLocalClusterTest.java | 36 +++++++----- .../src/test/resources/log4j-test.properties | 2 +- 46 files changed, 369 insertions(+), 255 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index a10ff68..1baf26b 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -181,5 +181,44 @@ under the License. </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <dependencies> + <dependency> + <groupId>com.puppycrawl.tools</groupId> + <artifactId>checkstyle</artifactId> + <version>6.19</version> + </dependency> + </dependencies> + <configuration> + <configLocation>/tools/maven/strict-checkstyle.xml</configLocation> + <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation> + <includeTestSourceDirectory>true</includeTestSourceDirectory> + <logViolationsToConsole>true</logViolationsToConsole> + <failOnViolation>true</failOnViolation> + </configuration> + <executions> + <!-- + Execute checkstyle after compilation but before tests. + + This ensures that any parsing or type checking errors are from + javac, so they look as expected. Beyond that, we want to + fail as early as possible. + --> + <execution> + <phase>test-compile</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 626335d..88a38e2 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -18,21 +18,6 @@ package org.apache.flink.storm.api; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.pattern.Patterns; -import akka.util.Timeout; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; -import org.apache.flink.runtime.jobmaster.JobMaster; -import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; -import org.apache.storm.Config; -import org.apache.storm.generated.AlreadyAliveException; -import org.apache.storm.generated.InvalidTopologyException; -import org.apache.storm.generated.KillOptions; -import org.apache.storm.generated.NotAliveException; -import org.apache.storm.utils.NimbusClient; -import org.apache.storm.utils.Utils; -import com.esotericsoftware.kryo.Serializer; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.ClusterClient; @@ -46,17 +31,29 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.graph.StreamGraph; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.pattern.Patterns; +import akka.util.Timeout; +import com.esotericsoftware.kryo.Serializer; +import org.apache.storm.Config; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.generated.NotAliveException; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Some; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.IOException; @@ -67,6 +64,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import scala.Some; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + /** * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with * Flink's JobManager instead of Storm's Nimbus. @@ -76,13 +78,13 @@ public class FlinkClient { /** The log used by this client. */ private static final Logger LOG = LoggerFactory.getLogger(FlinkClient.class); - /** The client's configuration */ - private final Map<?,?> conf; - /** The jobmanager's host name */ + /** The client's configuration. */ + private final Map<?, ?> conf; + /** The jobmanager's host name. */ private final String jobManagerHost; - /** The jobmanager's rpc port */ + /** The jobmanager's rpc port. */ private final int jobManagerPort; - /** The user specified timeout in milliseconds */ + /** The user specified timeout in milliseconds. */ private final String timeout; // The following methods are derived from "backtype.storm.utils.NimbusClient" @@ -145,8 +147,8 @@ public class FlinkClient { /** * Return a reference to itself. - * <p> - * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once. + * + * <p>{@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once. * * @return A reference to itself. */ @@ -188,7 +190,7 @@ public class FlinkClient { try { FlinkClient.addStormConfigToTopology(topology, conf); - } catch(ClassNotFoundException e) { + } catch (ClassNotFoundException e) { LOG.error("Could not register class for Kryo serialization.", e); throw new InvalidTopologyException("Could not register class for Kryo serialization."); } @@ -352,9 +354,9 @@ public class FlinkClient { if (klass instanceof String) { flinkConfig.registerKryoType(Class.forName((String) klass)); } else { - for (Entry<String,String> register : ((Map<String,String>)klass).entrySet()) { + for (Entry<String, String> register : ((Map<String, String>) klass).entrySet()) { flinkConfig.registerTypeWithKryoSerializer(Class.forName(register.getKey()), - (Class<? extends Serializer<?>>)Class.forName(register.getValue())); + (Class<? extends Serializer<?>>) Class.forName(register.getValue())); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java index d69d345..364c4d5 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java @@ -18,7 +18,14 @@ package org.apache.flink.storm.api; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.minicluster.FlinkMiniCluster; +import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; +import org.apache.flink.streaming.api.graph.StreamGraph; + import org.apache.storm.LocalCluster; import org.apache.storm.generated.ClusterSummary; import org.apache.storm.generated.KillOptions; @@ -26,13 +33,6 @@ import org.apache.storm.generated.RebalanceOptions; import org.apache.storm.generated.StormTopology; import org.apache.storm.generated.SubmitOptions; import org.apache.storm.generated.TopologyInfo; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; -import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; -import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,17 +44,15 @@ import java.util.Objects; */ public class FlinkLocalCluster { - /** The log used by this mini cluster */ + /** The log used by this mini cluster. */ private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class); - /** The Flink mini cluster on which to execute the programs */ + /** The Flink mini cluster on which to execute the programs. */ private FlinkMiniCluster flink; /** Configuration key to submit topology in blocking mode if flag is set to {@code true}. */ public static final String SUBMIT_BLOCKING = "SUBMIT_STORM_TOPOLOGY_BLOCKING"; - - public FlinkLocalCluster() { } @@ -62,8 +60,6 @@ public class FlinkLocalCluster { this.flink = Objects.requireNonNull(flink); } - - @SuppressWarnings("rawtypes") public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology) throws Exception { @@ -77,8 +73,8 @@ public class FlinkLocalCluster { boolean submitBlocking = false; if (conf != null) { Object blockingFlag = conf.get(SUBMIT_BLOCKING); - if(blockingFlag != null && blockingFlag instanceof Boolean) { - submitBlocking = ((Boolean)blockingFlag).booleanValue(); + if (blockingFlag != null && blockingFlag instanceof Boolean) { + submitBlocking = ((Boolean) blockingFlag).booleanValue(); } } @@ -184,7 +180,7 @@ public class FlinkLocalCluster { /** * A factory that creates local clusters. */ - public static interface LocalClusterFactory { + public interface LocalClusterFactory { /** * Creates a local Flink cluster. http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java index b0bebef..b1e8a47 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java @@ -17,13 +17,14 @@ package org.apache.flink.storm.api; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + import java.util.HashMap; import java.util.List; @@ -80,13 +81,13 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { /** * Returns {@link TypeInformation} for the declared output schema for a specific stream. - * + * * @param streamId * A stream ID. - * + * * @return output type information for the declared output schema of the specified stream; or {@code null} if * {@code streamId == null} - * + * * @throws IllegalArgumentException * If no output schema was declared for the specified stream or if more then 25 attributes got declared. */ @@ -143,12 +144,12 @@ final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer { /** * Computes the indexes within the declared output schema of the specified stream, for a list of given * field-grouping attributes. - * + * * @param streamId * A stream ID. * @param groupingFields * The names of the key fields. - * + * * @return array of {@code int}s that contains the index within the output schema for each attribute in the given * list */ http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java index 3b191b0..6135d4d 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java @@ -14,34 +14,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.api; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.ContextEnvironment; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; + import org.apache.storm.Config; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.generated.SubmitOptions; import org.apache.storm.utils.Utils; - -import java.net.URISyntaxException; -import java.net.URL; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.client.program.ContextEnvironment; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.GlobalConfiguration; import org.json.simple.JSONValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.net.URISyntaxException; +import java.net.URL; import java.util.Map; /** * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster. */ public class FlinkSubmitter { - public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class); + private static final Logger LOG = LoggerFactory.getLogger(FlinkSubmitter.class); /** * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. @@ -121,17 +122,17 @@ public class FlinkSubmitter { } } - logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); + LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); } catch (final InvalidTopologyException e) { - logger.warn("Topology submission exception: " + e.get_msg()); + LOG.warn("Topology submission exception: " + e.get_msg()); throw e; } catch (final AlreadyAliveException e) { - logger.warn("Topology already alive exception", e); + LOG.warn("Topology already alive exception", e); throw e; } - logger.info("Finished submitting topology: " + name); + LOG.info("Finished submitting topology: " + name); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java index 2b36feb..3b78a90 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java @@ -16,17 +16,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.api; -import org.apache.storm.generated.ComponentCommon; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.generated.Grouping; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.IRichStateSpout; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; +package org.apache.flink.storm.api; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -46,6 +37,16 @@ import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.InstantiationUtil; +import org.apache.storm.generated.ComponentCommon; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.IRichStateSpout; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + import java.io.IOException; import java.lang.reflect.Field; import java.util.HashMap; @@ -62,9 +63,9 @@ import java.util.Set; */ public class FlinkTopology { - /** All declared streams and output schemas by operator ID */ + /** All declared streams and output schemas by operator ID. */ private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>(); - /** All spouts&bolts declarers by their ID */ + /** All spouts&bolts declarers by their ID. */ private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>(); private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = @@ -96,7 +97,6 @@ public class FlinkTopology { } /** - * * Creates a Flink program that uses the specified spouts and bolts. * @param stormBuilder The Storm topology builder to use for creating the Flink topology. * @return A {@link FlinkTopology} which contains the translated Storm topology and may be executed. @@ -123,7 +123,6 @@ public class FlinkTopology { return env.execute(); } - @SuppressWarnings("unchecked") private <T> Map<String, T> getPrivateField(String field) { try { @@ -161,18 +160,16 @@ public class FlinkTopology { /* Translation of topology */ - for (final Entry<String, IRichSpout> spout : spouts.entrySet()) { final String spoutId = spout.getKey(); final IRichSpout userSpout = spout.getValue(); final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer(); userSpout.declareOutputFields(declarer); - final HashMap<String,Fields> sourceStreams = declarer.outputStreams; + final HashMap<String, Fields> sourceStreams = declarer.outputStreams; this.outputStreams.put(spoutId, sourceStreams); declarers.put(spoutId, declarer); - final HashMap<String, DataStream<Tuple>> outputStreams = new HashMap<String, DataStream<Tuple>>(); final DataStreamSource<?> source; @@ -222,7 +219,7 @@ public class FlinkTopology { * 1. Connect all spout streams with bolts streams * 2. Then proceed with the bolts stream already connected * - * Because we do not know the order in which an iterator steps over a set, we might process a consumer before + * <p>Because we do not know the order in which an iterator steps over a set, we might process a consumer before * its producer * ->thus, we might need to repeat multiple times */ @@ -418,7 +415,7 @@ public class FlinkTopology { final SingleOutputStreamOperator<Tuple> outStream; // only one input - if(inputStreams.entrySet().size() == 1) { + if (inputStreams.entrySet().size() == 1) { BoltWrapper<Tuple, Tuple> boltWrapper = new BoltWrapper<>(bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null); boltWrapper.setStormTopology(stormTopology); @@ -444,7 +441,7 @@ public class FlinkTopology { final SingleOutputStreamOperator<SplitStreamType<Tuple>> multiStream; // only one input - if(inputStreams.entrySet().size() == 1) { + if (inputStreams.entrySet().size() == 1) { final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<>( bolt, boltId, inputStreamId1, inputComponentId1, inputSchema1, null); boltWrapperMultipleOutputs.setStormTopology(stormTopology); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java index 160a7d9..00c467e3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/StormFlinkStreamMerger.java @@ -3,13 +3,14 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.storm.api; import org.apache.flink.storm.wrappers.StormTuple; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java index 1e6e2ed..f7bcb12 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/TwoFlinkStreamsMerger.java @@ -3,13 +3,14 @@ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the * License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ + package org.apache.flink.storm.api; import org.apache.flink.storm.wrappers.StormTuple; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java index 10f9797..7615b2e 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java @@ -31,6 +31,6 @@ public interface FiniteSpout extends IRichSpout { * * @return true, if the spout's stream reached its end, false otherwise */ - public boolean reachedEnd(); + boolean reachedEnd(); } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java index 20e3309..a830b10 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/NullTerminatingSpout.java @@ -15,15 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.util; -import java.util.Map; +package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; +import java.util.Map; + /** * {@link NullTerminatingSpout} in a finite spout (ie, implements {@link FiniteSpout} interface) that wraps an * infinite spout, and returns {@code true} in {@link #reachedEnd()} when the wrapped spout does not emit a tuple @@ -37,14 +38,10 @@ public class NullTerminatingSpout implements FiniteSpout { /** The observer that checks if the given spouts emit a tuple or not on nextTuple(). */ private SpoutOutputCollectorObserver observer; - - public NullTerminatingSpout(IRichSpout spout) { this.spout = spout; } - - @Override public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) { this.observer = new SpoutOutputCollectorObserver(collector); @@ -95,7 +92,7 @@ public class NullTerminatingSpout implements FiniteSpout { @Override public boolean reachedEnd() { - return this.observer.emitted == false; + return !this.observer.emitted; } } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java index 1fb5e02..d2e84c5 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.flink.api.common.functions.MapFunction; @@ -25,7 +26,7 @@ import org.apache.flink.streaming.api.datastream.SplitStream; * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and * {@link SplitStream#select(String...) .select(...)}). - * + * * @param <T> */ public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> { http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java index 5056795..36894c7 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.flink.streaming.api.datastream.DataStream; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java index 9e222ec..8be466e 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SpoutOutputCollectorObserver.java @@ -15,13 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.util; -import java.util.List; +package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.utils.Utils; +import java.util.List; + /** * Observes if a call to any {@code emit(...)} or {@code emitDirect(...)} method is made. * The internal flag {@link #emitted} must be reset by the user manually. @@ -33,15 +34,11 @@ class SpoutOutputCollectorObserver extends SpoutOutputCollector { /** The internal flag that it set to {@code true} if a tuple gets emitted. */ boolean emitted; - - public SpoutOutputCollectorObserver(SpoutOutputCollector delegate) { super(null); this.delegate = delegate; } - - @Override public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) { emitted = true; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java index 040c395..a9d7bfd 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; -import org.apache.storm.Config; import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; +import org.apache.storm.Config; + import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -44,7 +46,7 @@ public final class StormConfig extends GlobalJobParameters implements Map { /** * Creates an configuration with initial values provided by the given {@code Map}. - * + * * @param config * Initial values for this configuration. */ @@ -53,7 +55,6 @@ public final class StormConfig extends GlobalJobParameters implements Map { this.config.putAll(config); } - @Override public int size() { return this.config.size(); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java index 6072e0f..33ba374 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.flink.storm.api.FlinkTopology; @@ -45,4 +46,4 @@ public final class StormStreamSelector<T> implements OutputSelector<SplitStreamT return streamId; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java index 7f4d7d1..21ce115 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/AbstractStormCollector.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import org.apache.flink.api.java.tuple.Tuple; @@ -50,7 +51,7 @@ abstract class AbstractStormCollector<OUT> { * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * + * * @param numberOfAttributes * The number of attributes of the emitted tuples per output stream. * @param taskId @@ -107,7 +108,7 @@ abstract class AbstractStormCollector<OUT> { /** * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)} * to the specified output stream. - * + * * @param The * The output stream id. * @param tuple @@ -160,7 +161,7 @@ abstract class AbstractStormCollector<OUT> { /** * Emits a Flink tuple. - * + * * @param flinkTuple * The tuple to be emitted. * @return the IDs of the tasks this tuple was sent to http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java index 7b94707..82c7be3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltCollector.java @@ -17,14 +17,14 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.task.IOutputCollector; -import org.apache.storm.tuple.Tuple; - import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.util.Collector; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.Tuple; + import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -36,14 +36,14 @@ import java.util.List; */ class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector { - /** The Flink output Collector */ + /** The Flink output Collector. */ private final Collector<OUT> flinkOutput; /** * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * + * * @param numberOfAttributes * The number of attributes of the emitted tuples per output stream. * @param taskId @@ -91,5 +91,4 @@ class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputC @Override public void resetTimeout(Tuple var1) {} - } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index 731f28f..590faf3 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -17,16 +17,6 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.generated.Grouping; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.MessageId; -import org.apache.storm.utils.Utils; - import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; @@ -38,6 +28,16 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.utils.Utils; + import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -57,9 +57,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements private static final long serialVersionUID = -4788589118464155835L; /** The default input component ID. */ - public final static String DEFAULT_ID = "default ID"; + public static final String DEFAULT_ID = "default ID"; /** The default bolt ID. */ - public final static String DEFUALT_BOLT_NAME = "Unnamed Bolt"; + public static final String DEFUALT_BOLT_NAME = "Unnamed Bolt"; /** The wrapped Storm {@link IRichBolt bolt}. */ protected final IRichBolt bolt; @@ -90,7 +90,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible for * POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's * declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @throws IllegalArgumentException @@ -105,7 +105,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * within a Flink streaming program. The given input schema enable attribute-by-name access for input types * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on * the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param inputSchema @@ -124,7 +124,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the bolt's * number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of * {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs @@ -135,7 +135,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not within range * [1;25]. */ - public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) + public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) throws IllegalArgumentException { this(bolt, null, asList(rawOutputs)); } @@ -146,7 +146,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the bolt's * number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of * {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs @@ -167,7 +167,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param inputSchema @@ -183,7 +183,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements public BoltWrapper( final IRichBolt bolt, final Fields inputSchema, - final String[] rawOutputs) + final String[] rawOutputs) throws IllegalArgumentException { this(bolt, inputSchema, asList(rawOutputs)); } @@ -194,7 +194,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param inputSchema @@ -220,7 +220,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true} * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param name @@ -244,7 +244,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements /** * Sets the original Storm topology. - * + * * @param stormTopology * The original Storm topology. */ http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java index f55f0e3..e84abcc 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -17,6 +17,7 @@ package org.apache.flink.storm.wrappers; +import clojure.lang.Atom; import org.apache.storm.generated.StormTopology; import org.apache.storm.hooks.ITaskHook; import org.apache.storm.metric.api.CombinedMetric; @@ -27,7 +28,6 @@ import org.apache.storm.metric.api.ReducedMetric; import org.apache.storm.state.ISubscribedState; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Fields; -import clojure.lang.Atom; import java.util.Collection; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java index 6dd6973..07abffc 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java @@ -17,13 +17,13 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.topology.IRichBolt; - import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.storm.topology.IRichBolt; + import java.util.Collection; import static java.util.Arrays.asList; @@ -40,7 +40,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup * Instantiates a new {@link MergedInputsBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it * can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to {@link Tuple25} * depending on the bolt's declared number of attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @throws IllegalArgumentException @@ -56,7 +56,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of * attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs @@ -78,7 +78,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of * attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param rawOutputs @@ -100,7 +100,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of * attributes. - * + * * @param bolt * The Storm {@link IRichBolt bolt} to be used. * @param name http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java index 5404027..6e3a39a 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutCollector.java @@ -17,11 +17,12 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.spout.ISpoutOutputCollector; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple25; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.storm.spout.ISpoutOutputCollector; + import java.util.HashMap; import java.util.List; @@ -32,14 +33,14 @@ import java.util.List; */ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector { - /** The Flink source context object */ + /** The Flink source context object. */ private final SourceContext<OUT> flinkContext; /** * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0 * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively. - * + * * @param numberOfAttributes * The number of attributes of the emitted tuples. * @param taskId @@ -73,7 +74,6 @@ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutO return this.tansformAndEmit(streamId, tuple); } - @Override public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) { throw new UnsupportedOperationException("Direct emit is not supported by Flink"); @@ -83,5 +83,4 @@ class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutO return 0; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java index 3dd1e10..458fffb 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java @@ -17,11 +17,6 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; - import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.java.tuple.Tuple0; @@ -32,6 +27,11 @@ import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; + import java.util.Collection; import java.util.HashMap; @@ -73,7 +73,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to * {@link Tuple25} depending on the spout's declared number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @throws IllegalArgumentException @@ -87,7 +87,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to * {@link Tuple25} depending on the spout's declared number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param numberOfInvocations @@ -108,7 +108,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared * number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param rawOutputs @@ -130,7 +130,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared * number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param rawOutputs @@ -156,7 +156,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared * number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param rawOutputs @@ -178,7 +178,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared * number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param rawOutputs @@ -204,7 +204,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared * number of attributes. - * + * * @param spout * The {@link IRichSpout spout} to be used. * @param name @@ -231,7 +231,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp /** * Sets the original Storm topology. - * + * * @param stormTopology * The original Storm topology. */ @@ -240,7 +240,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp } @Override - public final void run(final SourceContext<OUT> ctx) throws Exception { + public void run(final SourceContext<OUT> ctx) throws Exception { final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig() .getGlobalJobParameters(); StormConfig stormConfig = new StormConfig(); @@ -292,8 +292,8 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp /** * {@inheritDoc} - * <p> - * Sets the {@link #isRunning} flag to {@code false}. + * + * <p>Sets the {@link #isRunning} flag to {@code false}. */ @Override public void cancel() { @@ -302,8 +302,8 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp /** * {@inheritDoc} - * <p> - * Sets the {@link #isRunning} flag to {@code false}. + * + * <p>Sets the {@link #isRunning} flag to {@code false}. */ @Override public void stop() { http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java index 30085fc..a1d33e8 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/StormTuple.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; /* @@ -52,7 +53,6 @@ public class StormTuple<IN> implements org.apache.storm.tuple.Tuple { /** The message that is associated with this tuple. */ private final MessageId messageId; - /** * Create a new Storm tuple from the given Flink tuple. * http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java index 3a9b650..1611211 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/WrapperSetupHelper.java @@ -14,8 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import clojure.lang.Atom; import org.apache.storm.Config; import org.apache.storm.generated.Bolt; import org.apache.storm.generated.ComponentCommon; @@ -28,8 +32,6 @@ import org.apache.storm.topology.IComponent; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.IRichSpout; import org.apache.storm.tuple.Fields; -import clojure.lang.Atom; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.ArrayList; import java.util.Collection; @@ -45,14 +47,14 @@ import java.util.Map.Entry; class WrapperSetupHelper { /** The configuration key for the topology name. */ - final static String TOPOLOGY_NAME = "storm.topology.name"; + static final String TOPOLOGY_NAME = "storm.topology.name"; /** * Computes the number of output attributes used by a {@link SpoutWrapper} or {@link BoltWrapper} * per declared output stream. The number is {@code -1} for raw output type or a value within range [0;25] for * output type {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to * {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}. - * + * * @param spoutOrBolt * The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used. * @param rawOutputs @@ -96,7 +98,7 @@ class WrapperSetupHelper { /** * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor). - * + * * @param context * The Flink runtime context. * @param spoutOrBolt @@ -203,7 +205,7 @@ class WrapperSetupHelper { /** * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a * single instance of a Spout or Bolt (ie, task or executor). Furthermore, is computes the unique task-id. - * + * * @param componentId * The ID of the Spout/Bolt in the topology. * @param common @@ -220,7 +222,7 @@ class WrapperSetupHelper { * OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs. * @param componentToStreamToFields * OUTPUT: A map from all component IDs to there output streams and output fields. - * + * * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current * Flink operator {@code operatorName} -- {@code null} otherwise. */ @@ -229,7 +231,7 @@ class WrapperSetupHelper { final int dop, final Map<Integer, String> taskToComponents, final Map<String, List<Integer>> componentToSortedTasks, final Map<String, Map<String, Fields>> componentToStreamToFields) { - final int parallelism_hint = common.get_parallelism_hint(); + final int parallelismHint = common.get_parallelism_hint(); Integer taskId = null; if (componentId.equals(operatorName)) { @@ -237,7 +239,7 @@ class WrapperSetupHelper { } List<Integer> sortedTasks = new ArrayList<Integer>(dop); - for (int i = 0; i < parallelism_hint; ++i) { + for (int i = 0; i < parallelismHint; ++i) { taskToComponents.put(tid, componentId); sortedTasks.add(tid); ++tid; @@ -245,7 +247,7 @@ class WrapperSetupHelper { componentToSortedTasks.put(componentId, sortedTasks); Map<String, Fields> outputStreams = new HashMap<String, Fields>(); - for(Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) { + for (Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) { outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields())); } componentToStreamToFields.put(componentId, outputStreams); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java index ddbeaff..d035bb2 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarerTest.java @@ -14,17 +14,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.api; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.storm.util.AbstractTest; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; import org.junit.Assert; import org.junit.Test; import java.util.LinkedList; +/** + * Tests for the FlinkOutputFieldsDeclarer. + */ public class FlinkOutputFieldsDeclarerTest extends AbstractTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java index 0ec0179..aaecc06 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/FlinkTopologyTest.java @@ -14,17 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.api; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.storm.util.TestSink; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; import org.junit.Assert; import org.junit.Test; +/** + * Tests for the FlinkTopology. + */ public class FlinkTopologyTest { @Test @@ -68,7 +72,7 @@ public class FlinkTopologyTest { builder.setSpout("spout", new TestDummySpout()); builder.setBolt("sink", new TestSink()).fieldsGrouping("spout", - TestDummySpout.spoutStreamId, new Fields("id")); + TestDummySpout.SPOUT_STREAM_ID, new Fields("id")); FlinkTopology.createTopology(builder); } @@ -80,7 +84,7 @@ public class FlinkTopologyTest { builder.setSpout("spout", new TestDummySpout()); builder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout"); builder.setBolt("sink", new TestSink()).fieldsGrouping("bolt", - TestDummyBolt.groupingStreamId, new Fields("id")); + TestDummyBolt.GROUPING_STREAM_ID, new Fields("id")); FlinkTopology.createTopology(builder); } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java index 0f617fb..001e9c4 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestBolt.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.api; import org.apache.storm.task.OutputCollector; @@ -24,6 +25,9 @@ import org.apache.storm.tuple.Tuple; import java.util.Map; +/** + * A no-op test implementation of a {@link IRichBolt}. + */ public class TestBolt implements IRichBolt { private static final long serialVersionUID = -667148827441397683L; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java index 1b185a7..3466ff4 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/api/TestSpout.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.api; import org.apache.storm.spout.SpoutOutputCollector; @@ -23,6 +24,9 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import java.util.Map; +/** + * A no-op test implementation of a {@link IRichSpout}. + */ public class TestSpout implements IRichSpout { private static final long serialVersionUID = -4884029383198924007L; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java index f51aba4..ca0e067 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java @@ -23,6 +23,9 @@ import org.slf4j.LoggerFactory; import java.util.Random; +/** + * Abstract class for all tests that require a {@link Random} to be setup before each test. + */ public abstract class AbstractTest { private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java index 9a5b1cd..8b89c95 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java @@ -26,6 +26,9 @@ import org.apache.storm.tuple.Values; import java.util.Map; +/** + * Tests for the Finite. + */ public class FiniteTestSpout implements IRichSpout { private static final long serialVersionUID = 7992419478267824279L; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java index 1eaed4a..7263ce4 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/NullTerminatingSpoutTest.java @@ -15,25 +15,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.storm.util; -import java.util.HashMap; -import java.util.Map; +package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; - import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Matchers.same; -import static org.mockito.Mockito.any; +/** + * Tests for the NullTerminatingSpout. + */ public class NullTerminatingSpoutTest { @Test @@ -43,7 +46,7 @@ public class NullTerminatingSpoutTest { IRichSpout spoutMock = mock(IRichSpout.class); when(spoutMock.getComponentConfiguration()).thenReturn(compConfig); - Map<?,?> conf = mock(Map.class); + Map<?, ?> conf = mock(Map.class); TopologyContext context = mock(TopologyContext.class); Object msgId = mock(Object.class); OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class); http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java index a5b96bd..c150cc3 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/SpoutOutputCollectorObserverTest.java @@ -15,15 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; - import org.junit.Assert; import org.junit.Test; import static org.mockito.Mockito.mock; +/** + * Tests for the SpoutOutputCollectorObserver. + */ public class SpoutOutputCollectorObserverTest { @Test @@ -35,11 +38,11 @@ public class SpoutOutputCollectorObserverTest { Assert.assertTrue(observer.emitted); observer.emitted = false; - observer.emit(null, (Object)null); + observer.emit(null, (Object) null); Assert.assertTrue(observer.emitted); observer.emitted = false; - observer.emit((String)null, null); + observer.emit((String) null, null); Assert.assertTrue(observer.emitted); observer.emitted = false; @@ -51,11 +54,11 @@ public class SpoutOutputCollectorObserverTest { Assert.assertTrue(observer.emitted); observer.emitted = false; - observer.emitDirect(0, null, (Object)null); + observer.emitDirect(0, null, (Object) null); Assert.assertTrue(observer.emitted); observer.emitted = false; - observer.emitDirect(0, (String)null, null); + observer.emitDirect(0, (String) null, null); Assert.assertTrue(observer.emitted); observer.emitted = false; http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java index f73db20..67d4a17 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.junit.Assert; @@ -22,6 +23,9 @@ import org.junit.Test; import java.util.Iterator; +/** + * Tests for the StormStreamSelector. + */ public class StormStreamSelectorTest { @Test http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java index 2ad8f2e..2773692 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.storm.task.OutputCollector; @@ -26,11 +27,14 @@ import org.apache.storm.tuple.Values; import java.util.Map; +/** + * A test implementation of a {@link IRichBolt}. + */ public class TestDummyBolt implements IRichBolt { private static final long serialVersionUID = 6893611247443121322L; - public final static String shuffleStreamId = "shuffleStream"; - public final static String groupingStreamId = "groupingStream"; + public static final String SHUFFLE_STREAM_ID = "shuffleStream"; + public static final String GROUPING_STREAM_ID = "groupingStream"; private boolean emit = true; @SuppressWarnings("rawtypes") @@ -49,10 +53,10 @@ public class TestDummyBolt implements IRichBolt { @Override public void execute(Tuple input) { if (this.context.getThisTaskIndex() == 0) { - this.collector.emit(shuffleStreamId, input.getValues()); + this.collector.emit(SHUFFLE_STREAM_ID, input.getValues()); } if (this.emit) { - this.collector.emit(groupingStreamId, new Values("bolt", this.context)); + this.collector.emit(GROUPING_STREAM_ID, new Values("bolt", this.context)); this.emit = false; } } @@ -62,8 +66,8 @@ public class TestDummyBolt implements IRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declareStream(shuffleStreamId, new Fields("data")); - declarer.declareStream(groupingStreamId, new Fields("id", "data")); + declarer.declareStream(SHUFFLE_STREAM_ID, new Fields("data")); + declarer.declareStream(GROUPING_STREAM_ID, new Fields("id", "data")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java index 82506e4..5ff8289 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.storm.spout.SpoutOutputCollector; @@ -26,10 +27,13 @@ import org.apache.storm.utils.Utils; import java.util.Map; +/** + * A test implementation of a {@link IRichSpout}. + */ public class TestDummySpout implements IRichSpout { private static final long serialVersionUID = -5190945609124603118L; - public final static String spoutStreamId = "spout-stream"; + public static final String SPOUT_STREAM_ID = "spout-stream"; private boolean emit = true; @SuppressWarnings("rawtypes") @@ -71,7 +75,7 @@ public class TestDummySpout implements IRichSpout { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data")); - declarer.declareStream(spoutStreamId, new Fields("id", "data")); + declarer.declareStream(SPOUT_STREAM_ID, new Fields("id", "data")); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java index 1f4da55..a3bb884 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.util; import org.apache.storm.task.OutputCollector; @@ -26,23 +27,26 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +/** + * A test implementation of a {@link IRichBolt} that stores incoming records in {@link #RESULT}. + */ public class TestSink implements IRichBolt { private static final long serialVersionUID = 4314871456719370877L; - public final static List<TopologyContext> result = new LinkedList<TopologyContext>(); + public static final List<TopologyContext> RESULT = new LinkedList<TopologyContext>(); @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { - result.add(context); + RESULT.add(context); } @Override public void execute(Tuple input) { if (input.size() == 1) { - result.add((TopologyContext) input.getValue(0)); + RESULT.add((TopologyContext) input.getValue(0)); } else { - result.add((TopologyContext) input.getValue(1)); + RESULT.add((TopologyContext) input.getValue(1)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java index 9e3165b..d48042b 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java @@ -17,11 +17,11 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.tuple.Values; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.streaming.api.operators.Output; + +import org.apache.storm.tuple.Values; import org.junit.Assert; import org.junit.Test; @@ -32,6 +32,9 @@ import java.util.List; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +/** + * Tests for the BoltCollector. + */ public class BoltCollectorTest extends AbstractTest { @SuppressWarnings({ "rawtypes", "unchecked" }) http://git-wip-us.apache.org/repos/asf/flink/blob/40cb093c/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java index 1f8f773..f518d17 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java @@ -17,15 +17,6 @@ package org.apache.flink.storm.wrappers; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichBolt; -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.tuple.Fields; -import org.apache.storm.tuple.MessageId; -import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Utils; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.java.tuple.Tuple; @@ -46,6 +37,15 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +67,9 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +/** + * Tests for the BoltWrapper. + */ @RunWith(PowerMockRunner.class) @PrepareForTest({StreamElementSerializer.class, WrapperSetupHelper.class, StreamRecord.class}) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
