[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ea2596e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ea2596e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ea2596e

Branch: refs/heads/master
Commit: 0ea2596e1b605c1eedb843273660ef1366463313
Parents: 4456453
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Aug 1 15:58:12 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 1 19:52:13 2016 +0200

----------------------------------------------------------------------
 flink-contrib/flink-storm/pom.xml               | 83 +++++++-------------
 .../org/apache/flink/storm/api/FlinkClient.java |  5 +-
 .../flink/storm/wrappers/BoltWrapper.java       | 19 +++--
 .../storm/wrappers/FlinkTopologyContext.java    |  3 +-
 .../storm/wrappers/MergedInputsBoltWrapper.java |  6 +-
 .../flink/storm/wrappers/SpoutWrapper.java      |  8 +-
 .../storm/wrappers/WrapperSetupHelperTest.java  | 12 ++-
 7 files changed, 60 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml 
b/flink-contrib/flink-storm/pom.xml
index 590f33d..0ac49db 100644
--- a/flink-contrib/flink-storm/pom.xml
+++ b/flink-contrib/flink-storm/pom.xml
@@ -61,18 +61,40 @@ under the License.
                                        
<artifactId>log4j-over-slf4j</artifactId>
                                </exclusion>
                                <exclusion>
-                                       <artifactId>logback-classic</artifactId>
                                        <groupId>ch.qos.logback</groupId>
+                                       <artifactId>logback-classic</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ring</groupId>
+                                       <artifactId>ring-core</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ring</groupId>
+                                       <artifactId>ring-devel</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ring</groupId>
+                                       <artifactId>ring-servlet</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>ring</groupId>
+                                       
<artifactId>ring-jetty-adapter</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.mortbay.jetty</groupId>
+                                       <artifactId>jetty-util</artifactId>
+                               </exclusion>
+                               <exclusion>
+                                       <groupId>org.jgrapht</groupId>
+                                       <artifactId>jgrapht-core</artifactId>
                                </exclusion>
                        </exclusions>
                </dependency>
 
-               <dependency>
-                       <groupId>com.google.guava</groupId>
-                       <artifactId>guava</artifactId>
-                       <version>${guava.version}</version>
-               </dependency>
-
                <!-- test dependencies -->
 
                <dependency>
@@ -85,51 +107,4 @@ under the License.
                
        </dependencies>
 
-       <build>
-               <plugins>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-jar-plugin</artifactId>
-                               <executions>
-                                       <execution>
-                                               <goals>
-                                                       <goal>test-jar</goal>
-                                               </goals>
-                                       </execution>
-                               </executions>
-                       </plugin>
-               </plugins>
-
-               <pluginManagement>
-                       <plugins>
-                               <!--This plugin's configuration is used to 
store Eclipse m2e settings only. It has no influence on the Maven build 
itself.-->
-                               <plugin>
-                                       <groupId>org.eclipse.m2e</groupId>
-                                       
<artifactId>lifecycle-mapping</artifactId>
-                                       <version>1.0.0</version>
-                                       <configuration>
-                                               <lifecycleMappingMetadata>
-                                                       <pluginExecutions>
-                                                               
<pluginExecution>
-                                                                       
<pluginExecutionFilter>
-                                                                               
<groupId>org.apache.maven.plugins</groupId>
-                                                                               
<artifactId>maven-dependency-plugin</artifactId>
-                                                                               
<versionRange>[2.9,)</versionRange>
-                                                                               
<goals>
-                                                                               
        <goal>unpack</goal>
-                                                                               
</goals>
-                                                                       
</pluginExecutionFilter>
-                                                                       <action>
-                                                                               
<ignore/>
-                                                                       
</action>
-                                                               
</pluginExecution>
-                                                       </pluginExecutions>
-                                               </lifecycleMappingMetadata>
-                                       </configuration>
-                               </plugin>
-                       </plugins>
-               </pluginManagement>
-
-       </build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 9628bb7..f4bcfb7 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import backtype.storm.Config;
 import backtype.storm.generated.AlreadyAliveException;
 import backtype.storm.generated.InvalidTopologyException;
@@ -32,7 +33,6 @@ import backtype.storm.utils.NimbusClient;
 import backtype.storm.utils.Utils;
 
 import com.esotericsoftware.kryo.Serializer;
-import com.google.common.collect.Lists;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.storm.util.StormConfig;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,7 +216,7 @@ public class FlinkClient {
 
                try {
                        ClassLoader classLoader = 
JobWithJars.buildUserCodeClassLoader(
-                                       Lists.newArrayList(uploadedJarUrl),
+                                       
Collections.<URL>singletonList(uploadedJarUrl),
                                        Collections.<URL>emptyList(),
                                        this.getClass().getClassLoader());
                        client.runDetached(jobGraph, classLoader);

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 5311cb3..6e316e7 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.generated.GlobalStreamId;
@@ -26,8 +27,6 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.utils.Utils;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -44,6 +43,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the 
Storm bolt within a Flink Streaming program.
  * It takes the Flink input tuples of type {@code IN} and transforms them into 
{@link StormTuple}s that the bolt can
@@ -135,9 +136,9 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not within range
         *             [1;25].
         */
-       public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
+       public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs) 
                        throws IllegalArgumentException {
-               this(bolt, null, Sets.newHashSet(rawOutputs));
+               this(bolt, null, asList(rawOutputs));
        }
 
        /**
@@ -157,8 +158,7 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
         *             [1;25].
         */
-       public BoltWrapper(final IRichBolt bolt, final Collection<String> 
rawOutputs)
-                       throws IllegalArgumentException {
+       public BoltWrapper(final IRichBolt bolt, final Collection<String> 
rawOutputs) throws IllegalArgumentException {
                this(bolt, null, rawOutputs);
        }
 
@@ -181,9 +181,12 @@ public class BoltWrapper<IN, OUT> extends 
AbstractStreamOperator<OUT> implements
         *             {@code rawOuput} is {@code false} and the number of 
declared output attributes is not with range
         *             [0;25].
         */
-       public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, 
final String[] rawOutputs)
+       public BoltWrapper(
+                       final IRichBolt bolt,
+                       final Fields inputSchema,
+                       final String[] rawOutputs) 
                        throws IllegalArgumentException {
-               this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
+               this(bolt, inputSchema, asList(rawOutputs));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
index db1d147..52d39a7 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -120,9 +120,8 @@ final class FlinkTopologyContext extends TopologyContext {
         * @throws UnsupportedOperationException
         *              at every invocation
         */
-       @SuppressWarnings("unchecked")
        @Override
-       public IMetric registerMetric(final String name, final IMetric metric, 
final int timeBucketSizeInSecs) {
+       public <T extends IMetric> T registerMetric(final String name, final T 
metric, final int timeBucketSizeInSecs) {
                throw new UnsupportedOperationException("Metrics are not 
supported by Flink");
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
index 89defde..7a3b6d5 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.topology.IRichBolt;
-import com.google.common.collect.Sets;
 
 import org.apache.flink.api.java.tuple.Tuple0;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -26,6 +26,8 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects 
input tuples of type {@link StormTuple}. It
  * can be used to wrap a multi-input bolt and assumes that all input stream 
got merged into a {@link StormTuple} stream
@@ -67,7 +69,7 @@ public final class MergedInputsBoltWrapper<IN, OUT> extends 
BoltWrapper<StormTup
         */
        public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] 
rawOutputs)
                        throws IllegalArgumentException {
-               super(bolt, Sets.newHashSet(rawOutputs));
+               super(bolt, asList(rawOutputs));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
index 66b05c6..c171ccc 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -22,8 +22,6 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IRichSpout;
 
-import com.google.common.collect.Sets;
-
 import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.java.tuple.Tuple0;
@@ -37,6 +35,8 @@ import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import java.util.Collection;
 import java.util.HashMap;
 
+import static java.util.Arrays.asList;
+
 /**
  * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it 
within a Flink Streaming program. It
  * takes the spout's output tuples and transforms them into Flink tuples of 
type {@code OUT} (see
@@ -121,7 +121,7 @@ public final class SpoutWrapper<OUT> extends 
RichParallelSourceFunction<OUT> imp
         */
        public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
                        throws IllegalArgumentException {
-               this(spout, Sets.newHashSet(rawOutputs), null);
+               this(spout, asList(rawOutputs), null);
        }
 
        /**
@@ -147,7 +147,7 @@ public final class SpoutWrapper<OUT> extends 
RichParallelSourceFunction<OUT> imp
         */
        public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
                        final Integer numberOfInvocations) throws 
IllegalArgumentException {
-               this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
+               this(spout, asList(rawOutputs), numberOfInvocations);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/0ea2596e/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
index 82b12d6..000fe84 100644
--- 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.storm.wrappers;
 
 import backtype.storm.Config;
@@ -27,16 +28,18 @@ import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
-import com.google.common.collect.Sets;
+
 import org.apache.flink.storm.api.FlinkTopology;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.TestDummyBolt;
 import org.apache.flink.storm.util.TestDummySpout;
 import org.apache.flink.storm.util.TestSink;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -48,6 +51,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static java.util.Collections.singleton;
+
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -85,7 +90,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
                
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
 
                WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
-                               Sets.newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }));
+                               new 
HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)));
        }
 
        @Test(expected = IllegalArgumentException.class)
@@ -143,8 +148,7 @@ public class WrapperSetupHelperTest extends AbstractTest {
 
                Assert.assertEquals(attributes, 
WrapperSetupHelper.getNumberOfAttributes(
                                boltOrSpout,
-                               numberOfAttributes == -1 ? Sets
-                                               .newHashSet(new String[] { 
Utils.DEFAULT_STREAM_ID }) : null));
+                               numberOfAttributes == -1 ? new 
HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
        }
 
        @Test

Reply via email to