http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
new file mode 100644
index 0000000..f51aba4
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/AbstractTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+/**
+ * Base class for tests that need a source of randomness.
+ *
+ * <p>Before every test method a new seed is taken from the wall clock, a {@link Random} is
+ * created from it, and the seed is logged (presumably so that a failing run can be reproduced
+ * by re-using the logged seed).
+ */
+public abstract class AbstractTest {
+       private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
+
+       /** Seed used to initialize {@link #r} for the current test method. */
+       protected long seed;
+       /** Random number generator seeded with {@link #seed}; re-created before each test. */
+       protected Random r;
+
+       /**
+        * Picks a time-based seed, creates the random number generator from it and logs the seed.
+        */
+       @Before
+       public void prepare() {
+               this.seed = System.currentTimeMillis();
+               this.r = new Random(this.seed);
+               // Long.valueOf instead of the deprecated boxing constructor new Long(long).
+               LOG.info("Test seed: {}", Long.valueOf(this.seed));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
new file mode 100644
index 0000000..1b320e5
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/FiniteTestSpout.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.util.Map;
+
+public class FiniteTestSpout implements IRichSpout {
+       private static final long serialVersionUID = 7992419478267824279L;
+
+       private int numberOfOutputTuples;
+       private SpoutOutputCollector collector;
+
+       public FiniteTestSpout(final int numberOfOutputTuples) {
+               this.numberOfOutputTuples = numberOfOutputTuples;
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void open(final Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
+               this.collector = collector;
+       }
+
+       @Override
+       public void close() {/* nothing to do */}
+
+       @Override
+       public void activate() {/* nothing to do */}
+
+       @Override
+       public void deactivate() {/* nothing to do */}
+
+       @Override
+       public void nextTuple() {
+               if (--this.numberOfOutputTuples >= 0) {
+                       this.collector.emit(new Values(new 
Integer(this.numberOfOutputTuples)));
+               }
+       }
+
+       @Override
+       public void ack(final Object msgId) {/* nothing to do */}
+
+       @Override
+       public void fail(final Object msgId) {/* nothing to do */}
+
+       @Override
+       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
+               declarer.declare(new Fields("dummy"));
+       }
+
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
new file mode 100644
index 0000000..17de427
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/StormStreamSelectorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Iterator;
+
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that {@link StormStreamSelector} selects exactly the stream named by the tuple's
+ * {@code streamId}.
+ */
+public class StormStreamSelectorTest {
+
+       /**
+        * Feeds the same tuple object with changing stream ids through one selector and checks
+        * that each selection contains the current id and nothing else.
+        */
+       @Test
+       public void testSelector() {
+               final StormStreamSelector<Object> selector = new StormStreamSelector<Object>();
+               final SplitStreamType<Object> tuple = new SplitStreamType<Object>();
+
+               // "stream1" is checked again after "stream2" so that a previously seen id is covered, too.
+               final String[] streamIds = { "stream1", "stream2", "stream1" };
+               for (final String expectedStreamId : streamIds) {
+                       tuple.streamId = expectedStreamId;
+                       final Iterator<String> selected = selector.select(tuple).iterator();
+
+                       Assert.assertEquals(expectedStreamId, selected.next());
+                       Assert.assertFalse(selected.hasNext());
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
new file mode 100644
index 0000000..b7458df
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummyBolt.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/**
+ * Test bolt with two declared output streams.
+ *
+ * <p>The task with index 0 forwards the values of every input tuple to
+ * {@link #shuffleStreamId}. In addition, each instance emits one tuple
+ * {@code ("bolt", context)} to {@link #groupingStreamId} when it processes its first input.
+ */
+public class TestDummyBolt implements IRichBolt {
+       private static final long serialVersionUID = 6893611247443121322L;
+
+       /** Stream that receives the forwarded input values (one field, {@code "data"}). */
+       public static final String shuffleStreamId = "shuffleStream";
+       /** Stream that receives the one-time context tuple (fields {@code "id"}, {@code "data"}). */
+       public static final String groupingStreamId = "groupingStream";
+
+       /** {@code true} until the one-time tuple for {@link #groupingStreamId} was emitted. */
+       private boolean emit = true;
+       /** Configuration passed to {@link #prepare}; presumably public so tests can inspect it. */
+       @SuppressWarnings("rawtypes")
+       public Map config;
+       private TopologyContext context;
+       private OutputCollector collector;
+
+       /** Remembers configuration, context and collector for later use in {@link #execute}. */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+               this.collector = collector;
+               this.context = context;
+               this.config = stormConf;
+       }
+
+       /**
+        * Forwards the input values if this is task 0 and, on the first call only, additionally
+        * emits {@code ("bolt", context)} to the grouping stream.
+        */
+       @Override
+       public void execute(Tuple input) {
+               final boolean isFirstTask = this.context.getThisTaskIndex() == 0;
+               if (isFirstTask) {
+                       this.collector.emit(shuffleStreamId, input.getValues());
+               }
+
+               if (!this.emit) {
+                       return;
+               }
+               this.collector.emit(groupingStreamId, new Values("bolt", this.context));
+               this.emit = false;
+       }
+
+       @Override
+       public void cleanup() {}
+
+       /** Declares both output streams together with their field names. */
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               final Fields shuffleSchema = new Fields("data");
+               final Fields groupingSchema = new Fields("id", "data");
+
+               declarer.declareStream(shuffleStreamId, shuffleSchema);
+               declarer.declareStream(groupingStreamId, groupingSchema);
+       }
+
+       /** @return always {@code null}; no component configuration is supplied */
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
new file mode 100644
index 0000000..ed9ffff
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestDummySpout.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * Test spout that emits its {@link TopologyContext} exactly once.
+ *
+ * <p>Two streams are declared: the default stream with one field and {@link #spoutStreamId}
+ * with two fields. Only the one-time context tuple is emitted, using
+ * {@code collector.emit(Values)}.
+ */
+public class TestDummySpout implements IRichSpout {
+       private static final long serialVersionUID = -5190945609124603118L;
+
+       /** Id of the additionally declared two-field stream. */
+       public static final String spoutStreamId = "spout-stream";
+
+       /** {@code true} until the single context tuple was emitted. */
+       private boolean emit = true;
+       /** Configuration passed to {@link #open}; presumably public so tests can inspect it. */
+       @SuppressWarnings("rawtypes")
+       public Map config;
+       private TopologyContext context;
+       private SpoutOutputCollector collector;
+
+       /** Remembers configuration, context and collector for later use in {@link #nextTuple()}. */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+               this.collector = collector;
+               this.context = context;
+               this.config = conf;
+       }
+
+       @Override
+       public void close() {}
+
+       @Override
+       public void activate() {}
+
+       @Override
+       public void deactivate() {}
+
+       /** Emits the topology context on the first call; later calls emit nothing. */
+       @Override
+       public void nextTuple() {
+               if (!this.emit) {
+                       return;
+               }
+               this.collector.emit(new Values(this.context));
+               this.emit = false;
+       }
+
+       @Override
+       public void ack(Object msgId) {}
+
+       @Override
+       public void fail(Object msgId) {}
+
+       /** Declares the default stream and {@link #spoutStreamId} with their field names. */
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {
+               final Fields defaultSchema = new Fields("data");
+               final Fields spoutStreamSchema = new Fields("id", "data");
+
+               declarer.declareStream(Utils.DEFAULT_STREAM_ID, defaultSchema);
+               declarer.declareStream(spoutStreamId, spoutStreamSchema);
+       }
+
+       /** @return always {@code null}; no component configuration is supplied */
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
new file mode 100644
index 0000000..59939fd
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/util/TestSink.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Test sink that collects {@link TopologyContext} objects in the static list {@link #result}.
+ *
+ * <p>The list receives the sink's own context in {@link #prepare} and one context per input
+ * tuple in {@link #execute}. The sink declares no output and emits nothing.
+ */
+public class TestSink implements IRichBolt {
+       private static final long serialVersionUID = 4314871456719370877L;
+
+       /**
+        * All contexts collected so far. The list is static, i.e., shared by every instance of
+        * this class, and is never cleared by the sink itself.
+        */
+       public static final List<TopologyContext> result = new LinkedList<TopologyContext>();
+
+       /** Records the sink's own topology context; configuration and collector are ignored. */
+       @SuppressWarnings("rawtypes")
+       @Override
+       public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+               result.add(context);
+       }
+
+       /**
+        * Records the context carried by the input tuple: field 0 of a single-field tuple,
+        * field 1 of any other tuple.
+        */
+       @Override
+       public void execute(Tuple input) {
+               final int contextIndex = (input.size() == 1) ? 0 : 1;
+               result.add((TopologyContext) input.getValue(contextIndex));
+       }
+
+       @Override
+       public void cleanup() {}
+
+       @Override
+       public void declareOutputFields(OutputFieldsDeclarer declarer) {}
+
+       /** @return always {@code null}; no component configuration is supplied */
+       @Override
+       public Map<String, Object> getComponentConfiguration() {
+               return null;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
new file mode 100644
index 0000000..3d7d26b
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltCollectorTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.BoltCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link BoltCollector}: emitting to a Flink {@link Output} and the unsupported
+ * {@code emitDirect}.
+ */
+public class BoltCollectorTest extends AbstractTest {
+
+       /**
+        * Emits one tuple per configuration, where the number of attributes registered for the
+        * stream runs from -1 to 25.
+        *
+        * <p>For -1 the test expects the single emitted value itself to reach the Flink collector;
+        * for 0 to 25 it expects a Flink {@link Tuple} of that arity holding the emitted values.
+        * In every case {@code emit} must return {@code null}.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
+               for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+                       final Output flinkCollector = mock(Output.class);
+                       Tuple flinkTuple = null;
+                       final Values tuple = new Values();
+
+                       final String streamId = "streamId";
+                       final HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+                       attributes.put(streamId, numberOfAttributes);
+
+                       // The collector is created the same way for both cases, so build it once here
+                       // instead of repeating the constructor call in each branch.
+                       final BoltCollector<?> collector = new BoltCollector(attributes, flinkCollector);
+
+                       if (numberOfAttributes == -1) {
+                               // Integer.valueOf instead of the deprecated boxing constructor.
+                               tuple.add(Integer.valueOf(this.r.nextInt()));
+                       } else {
+                               flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+                               for (int i = 0; i < numberOfAttributes; ++i) {
+                                       tuple.add(Integer.valueOf(this.r.nextInt()));
+                                       flinkTuple.setField(tuple.get(i), i);
+                               }
+                       }
+
+                       final Collection anchors = mock(Collection.class);
+                       final List<Integer> taskIds = collector.emit(streamId, anchors, tuple);
+
+                       Assert.assertNull(taskIds);
+
+                       if (numberOfAttributes == -1) {
+                               verify(flinkCollector).collect(tuple.get(0));
+                       } else {
+                               verify(flinkCollector).collect(flinkTuple);
+                       }
+               }
+       }
+
+       /** {@code emitDirect} is expected to throw {@link UnsupportedOperationException}. */
+       @SuppressWarnings("unchecked")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testEmitDirect() {
+               new BoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
+                               null, null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
new file mode 100644
index 0000000..e33fdb9
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummyBolt;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.StormTuple;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.same;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StreamRecordSerializer.class, WrapperSetupHelper.class})
+public class BoltWrapperTest extends AbstractTest {
+
+       /**
+        * Constructing a wrapper with the default stream in the second constructor argument
+        * (presumably the streams to be emitted as raw type) must fail with an
+        * {@link IllegalArgumentException} when the bolt declares two output attributes.
+        */
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrapperRawType() throws Exception {
+               final Fields twoAttributes = new Fields("dummy1", "dummy2");
+               final SetupOutputFieldsDeclarer twoAttributeDeclarer = new SetupOutputFieldsDeclarer();
+               twoAttributeDeclarer.declare(twoAttributes);
+               // Hand the pre-filled declarer to every no-argument construction intercepted by PowerMock.
+               PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments()
+                               .thenReturn(twoAttributeDeclarer);
+
+               final String[] defaultStreamOnly = new String[] { Utils.DEFAULT_STREAM_ID };
+               new BoltWrapper<Object, Object>(mock(IRichBolt.class), defaultStreamOnly);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrapperToManyAttributes1() throws Exception {
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               final String[] schema = new String[26];
+               for (int i = 0; i < schema.length; ++i) {
+                       schema[i] = "a" + i;
+               }
+               declarer.declare(new Fields(schema));
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               new BoltWrapper<Object, Object>(mock(IRichBolt.class));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrapperToManyAttributes2() throws Exception {
+               final SetupOutputFieldsDeclarer declarer = new 
SetupOutputFieldsDeclarer();
+               final String[] schema = new String[26];
+               for (int i = 0; i < schema.length; ++i) {
+                       schema[i] = "a" + i;
+               }
+               declarer.declare(new Fields(schema));
+               
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               new BoltWrapper<Object, Object>(mock(IRichBolt.class), new 
String[] {});
+       }
+
+       @Test
+       public void testWrapper() throws Exception {
+               for (int i = -1; i < 26; ++i) {
+                       this.testWrapper(i);
+               }
+       }
+
+       /**
+        * Sends one record through a {@link BoltWrapper} around a mocked bolt and verifies that
+        * the bolt is executed with a {@link StormTuple} wrapping the record's value.
+        *
+        * @param numberOfAttributes
+        *            -1 to use the raw string {@code "test"} as input value (with a one-attribute
+        *            schema), or 0 to 25 to use a Flink {@link Tuple} of that arity
+        */
+       @SuppressWarnings({"rawtypes", "unchecked"})
+       private void testWrapper(final int numberOfAttributes) throws Exception {
+               assert ((-1 <= numberOfAttributes) && (numberOfAttributes <= 25));
+               final boolean rawInput = numberOfAttributes == -1;
+
+               // Exactly one of the two input values is set, depending on the input kind.
+               String rawTuple = null;
+               Tuple flinkTuple = null;
+               final String[] schema;
+               if (rawInput) {
+                       rawTuple = "test";
+                       schema = new String[1];
+               } else {
+                       flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+                       schema = new String[numberOfAttributes];
+               }
+               for (int i = 0; i < schema.length; ++i) {
+                       schema[i] = "a" + i;
+               }
+
+               final Object inputValue = rawInput ? (Object) rawTuple : flinkTuple;
+               final StreamRecord record = mock(StreamRecord.class);
+               when(record.getValue()).thenReturn(inputValue);
+
+               final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
+               final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+               when(taskContext.getExecutionConfig()).thenReturn(executionConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
+
+               final IRichBolt bolt = mock(IRichBolt.class);
+
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declare(new Fields(schema));
+               // Hand the pre-filled declarer to every no-argument construction intercepted by PowerMock.
+               PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null);
+               wrapper.setup(mock(Output.class), taskContext);
+               wrapper.open(null);
+
+               wrapper.processElement(record);
+               if (rawInput) {
+                       verify(bolt).execute(eq(new StormTuple<String>(rawTuple, null)));
+               } else {
+                       verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple, null)));
+               }
+       }
+
+       /**
+        * Processes two records (values 2 and 3) with a wrapper around {@code TestBolt} and
+        * verifies the {@link SplitStreamType} records collected for {@code "stream1"} and
+        * {@code "stream2"}.
+        *
+        * <p>For each stream a random flag decides whether it is added to the set passed as third
+        * constructor argument; for such a stream the expected value is the plain integer,
+        * otherwise a {@link Tuple1} holding it.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testMultipleOutputStreams() throws Exception {
+               final boolean rawOutType1 = super.r.nextBoolean();
+               final boolean rawOutType2 = super.r.nextBoolean();
+
+               // The first processed record yields 2, the second one 3.
+               final StreamRecord record = mock(StreamRecord.class);
+               when(record.getValue()).thenReturn(2).thenReturn(3);
+
+               final ExecutionConfig executionConfig = mock(ExecutionConfig.class);
+               final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+               when(taskContext.getExecutionConfig()).thenReturn(executionConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
+
+               final Output output = mock(Output.class);
+
+               final TestBolt bolt = new TestBolt();
+               final HashSet<String> raw = new HashSet<String>();
+               if (rawOutType1) {
+                       raw.add("stream1");
+               }
+               if (rawOutType2) {
+                       raw.add("stream2");
+               }
+
+               final BoltWrapper wrapper = new BoltWrapper(bolt, (Fields) null, raw);
+               wrapper.setup(output, taskContext);
+               wrapper.open(null);
+
+               // The same expectation object is re-used (and mutated) for both verifications.
+               final SplitStreamType splitRecord = new SplitStreamType<Integer>();
+
+               splitRecord.streamId = "stream1";
+               if (rawOutType1) {
+                       splitRecord.value = 2;
+               } else {
+                       splitRecord.value = new Tuple1<Integer>(2);
+               }
+               wrapper.processElement(record);
+               verify(output).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+
+               splitRecord.streamId = "stream2";
+               if (rawOutType2) {
+                       splitRecord.value = 3;
+               } else {
+                       splitRecord.value = new Tuple1<Integer>(3);
+               }
+               wrapper.processElement(record);
+               verify(output, times(2)).collect(new StreamRecord<SplitStreamType>(splitRecord, 0));
+       }
+
+       /**
+        * Checks that opening the wrapper passes a configuration to the wrapped bolt's prepare() call.
+        * The mocked ExecutionConfig hands out three different global job parameters on consecutive
+        * calls: null, a StormConfig, and a Flink Configuration. Each open() below is expected to
+        * consume one of them, in that order.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testOpen() throws Exception {
+               final StormConfig stormConfig = new StormConfig();
+               final Configuration flinkConfig = new Configuration();
+               // Without at least one entry the loop at the end iterates zero times and verifies nothing.
+               flinkConfig.setInteger("testKey", 42);
+
+               final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+               when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+                               .thenReturn(flinkConfig);
+
+               final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+               when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
+
+               // PowerMock is asked to return this pre-filled declarer for no-argument constructor calls.
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declare(new Fields("dummy"));
+               PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
+
+               final IRichBolt bolt = mock(IRichBolt.class);
+
+               BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+               wrapper.setup(mock(Output.class), taskContext);
+
+               // test without configuration
+               wrapper.open(null);
+               verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
+
+               // test with StormConfig
+               wrapper.open(null);
+               verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+                               any(OutputCollector.class));
+
+               // test with Configuration
+               final TestDummyBolt testBolt = new TestDummyBolt();
+               wrapper = new BoltWrapper<Object, Object>(testBolt);
+               wrapper.setup(mock(Output.class), taskContext);
+
+               wrapper.open(null);
+               // Every entry of the Flink configuration must show up in the map the bolt received.
+               for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+                       Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+               }
+       }
+
+       /**
+        * Same scenario as testOpen, but without substituting a pre-filled output declarer. For the
+        * mocked bolt, prepare() is verified to receive a null OutputCollector. The mocked
+        * ExecutionConfig returns null, a StormConfig and a Flink Configuration on consecutive calls.
+        */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testOpenSink() throws Exception {
+               final StormConfig stormConfig = new StormConfig();
+               final Configuration flinkConfig = new Configuration();
+               // Without at least one entry the loop at the end iterates zero times and verifies nothing.
+               flinkConfig.setInteger("testKey", 42);
+
+               final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+               when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+                               .thenReturn(flinkConfig);
+
+               final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+               when(taskContext.getExecutionConfig()).thenReturn(taskConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
+
+               final IRichBolt bolt = mock(IRichBolt.class);
+
+               BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(bolt);
+               wrapper.setup(mock(Output.class), taskContext);
+
+               // test without configuration
+               wrapper.open(null);
+               verify(bolt).prepare(any(Map.class), any(TopologyContext.class),
+                               isNull(OutputCollector.class));
+
+               // test with StormConfig
+               wrapper.open(null);
+               verify(bolt).prepare(same(stormConfig), any(TopologyContext.class),
+                               isNull(OutputCollector.class));
+
+               // test with Configuration
+               final TestDummyBolt testBolt = new TestDummyBolt();
+               wrapper = new BoltWrapper<Object, Object>(testBolt);
+               wrapper.setup(mock(Output.class), taskContext);
+
+               wrapper.open(null);
+               // Every entry of the Flink configuration must show up in the map the bolt received.
+               for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+                       Assert.assertEquals(entry.getValue(), testBolt.config.get(entry.getKey()));
+               }
+       }
+
+       /** Closing and disposing the wrapper must trigger cleanup() on the wrapped bolt. */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testClose() throws Exception {
+               // Substitute a declarer that already holds one output field before the wrapper is built.
+               final SetupOutputFieldsDeclarer prefilledDeclarer = new SetupOutputFieldsDeclarer();
+               prefilledDeclarer.declare(new Fields("dummy"));
+               PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(prefilledDeclarer);
+
+               final IRichBolt wrappedBolt = mock(IRichBolt.class);
+               final BoltWrapper<Object, Object> wrapper = new BoltWrapper<Object, Object>(wrappedBolt);
+               wrapper.setup(mock(Output.class), mock(StreamingRuntimeContext.class));
+
+               wrapper.close();
+               wrapper.dispose();
+
+               verify(wrappedBolt).cleanup();
+       }
+
+       /**
+        * Bolt that forwards the first field of each input tuple, alternating between the output
+        * streams "stream1" (odd-numbered calls) and "stream2" (even-numbered calls).
+        */
+       private static final class TestBolt implements IRichBolt {
+               private static final long serialVersionUID = 7278692872260138758L;
+               private OutputCollector collector;
+               /** Number of execute() calls seen so far. */
+               int counter = 0;
+
+               @SuppressWarnings("rawtypes")
+               @Override
+               public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+                       this.collector = collector;
+               }
+
+               @Override
+               public void execute(backtype.storm.tuple.Tuple input) {
+                       ++counter;
+                       final boolean oddCall = counter % 2 == 1;
+                       final String targetStream = oddCall ? "stream1" : "stream2";
+                       this.collector.emit(targetStream, new Values(input.getInteger(0)));
+               }
+
+               @Override
+               public void cleanup() {}
+
+               @Override
+               public void declareOutputFields(OutputFieldsDeclarer declarer) {
+                       declarer.declareStream("stream1", new Fields("a1"));
+                       declarer.declareStream("stream2", new Fields("a2"));
+               }
+
+               @Override
+               public Map<String, Object> getComponentConfiguration() {
+                       return null;
+               }
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
new file mode 100644
index 0000000..69d4a8e
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/FlinkTopologyContextTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import java.util.HashMap;
+
+import backtype.storm.generated.Bolt;
+import backtype.storm.generated.SpoutSpec;
+import backtype.storm.generated.StateSpoutSpec;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.FlinkTopologyContext;
+import org.junit.Test;
+
+
+/*
+ * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
+ * because those are tested in StormWrapperSetupHelperTest.
+ */
+public class FlinkTopologyContextTest extends AbstractTest {
+
+       /** Builds a context over an empty topology; every other constructor argument is null. */
+       private static FlinkTopologyContext createContext() {
+               final StormTopology emptyTopology = new StormTopology(new HashMap<String, SpoutSpec>(),
+                               new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>());
+               return new FlinkTopologyContext(emptyTopology,
+                               null, null, null, null, null, null, null, null, null,
+                               null, null, null, null, null, null, null, null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testAddTaskHook() {
+               createContext().addTaskHook(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetHooks() {
+               createContext().getHooks();
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric1() {
+               // The cast selects the ICombiner overload.
+               createContext().registerMetric(null, (ICombiner) null, 0);
+       }
+
+       @SuppressWarnings("rawtypes")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric2() {
+               // The cast selects the IReducer overload.
+               createContext().registerMetric(null, (IReducer) null, 0);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testRegisteredMetric3() {
+               // The cast selects the IMetric overload.
+               createContext().registerMetric(null, (IMetric) null, 0);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetRegisteredMetricByName() {
+               createContext().getRegisteredMetricByName(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetAllSubscribedState() {
+               createContext().setAllSubscribedState(null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetSubscribedState1() {
+               createContext().setSubscribedState(null, null);
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testSetSubscribedState2() {
+               createContext().setSubscribedState(null, null, null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
new file mode 100644
index 0000000..4618101
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SetupOutputFieldsDeclarerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+
+public class SetupOutputFieldsDeclarerTest extends AbstractTest {
+
+       /**
+        * Declares a schema of random size on the default stream and on a named stream, and checks
+        * that the declarer records the number of attributes for each.
+        */
+       @Test
+       public void testDeclare() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+
+               final int defaultStreamArity = this.r.nextInt(26);
+               declarer.declare(createSchema(defaultStreamArity));
+               Assert.assertEquals(1, declarer.outputSchemas.size());
+               Assert.assertEquals(defaultStreamArity,
+                               declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID).intValue());
+
+               final String sid = "streamId";
+               final int namedStreamArity = this.r.nextInt(26);
+               declarer.declareStream(sid, createSchema(namedStreamArity));
+               Assert.assertEquals(2, declarer.outputSchemas.size());
+               Assert.assertEquals(namedStreamArity, declarer.outputSchemas.get(sid).intValue());
+       }
+
+       /** Creates a schema with the attribute names "a0", "a1", ... up to the requested count. */
+       private Fields createSchema(final int numberOfAttributes) {
+               final ArrayList<String> attributeNames = new ArrayList<String>(numberOfAttributes);
+               int next = 0;
+               while (next < numberOfAttributes) {
+                       attributeNames.add("a" + next);
+                       ++next;
+               }
+               return new Fields(attributeNames);
+       }
+
+       @Test
+       public void testDeclareDirect() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declare(false, new Fields());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareDirectFail() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declare(true, new Fields());
+       }
+
+       @Test
+       public void testDeclareStream() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareStreamFail() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declareStream(null, new Fields());
+       }
+
+       @Test
+       public void testDeclareFullStream() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testDeclareFullStreamFailNonDefaultStream() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declareStream(null, false, new Fields());
+       }
+
+       @Test(expected = UnsupportedOperationException.class)
+       public void testDeclareFullStreamFailDirect() {
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
new file mode 100644
index 0000000..6b60d2b
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutCollectorTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.tuple.Values;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.SpoutCollector;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class SpoutCollectorTest extends AbstractTest {
+
+       /**
+        * Emits one tuple for every attribute count from -1 to 25 and checks what reaches the mocked
+        * Flink source context. For -1 the single emitted value itself is expected to be collected;
+        * for 0 to 25 a Flink tuple of that arity holding the same values is expected.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
+               for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
+                       final SourceContext flinkCollector = mock(SourceContext.class);
+                       final Values tuple = new Values();
+
+                       final String streamId = "streamId";
+                       final HashMap<String, Integer> attributes = new HashMap<String, Integer>();
+                       attributes.put(streamId, numberOfAttributes);
+
+                       // The collector is constructed identically for both cases, so build it once.
+                       final SpoutCollector<?> collector = new SpoutCollector(attributes, flinkCollector);
+
+                       Tuple flinkTuple = null;
+                       if (numberOfAttributes == -1) {
+                               tuple.add(Integer.valueOf(this.r.nextInt()));
+                       } else {
+                               flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+
+                               for (int i = 0; i < numberOfAttributes; ++i) {
+                                       tuple.add(Integer.valueOf(this.r.nextInt()));
+                                       flinkTuple.setField(tuple.get(i), i);
+                               }
+                       }
+
+                       final Object messageId = Integer.valueOf(this.r.nextInt());
+                       final List<Integer> taskIds = collector.emit(streamId, tuple, messageId);
+
+                       Assert.assertNull(taskIds);
+
+                       if (numberOfAttributes == -1) {
+                               verify(flinkCollector).collect(tuple.get(0));
+                       } else {
+                               verify(flinkCollector).collect(flinkTuple);
+                       }
+               }
+       }
+
+       /** emitDirect() is expected to be rejected with an UnsupportedOperationException. */
+       @SuppressWarnings("unchecked")
+       @Test(expected = UnsupportedOperationException.class)
+       public void testEmitDirect() {
+               new SpoutCollector<Object>(mock(HashMap.class), mock(SourceContext.class)).emitDirect(
+                               0, null, null,
+                               (Object) null);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
new file mode 100644
index 0000000..227d736
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/SpoutWrapperTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.util.FiniteSpout;
+import org.apache.flink.storm.util.FiniteTestSpout;
+import org.apache.flink.storm.util.StormConfig;
+import org.apache.flink.storm.util.TestDummySpout;
+import org.apache.flink.storm.wrappers.SetupOutputFieldsDeclarer;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.storm.wrappers.WrapperSetupHelper;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for SpoutWrapper: configuration hand-over on open, the number of nextTuple() calls for
+ * fixed-count and finite spouts, cancellation, and close().
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WrapperSetupHelper.class)
+public class SpoutWrapperTest extends AbstractTest {
+
+       /**
+        * Creates a mocked runtime context that reports the given execution config, an empty task
+        * configuration and the task name "name".
+        */
+       private static StreamingRuntimeContext createTaskContext(final ExecutionConfig executionConfig) {
+               final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
+               when(taskContext.getExecutionConfig()).thenReturn(executionConfig);
+               when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
+               when(taskContext.getTaskName()).thenReturn("name");
+               return taskContext;
+       }
+
+       /**
+        * Checks which configuration the spout's open() receives. The mocked ExecutionConfig returns
+        * null, a StormConfig and a Flink Configuration on consecutive calls; each run() below is
+        * expected to consume one of them, in that order.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testRunPrepare() throws Exception {
+               final StormConfig stormConfig = new StormConfig();
+               stormConfig.put(this.r.nextInt(), this.r.nextInt());
+               final Configuration flinkConfig = new Configuration();
+               flinkConfig.setInteger("testKey", this.r.nextInt());
+
+               final ExecutionConfig taskConfig = mock(ExecutionConfig.class);
+               when(taskConfig.getGlobalJobParameters()).thenReturn(null).thenReturn(stormConfig)
+                               .thenReturn(flinkConfig);
+
+               final StreamingRuntimeContext taskContext = createTaskContext(taskConfig);
+
+               final IRichSpout spout = mock(IRichSpout.class);
+               SpoutWrapper spoutWrapper = new SpoutWrapper(spout);
+               spoutWrapper.setRuntimeContext(taskContext);
+               // Cancelled before run(); presumably this keeps run() from looping over nextTuple().
+               spoutWrapper.cancel();
+
+               // test without configuration
+               spoutWrapper.run(mock(SourceContext.class));
+               verify(spout).open(any(Map.class), any(TopologyContext.class),
+                               any(SpoutOutputCollector.class));
+
+               // test with StormConfig
+               spoutWrapper.run(mock(SourceContext.class));
+               verify(spout).open(eq(stormConfig), any(TopologyContext.class),
+                               any(SpoutOutputCollector.class));
+
+               // test with Configuration
+               final TestDummySpout testSpout = new TestDummySpout();
+               spoutWrapper = new SpoutWrapper(testSpout);
+               spoutWrapper.setRuntimeContext(taskContext);
+               spoutWrapper.cancel();
+
+               spoutWrapper.run(mock(SourceContext.class));
+               for (Entry<String, String> entry : flinkConfig.toMap().entrySet()) {
+                       Assert.assertEquals(entry.getValue(), testSpout.config.get(entry.getKey()));
+               }
+       }
+
+       /** A wrapper built with a fixed call count must invoke nextTuple() exactly that often. */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void testRunExecuteFixedNumber() throws Exception {
+               // PowerMock is asked to return this pre-filled declarer for no-argument constructor calls.
+               final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
+               declarer.declare(new Fields("dummy"));
+               PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments()
+                               .thenReturn(declarer);
+
+               final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+               final IRichSpout spout = mock(IRichSpout.class);
+               final int numberOfCalls = this.r.nextInt(50);
+               final SpoutWrapper<?> spoutWrapper = new SpoutWrapper<Object>(spout,
+                               numberOfCalls);
+               spoutWrapper.setRuntimeContext(taskContext);
+
+               spoutWrapper.run(mock(SourceContext.class));
+               verify(spout, times(numberOfCalls)).nextTuple();
+       }
+
+       /**
+        * Runs a FiniteTestSpout through the wrapper and expects the collected tuples to count down
+        * from numberOfCalls - 1 to 0.
+        */
+       @Test
+       public void testRunExecuteFinite() throws Exception {
+               final int numberOfCalls = this.r.nextInt(50);
+
+               final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
+               for (int i = numberOfCalls - 1; i >= 0; --i) {
+                       expectedResult.add(new Tuple1<Integer>(Integer.valueOf(i)));
+               }
+
+               final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+               final FiniteTestSpout spout = new FiniteTestSpout(numberOfCalls);
+               final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(
+                               spout, -1);
+               spoutWrapper.setRuntimeContext(taskContext);
+
+               final TestContext collector = new TestContext();
+               spoutWrapper.run(collector);
+
+               Assert.assertEquals(expectedResult, collector.result);
+       }
+
+       /** reachedEnd() answers false three times before the first true: three nextTuple() calls. */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void runAndExecuteFiniteSpout() throws Exception {
+               final FiniteSpout stormSpout = mock(FiniteSpout.class);
+               when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
+
+               final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+               final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+               wrapper.setRuntimeContext(taskContext);
+
+               wrapper.run(mock(SourceContext.class));
+               verify(stormSpout, times(3)).nextTuple();
+       }
+
+       /** reachedEnd() answers true on the first call: nextTuple() must never be invoked. */
+       @SuppressWarnings("unchecked")
+       @Test
+       public void runAndExecuteFiniteSpout2() throws Exception {
+               final FiniteSpout stormSpout = mock(FiniteSpout.class);
+               when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
+
+               final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+               final SpoutWrapper<?> wrapper = new SpoutWrapper<Object>(stormSpout);
+               wrapper.setRuntimeContext(taskContext);
+
+               wrapper.run(mock(SourceContext.class));
+               verify(stormSpout, never()).nextTuple();
+       }
+
+       /** A wrapper cancelled before run() must not collect any tuple. */
+       @Test
+       public void testCancel() throws Exception {
+               final int numberOfCalls = 5 + this.r.nextInt(5);
+
+               final StreamingRuntimeContext taskContext = createTaskContext(mock(ExecutionConfig.class));
+
+               final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
+
+               final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+               spoutWrapper.setRuntimeContext(taskContext);
+
+               spoutWrapper.cancel();
+               final TestContext collector = new TestContext();
+               spoutWrapper.run(collector);
+
+               Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
+       }
+
+       /** Closing the wrapper must close the wrapped spout. */
+       @Test
+       public void testClose() throws Exception {
+               final IRichSpout spout = mock(IRichSpout.class);
+               final SpoutWrapper<Tuple1<Integer>> spoutWrapper = new SpoutWrapper<Tuple1<Integer>>(spout);
+
+               spoutWrapper.close();
+
+               verify(spout).close();
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
new file mode 100644
index 0000000..155fcd9
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/StormTupleTest.java
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.storm.util.AbstractTest;
+import org.apache.flink.storm.wrappers.StormTuple;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+
+public class StormTupleTest extends AbstractTest {
+       private static final String fieldName = "fieldName";
+       private static final String fieldNamePojo = "member";
+
+       private int arity, index;
+
+       /**
+        * Runs the base-class setup, then draws a random arity in [1, 25] and a random field index
+        * below that arity.
+        */
+       @Override
+       @Before
+       public void prepare() {
+               super.prepare();
+               final int drawnArity = 1 + this.r.nextInt(25);
+               this.arity = drawnArity;
+               this.index = this.r.nextInt(drawnArity);
+       }
+
+       /**
+        * Wraps a plain (non-tuple) value and checks that it is exposed as the single value at
+        * position 0.
+        */
+       @Test
+       public void nonTupleTest() {
+               final Object input = this.r.nextInt();
+               final StormTuple<Object> wrapped = new StormTuple<Object>(input, null);
+
+               Assert.assertSame(input, wrapped.getValue(0));
+
+               final List<Object> allValues = wrapped.getValues();
+               Assert.assertEquals(1, allValues.size());
+               Assert.assertEquals(input, allValues.get(0));
+       }
+
+       /**
+        * Fills a Flink tuple of random arity (0 to 25) with random integers and checks that the
+        * wrapping StormTuple reports the same size and the same values in the same order.
+        */
+       @Test
+       public void tupleTest() throws InstantiationException, IllegalAccessException {
+               final int numberOfAttributes = this.r.nextInt(26);
+               final Object[] data = new Object[numberOfAttributes];
+
+               final Tuple flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       data[i] = this.r.nextInt();
+                       flinkTuple.setField(data[i], i);
+               }
+
+               final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+               final List<Object> values = tuple.getValues();
+
+               Assert.assertEquals(numberOfAttributes, values.size());
+               for (int i = 0; i < numberOfAttributes; ++i) {
+                       // compare against the independently recorded input rather than a read-back of
+                       // the wrapped tuple, so the expected values do not come from the object under test
+                       Assert.assertEquals(data[i], values.get(i));
+               }
+
+               Assert.assertEquals(numberOfAttributes, tuple.size());
+       }
+
+       /**
+        * Stores a random byte array (length 0 to 14) at a random position of a Tuple5 and checks
+        * that getBinary returns it for that position. The payload is placed in a tuple here, just
+        * as in testBinaryTuple.
+        */
+       @Test
+       public void testBinary() {
+               final byte[] payload = new byte[this.r.nextInt(15)];
+               this.r.nextBytes(payload);
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getBinary(position));
+       }
+
+       /**
+        * Wraps a random Boolean and reads it back via getBoolean(0).
+        */
+       @Test
+       public void testBoolean() {
+               final Boolean input = this.r.nextBoolean();
+               final StormTuple<Boolean> wrapped = new StormTuple<Boolean>(input, null);
+
+               Assert.assertEquals(input, wrapped.getBoolean(0));
+       }
+
+       /**
+        * Wraps a random Byte and reads it back via getByte(0).
+        */
+       @Test
+       public void testByte() {
+               final Byte input = (byte) this.r.nextInt();
+               final StormTuple<Byte> wrapped = new StormTuple<Byte>(input, null);
+
+               Assert.assertEquals(input, wrapped.getByte(0));
+       }
+
+       /**
+        * Wraps a random Double and reads it back via getDouble(0).
+        */
+       @Test
+       public void testDouble() {
+               final Double input = this.r.nextDouble();
+               final StormTuple<Double> wrapped = new StormTuple<Double>(input, null);
+
+               Assert.assertEquals(input, wrapped.getDouble(0));
+       }
+
+       /**
+        * Wraps a random Float and reads it back via getFloat(0).
+        */
+       @Test
+       public void testFloat() {
+               final Float input = this.r.nextFloat();
+               final StormTuple<Float> wrapped = new StormTuple<Float>(input, null);
+
+               Assert.assertEquals(input, wrapped.getFloat(0));
+       }
+
+       /**
+        * Wraps a random Integer and reads it back via getInteger(0).
+        */
+       @Test
+       public void testInteger() {
+               final Integer input = this.r.nextInt();
+               final StormTuple<Integer> wrapped = new StormTuple<Integer>(input, null);
+
+               Assert.assertEquals(input, wrapped.getInteger(0));
+       }
+
+       /**
+        * Wraps a random Long and reads it back via getLong(0).
+        */
+       @Test
+       public void testLong() {
+               final Long input = this.r.nextLong();
+               final StormTuple<Long> wrapped = new StormTuple<Long>(input, null);
+
+               Assert.assertEquals(input, wrapped.getLong(0));
+       }
+
+       /**
+        * Wraps a random Short and reads it back via getShort(0).
+        */
+       @Test
+       public void testShort() {
+               final Short input = (short) this.r.nextInt();
+               final StormTuple<Short> wrapped = new StormTuple<Short>(input, null);
+
+               Assert.assertEquals(input, wrapped.getShort(0));
+       }
+
+       /**
+        * Wraps a String built from random bytes (length 0 to 14) and reads it back via getString(0).
+        */
+       @Test
+       public void testString() {
+               final byte[] data = new byte[this.r.nextInt(15)];
+               this.r.nextBytes(data);
+               // Decode with an explicit charset: decoding arbitrary bytes with the platform default
+               // charset is unspecified for invalid input, so the string could differ between machines
+               // even for the same seed.
+               final String flinkTuple = new String(data, Charset.forName("UTF-8"));
+
+               final StormTuple<String> tuple = new StormTuple<String>(flinkTuple, null);
+               Assert.assertEquals(flinkTuple, tuple.getString(0));
+       }
+
+       /**
+        * Stores a random byte array (length 0 to 14) at a random position of a Tuple5 and checks
+        * that getBinary returns it for that position.
+        */
+       @Test
+       public void testBinaryTuple() {
+               final byte[] payload = new byte[this.r.nextInt(15)];
+               this.r.nextBytes(payload);
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getBinary(position));
+       }
+
+       /**
+        * Stores a random Boolean at a random position of a Tuple5 and reads it back via getBoolean.
+        */
+       @Test
+       public void testBooleanTuple() {
+               final Boolean payload = this.r.nextBoolean();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getBoolean(position));
+       }
+
+       /**
+        * Stores a random Byte at a random position of a Tuple5 and reads it back via getByte.
+        */
+       @Test
+       public void testByteTuple() {
+               final Byte payload = (byte) this.r.nextInt();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getByte(position));
+       }
+
+       /**
+        * Stores a random Double at a random position of a Tuple5 and reads it back via getDouble.
+        */
+       @Test
+       public void testDoubleTuple() {
+               final Double payload = this.r.nextDouble();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getDouble(position));
+       }
+
+       /**
+        * Stores a random Float at a random position of a Tuple5 and reads it back via getFloat.
+        */
+       @Test
+       public void testFloatTuple() {
+               final Float payload = this.r.nextFloat();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getFloat(position));
+       }
+
+       /**
+        * Stores a random Integer at a random position of a Tuple5 and reads it back via getInteger.
+        */
+       @Test
+       public void testIntegerTuple() {
+               final Integer payload = this.r.nextInt();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getInteger(position));
+       }
+
+       /**
+        * Stores a random Long at a random position of a Tuple5 and reads it back via getLong.
+        */
+       @Test
+       public void testLongTuple() {
+               final Long payload = this.r.nextLong();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getLong(position));
+       }
+
+       /**
+        * Stores a random Short at a random position of a Tuple5 and reads it back via getShort.
+        */
+       @Test
+       public void testShortTuple() {
+               final Short payload = (short) this.r.nextInt();
+               final int position = this.r.nextInt(5);
+
+               final Tuple holder = new Tuple5<Object, Object, Object, Object, Object>();
+               holder.setField(payload, position);
+               final StormTuple<Tuple> wrapped = new StormTuple<Tuple>(holder, null);
+
+               final Object expected = holder.getField(position);
+               Assert.assertEquals(expected, wrapped.getShort(position));
+       }
+
+       /**
+        * Stores a String built from random bytes (length 0 to 14) at a random position of a Tuple5
+        * and reads it back via getString.
+        */
+       @Test
+       public void testStringTuple() {
+               final byte[] rawdata = new byte[this.r.nextInt(15)];
+               this.r.nextBytes(rawdata);
+               // Decode with an explicit charset: decoding arbitrary bytes with the platform default
+               // charset is unspecified for invalid input, so the string could differ between machines
+               // even for the same seed.
+               final String data = new String(rawdata, Charset.forName("UTF-8"));
+
+               final int index = this.r.nextInt(5);
+               final Tuple flinkTuple = new Tuple5<Object, Object, Object, Object, Object>();
+               flinkTuple.setField(data, index);
+
+               final StormTuple<Tuple> tuple = new StormTuple<Tuple>(flinkTuple, null);
+               Assert.assertEquals(flinkTuple.getField(index), tuple.getString(index));
+       }
+
+       /**
+        * Checks that contains() answers true for the two schema names and false for a name that is
+        * not in the schema.
+        */
+       @Test
+       public void testContains() throws Exception {
+               final Fields twoNames = new Fields("a1", "a2");
+               final Object arityOneTuple = Tuple.getTupleClass(1).newInstance();
+               final StormTuple<Object> wrapped = new StormTuple<Object>(arityOneTuple, twoNames);
+
+               Assert.assertTrue(wrapped.contains("a1"));
+               Assert.assertTrue(wrapped.contains("a2"));
+               Assert.assertFalse(wrapped.contains("a3"));
+       }
+
+       /**
+        * Checks that getFields() returns the very schema instance passed in when a Flink tuple is
+        * wrapped, and null when the wrapped value is null.
+        */
+       @Test
+       public void testGetFields() throws Exception {
+               Fields schema = new Fields();
+
+               Assert.assertSame(schema,
+                               new StormTuple<Object>(Tuple.getTupleClass(1).newInstance(), schema).getFields());
+
+               // assertNull states the intent directly and gives a clearer failure message
+               Assert.assertNull(new StormTuple<Object>(null, schema).getFields());
+       }
+
+       /**
+        * Checks that fieldIndex() maps the schema names to their positions in the schema.
+        */
+       @Test
+       public void testFieldIndex() throws Exception {
+               final Fields twoNames = new Fields("a1", "a2");
+               final Object arityOneTuple = Tuple.getTupleClass(1).newInstance();
+               final StormTuple<Object> wrapped = new StormTuple<Object>(arityOneTuple, twoNames);
+
+               Assert.assertEquals(0, wrapped.fieldIndex("a1"));
+               Assert.assertEquals(1, wrapped.fieldIndex("a2"));
+       }
+
+       /**
+        * Builds a tuple of the random arity whose field i holds the value i and is named "a" + i,
+        * randomly picks a subset of those names, and checks that select() returns exactly the
+        * values of the picked fields.
+        */
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testSelect() throws Exception {
+               final Tuple input = Tuple.getTupleClass(arity).newInstance();
+               final Values expectedSelection = new Values();
+
+               final ArrayList<String> allNames = new ArrayList<String>(arity);
+               final ArrayList<String> pickedNames = new ArrayList<String>(arity);
+
+               for (int i = 0; i < arity; ++i) {
+                       final String name = "a" + i;
+                       input.setField(i, i);
+                       allNames.add(name);
+
+                       // coin flip decides whether this field is part of the selection
+                       if (r.nextBoolean()) {
+                               pickedNames.add(name);
+                               expectedSelection.add(i);
+                       }
+               }
+
+               final Fields schema = new Fields(allNames);
+               final Fields selector = new Fields(pickedNames);
+               final StormTuple wrapped = new StormTuple(input, schema);
+
+               Assert.assertEquals(expectedSelection, wrapped.select(selector));
+       }
+
+       /**
+        * Looks up a mocked object by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetValueByField() throws Exception {
+               final Object expected = mock(Object.class);
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getValueByField(fieldName));
+       }
+
+       /**
+        * Wraps a POJO with a public field and looks the value up under the POJO field name.
+        */
+       @Test
+       public void testGetValueByFieldPojo() throws Exception {
+               final Object expected = mock(Object.class);
+               final StormTuple<TestPojoMember<Object>> wrapped =
+                               new StormTuple<TestPojoMember<Object>>(new TestPojoMember<Object>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getValueByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a POJO with a private field plus getter and looks the value up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetValueByFieldPojoGetter() throws Exception {
+               final Object expected = mock(Object.class);
+               final StormTuple<TestPojoGetter<Object>> wrapped =
+                               new StormTuple<TestPojoGetter<Object>>(new TestPojoGetter<Object>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getValueByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a String by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetStringByField() throws Exception {
+               final String expected = "stringValue";
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getStringByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a String and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetStringByFieldPojo() throws Exception {
+               final String expected = "stringValue";
+               final StormTuple<TestPojoMember<String>> wrapped =
+                               new StormTuple<TestPojoMember<String>>(new TestPojoMember<String>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getStringByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a String and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetStringByFieldPojoGetter() throws Exception {
+               final String expected = "stringValue";
+               final StormTuple<TestPojoGetter<String>> wrapped =
+                               new StormTuple<TestPojoGetter<String>>(new TestPojoGetter<String>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getStringByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Integer by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetIntegerByField() throws Exception {
+               final Integer expected = r.nextInt();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getIntegerByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Integer and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetIntegerByFieldPojo() throws Exception {
+               final Integer expected = r.nextInt();
+               final StormTuple<TestPojoMember<Integer>> wrapped =
+                               new StormTuple<TestPojoMember<Integer>>(new TestPojoMember<Integer>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getIntegerByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Integer and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetIntegerByFieldPojoGetter() throws Exception {
+               final Integer expected = r.nextInt();
+               final StormTuple<TestPojoGetter<Integer>> wrapped =
+                               new StormTuple<TestPojoGetter<Integer>>(new TestPojoGetter<Integer>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getIntegerByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Long by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetLongByField() throws Exception {
+               final Long expected = r.nextLong();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getLongByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Long and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetLongByFieldPojo() throws Exception {
+               final Long expected = r.nextLong();
+               final StormTuple<TestPojoMember<Long>> wrapped =
+                               new StormTuple<TestPojoMember<Long>>(new TestPojoMember<Long>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getLongByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Long and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetLongByFieldPojoGetter() throws Exception {
+               final Long expected = r.nextLong();
+               final StormTuple<TestPojoGetter<Long>> wrapped =
+                               new StormTuple<TestPojoGetter<Long>>(new TestPojoGetter<Long>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getLongByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Boolean by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetBooleanByField() throws Exception {
+               final Boolean expected = r.nextBoolean();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertEquals(expected, wrapped.getBooleanByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Boolean and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetBooleanByFieldPojo() throws Exception {
+               final Boolean expected = r.nextBoolean();
+               final StormTuple<TestPojoMember<Boolean>> wrapped =
+                               new StormTuple<TestPojoMember<Boolean>>(new TestPojoMember<Boolean>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getBooleanByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Boolean and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetBooleanByFieldPojoGetter() throws Exception {
+               final Boolean expected = r.nextBoolean();
+               final StormTuple<TestPojoGetter<Boolean>> wrapped =
+                               new StormTuple<TestPojoGetter<Boolean>>(new TestPojoGetter<Boolean>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getBooleanByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Short by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetShortByField() throws Exception {
+               final Short expected = (short) r.nextInt();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getShortByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Short and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetShortByFieldPojo() throws Exception {
+               final Short expected = (short) r.nextInt();
+               final StormTuple<TestPojoMember<Short>> wrapped =
+                               new StormTuple<TestPojoMember<Short>>(new TestPojoMember<Short>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getShortByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Short and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetShortByFieldPojoGetter() throws Exception {
+               final Short expected = (short) r.nextInt();
+               final StormTuple<TestPojoGetter<Short>> wrapped =
+                               new StormTuple<TestPojoGetter<Short>>(new TestPojoGetter<Short>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getShortByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Byte by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetByteByField() throws Exception {
+               // Byte.valueOf replaces the deprecated Byte(byte) constructor; the same reference is
+               // stored and compared, so the identity assertion is unaffected
+               Byte value = Byte.valueOf((byte) r.nextInt());
+               StormTuple tuple = testGetByField(arity, index, value);
+               Assert.assertSame(value, tuple.getByteByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Byte and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetByteByFieldPojo() throws Exception {
+               // Byte.valueOf replaces the deprecated Byte(byte) constructor; the same reference is
+               // stored and compared, so the identity assertion is unaffected
+               Byte value = Byte.valueOf((byte) r.nextInt());
+               TestPojoMember<Byte> pojo = new TestPojoMember<Byte>(value);
+               StormTuple<TestPojoMember<Byte>> tuple = new StormTuple<TestPojoMember<Byte>>(pojo, null);
+               Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Byte and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetByteByFieldPojoGetter() throws Exception {
+               // Byte.valueOf replaces the deprecated Byte(byte) constructor; the same reference is
+               // stored and compared, so the identity assertion is unaffected
+               Byte value = Byte.valueOf((byte) r.nextInt());
+               TestPojoGetter<Byte> pojo = new TestPojoGetter<Byte>(value);
+               StormTuple<TestPojoGetter<Byte>> tuple = new StormTuple<TestPojoGetter<Byte>>(pojo, null);
+               Assert.assertSame(value, tuple.getByteByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Double by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetDoubleByField() throws Exception {
+               final Double expected = r.nextDouble();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getDoubleByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Double and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetDoubleByFieldPojo() throws Exception {
+               final Double expected = r.nextDouble();
+               final StormTuple<TestPojoMember<Double>> wrapped =
+                               new StormTuple<TestPojoMember<Double>>(new TestPojoMember<Double>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getDoubleByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Double and looks it up under the POJO field
+        * name.
+        */
+       @Test
+       public void testGetDoubleByFieldPojoGetter() throws Exception {
+               final Double expected = r.nextDouble();
+               final StormTuple<TestPojoGetter<Double>> wrapped =
+                               new StormTuple<TestPojoGetter<Double>>(new TestPojoGetter<Double>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getDoubleByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random Float by field name in a tuple built by testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetFloatByField() throws Exception {
+               final Float expected = r.nextFloat();
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getFloatByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random Float and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetFloatByFieldPojo() throws Exception {
+               final Float expected = r.nextFloat();
+               final StormTuple<TestPojoMember<Float>> wrapped =
+                               new StormTuple<TestPojoMember<Float>>(new TestPojoMember<Float>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getFloatByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random Float and looks it up under the POJO field name.
+        */
+       @Test
+       public void testGetFloatByFieldPojoGetter() throws Exception {
+               final Float expected = r.nextFloat();
+               final StormTuple<TestPojoGetter<Float>> wrapped =
+                               new StormTuple<TestPojoGetter<Float>>(new TestPojoGetter<Float>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getFloatByField(fieldNamePojo));
+       }
+
+       /**
+        * Looks up a random byte array (length 1 to 20) by field name in a tuple built by
+        * testGetByField.
+        */
+       @SuppressWarnings("rawtypes")
+       @Test
+       public void testGetBinaryByField() throws Exception {
+               final byte[] expected = new byte[1 + r.nextInt(20)];
+               r.nextBytes(expected);
+               final StormTuple wrapped = testGetByField(arity, index, expected);
+
+               Assert.assertSame(expected, wrapped.getBinaryByField(fieldName));
+       }
+
+       /**
+        * Wraps a public-field POJO holding a random byte array (length 1 to 20) and looks it up
+        * under the POJO field name.
+        */
+       @Test
+       public void testGetBinaryFieldPojo() throws Exception {
+               final byte[] expected = new byte[1 + r.nextInt(20)];
+               r.nextBytes(expected);
+               final StormTuple<TestPojoMember<byte[]>> wrapped =
+                               new StormTuple<TestPojoMember<byte[]>>(new TestPojoMember<byte[]>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getBinaryByField(fieldNamePojo));
+       }
+
+       /**
+        * Wraps a getter-based POJO holding a random byte array (length 1 to 20) and looks it up
+        * under the POJO field name.
+        */
+       @Test
+       public void testGetBinaryByFieldPojoGetter() throws Exception {
+               final byte[] expected = new byte[1 + r.nextInt(20)];
+               r.nextBytes(expected);
+               final StormTuple<TestPojoGetter<byte[]>> wrapped =
+                               new StormTuple<TestPojoGetter<byte[]>>(new TestPojoGetter<byte[]>(expected), null);
+
+               Assert.assertSame(expected, wrapped.getBinaryByField(fieldNamePojo));
+       }
+
+       @SuppressWarnings({ "unchecked", "rawtypes" })
+       private <T> StormTuple testGetByField(int arity, int index, T value)
+                       throws Exception {
+
+               assert (index < arity);
+
+               Tuple tuple = Tuple.getTupleClass(arity).newInstance();
+               tuple.setField(value, index);
+
+               ArrayList<String> attributeNames = new ArrayList<String>(arity);
+               for(int i = 0; i < arity; ++i) {
+                       if(i == index) {
+                               attributeNames.add(fieldName);
+                       } else {
+                               attributeNames.add("" + i);
+                       }
+               }
+               Fields schema = new Fields(attributeNames);
+
+               return new StormTuple(tuple, schema);
+       }
+
+       /**
+        * Expects getSourceGlobalStreamid() to throw UnsupportedOperationException.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetSourceGlobalStreamid() {
+               final StormTuple<Object> emptyTuple = new StormTuple<Object>(null, null);
+               emptyTuple.getSourceGlobalStreamid();
+       }
+
+       /**
+        * Expects getSourceComponent() to throw UnsupportedOperationException.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetSourceComponent() {
+               final StormTuple<Object> emptyTuple = new StormTuple<Object>(null, null);
+               emptyTuple.getSourceComponent();
+       }
+
+       /**
+        * Expects getSourceTask() to throw UnsupportedOperationException.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetSourceTask() {
+               final StormTuple<Object> emptyTuple = new StormTuple<Object>(null, null);
+               emptyTuple.getSourceTask();
+       }
+
+       /**
+        * Expects getSourceStreamId() to throw UnsupportedOperationException.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetSourceStreamId() {
+               final StormTuple<Object> emptyTuple = new StormTuple<Object>(null, null);
+               emptyTuple.getSourceStreamId();
+       }
+
+       /**
+        * Expects getMessageId() to throw UnsupportedOperationException.
+        */
+       @Test(expected = UnsupportedOperationException.class)
+       public void testGetMessageId() {
+               final StormTuple<Object> emptyTuple = new StormTuple<Object>(null, null);
+               emptyTuple.getMessageId();
+       }
+
+       /**
+        * Test POJO that exposes its value through a public field named {@code member}. The tests
+        * in this class look the value up by that name, so the field name must not change.
+        */
+       public static class TestPojoMember<T> {
+               /** The wrapped value, directly accessible. */
+               public T member;
+
+               /** Stores the given value in {@link #member}. */
+               public TestPojoMember(T value) {
+                       this.member = value;
+               }
+       }
+
+       /**
+        * Test POJO that keeps its value in a private field named {@code member} and exposes it only
+        * through {@link #getMember()}. The tests in this class look the value up by the name
+        * {@code member}, so the field and getter names must not change.
+        */
+       public static class TestPojoGetter<T> {
+               /** The wrapped value; not directly accessible from outside. */
+               private T member;
+
+               /** Stores the given value. */
+               public TestPojoGetter(T value) {
+                       this.member = value;
+               }
+
+               /** Returns the value passed to the constructor. */
+               public T getMember() {
+                       return this.member;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
new file mode 100644
index 0000000..4c4749a
--- /dev/null
+++ 
b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/TestContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wrappers;
+
+import org.apache.flink.api.java.tuple.Tuple1;
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.LinkedList;
+
+/**
+ * Source context for tests that stores a copy of every collected element in {@link #result}.
+ * Timestamps and watermarks are ignored, and no checkpoint lock is provided.
+ */
+class TestContext implements SourceContext<Tuple1<Integer>> {
+       /** Copies of all elements handed to this context, in the order they were collected. */
+       public LinkedList<Tuple1<Integer>> result = new LinkedList<Tuple1<Integer>>();
+
+       public TestContext() {
+       }
+
+       /** Appends a copy of the record to {@link #result}. */
+       @Override
+       public void collect(final Tuple1<Integer> record) {
+               final Tuple1<Integer> snapshot = record.copy();
+               result.add(snapshot);
+       }
+
+       /** Appends a copy of the element to {@link #result}; the timestamp is not recorded. */
+       @Override
+       public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {
+               final Tuple1<Integer> snapshot = element.copy();
+               result.add(snapshot);
+       }
+
+       /** Watermarks are dropped. */
+       @Override
+       public void emitWatermark(Watermark mark) {
+               // intentionally empty
+       }
+
+       /** Always returns {@code null}; this context has no lock object. */
+       @Override
+       public Object getCheckpointLock() {
+               return null;
+       }
+
+       /** Nothing to release. */
+       @Override
+       public void close() {
+               // intentionally empty
+       }
+}

Reply via email to