[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea2596e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea2596e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea2596e Branch: refs/heads/master Commit: 0ea2596e1b605c1eedb843273660ef1366463313 Parents: 4456453 Author: Stephan Ewen <se...@apache.org> Authored: Mon Aug 1 15:58:12 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Aug 1 19:52:13 2016 +0200 ---------------------------------------------------------------------- flink-contrib/flink-storm/pom.xml | 83 +++++++------------- .../org/apache/flink/storm/api/FlinkClient.java | 5 +- .../flink/storm/wrappers/BoltWrapper.java | 19 +++-- .../storm/wrappers/FlinkTopologyContext.java | 3 +- .../storm/wrappers/MergedInputsBoltWrapper.java | 6 +- .../flink/storm/wrappers/SpoutWrapper.java | 8 +- .../storm/wrappers/WrapperSetupHelperTest.java | 12 ++- 7 files changed, 60 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml index 590f33d..0ac49db 100644 --- a/flink-contrib/flink-storm/pom.xml +++ b/flink-contrib/flink-storm/pom.xml @@ -61,18 +61,40 @@ under the License. <artifactId>log4j-over-slf4j</artifactId> </exclusion> <exclusion> - <artifactId>logback-classic</artifactId> <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>ring</groupId> + <artifactId>ring-core</artifactId> + </exclusion> + <exclusion> + <groupId>ring</groupId> + <artifactId>ring-devel</artifactId> + </exclusion> + <exclusion> + <groupId>ring</groupId> + <artifactId>ring-servlet</artifactId> + </exclusion> + <exclusion> + <groupId>ring</groupId> + <artifactId>ring-jetty-adapter</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.jgrapht</groupId> + <artifactId>jgrapht-core</artifactId> </exclusion> </exclusions> </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - </dependency> - <!-- test dependencies --> <dependency> @@ -85,51 +107,4 @@ under the License. </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - - <pluginManagement> - <plugins> - <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.--> - <plugin> - <groupId>org.eclipse.m2e</groupId> - <artifactId>lifecycle-mapping</artifactId> - <version>1.0.0</version> - <configuration> - <lifecycleMappingMetadata> - <pluginExecutions> - <pluginExecution> - <pluginExecutionFilter> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <versionRange>[2.9,)</versionRange> - <goals> - <goal>unpack</goal> - </goals> - </pluginExecutionFilter> - <action> - <ignore/> - </action> - </pluginExecution> - </pluginExecutions> - </lifecycleMappingMetadata> - </configuration> - </plugin> - </plugins> - </pluginManagement> - - </build> - </project> http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java index 9628bb7..f4bcfb7 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java @@ -22,6 +22,7 @@ import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.pattern.Patterns; import akka.util.Timeout; + import backtype.storm.Config; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; @@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient; import backtype.storm.utils.Utils; import com.esotericsoftware.kryo.Serializer; -import com.google.common.collect.Lists; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; @@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus; import org.apache.flink.storm.util.StormConfig; import org.apache.flink.streaming.api.graph.StreamGraph; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,7 +216,7 @@ public class FlinkClient { try { ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader( - Lists.newArrayList(uploadedJarUrl), + Collections.<URL>singletonList(uploadedJarUrl), Collections.<URL>emptyList(), this.getClass().getClassLoader()); client.runDetached(jobGraph, classLoader); http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java index 5311cb3..6e316e7 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.generated.GlobalStreamId; @@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.MessageId; import backtype.storm.utils.Utils; -import com.google.common.collect.Sets; - import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple0; @@ -44,6 +43,8 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import static java.util.Arrays.asList; + /** * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program. * It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can @@ -135,9 +136,9 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not within range * [1;25]. */ - public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) + public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) throws IllegalArgumentException { - this(bolt, null, Sets.newHashSet(rawOutputs)); + this(bolt, null, asList(rawOutputs)); } /** @@ -157,8 +158,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * [1;25]. */ - public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) - throws IllegalArgumentException { + public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) throws IllegalArgumentException { this(bolt, null, rawOutputs); } @@ -181,9 +181,12 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range * [0;25]. */ - public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs) + public BoltWrapper( + final IRichBolt bolt, + final Fields inputSchema, + final String[] rawOutputs) throws IllegalArgumentException { - this(bolt, inputSchema, Sets.newHashSet(rawOutputs)); + this(bolt, inputSchema, asList(rawOutputs)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java index db1d147..52d39a7 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java @@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext { * @throws UnsupportedOperationException * at every invocation */ - @SuppressWarnings("unchecked") @Override - public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) { + public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) { throw new UnsupportedOperationException("Metrics are not supported by Flink"); } http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java index 89defde..7a3b6d5 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java @@ -14,10 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.topology.IRichBolt; -import com.google.common.collect.Sets; import org.apache.flink.api.java.tuple.Tuple0; import org.apache.flink.api.java.tuple.Tuple1; @@ -26,6 +26,8 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Collection; +import static java.util.Arrays.asList; + /** * A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It * can be used to wrap a multi-input bolt and assumes that all input stream got merged into a {@link StormTuple} stream @@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends BoltWrapper<StormTup */ public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs) throws IllegalArgumentException { - super(bolt, Sets.newHashSet(rawOutputs)); + super(bolt, asList(rawOutputs)); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java index 66b05c6..c171ccc 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java @@ -22,8 +22,6 @@ import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; -import com.google.common.collect.Sets; - import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters; import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.api.java.tuple.Tuple0; @@ -37,6 +35,8 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import java.util.Collection; import java.util.HashMap; +import static java.util.Arrays.asList; + /** * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see @@ -121,7 +121,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp */ public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs) throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs), null); + this(spout, asList(rawOutputs), null); } /** @@ -147,7 +147,7 @@ public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> imp */ public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs, final Integer numberOfInvocations) throws IllegalArgumentException { - this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations); + this(spout, asList(rawOutputs), numberOfInvocations); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java index 82b12d6..000fe84 100644 --- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java +++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.storm.wrappers; import backtype.storm.Config; @@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; -import com.google.common.collect.Sets; + import org.apache.flink.storm.api.FlinkTopology; import org.apache.flink.storm.util.AbstractTest; import org.apache.flink.storm.util.TestDummyBolt; import org.apache.flink.storm.util.TestDummySpout; import org.apache.flink.storm.util.TestSink; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; + import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -48,6 +51,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import static java.util.Collections.singleton; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest { PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer); WrapperSetupHelper.getNumberOfAttributes(boltOrSpout, - Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID })); + new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID))); } @Test(expected = IllegalArgumentException.class) @@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest { Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes( boltOrSpout, - numberOfAttributes == -1 ? Sets - .newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null)); + numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null)); } @Test