http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
deleted file mode 100644
index 5efff66..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormInMemorySpout.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-/**
- * Implements a Storm Spout that reads String[] data stored in memory. The spout stops
- * automatically once it has emitted all of the data.
- */
-public class FiniteStormInMemorySpout extends StormInMemorySpout<String> 
implements
-               FiniteStormSpout {
-       private static final long serialVersionUID = -4008858647468647019L;
-
-       public FiniteStormInMemorySpout(String[] source) {
-               super(source);
-       }
-
-       @Override
-       public boolean reachedEnd() {
-               return counter >= source.length;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
deleted file mode 100644
index ec9adfe..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/OutputFormatter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-import java.io.Serializable;
-
-public interface OutputFormatter extends Serializable {
-
-       /**
-        * Converts a Storm {@link Tuple} to a string. This method is used for formatting the output
-        * tuples before writing them out to a file or to the console.
-        *
-        * @param input The tuple to be formatted
-        * @return The string result of the formatting
-        */
-       public String format(Tuple input);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
deleted file mode 100644
index 0702e94..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/SimpleOutputFormatter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class SimpleOutputFormatter implements OutputFormatter {
-       private static final long serialVersionUID = 6349573860144270338L;
-
-       /**
-        * Converts a Storm {@link Tuple} with 1 field to a string by retrieving the value of that
-        * field. This method is used for formatting raw outputs wrapped in tuples, before writing them
-        * out to a file or to the console.
-        *
-        * @param input
-        *              The tuple to be formatted
-        * @return The string result of the formatting
-        */
-       @Override
-       public String format(final Tuple input) {
-               if (input.getValues().size() != 1) {
-                       throw new RuntimeException("The output is not raw");
-               }
-               return input.getValue(0).toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
deleted file mode 100644
index ee8dca4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltFileSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.TopologyContext;
-
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a sink that writes the received data to the given file (as a result of {@code Object.toString()} for each
- * attribute).
- */
-public final class StormBoltFileSink extends AbstractStormBoltSink {
-       private static final long serialVersionUID = 2014027288631273666L;
-
-       private final String path;
-       private BufferedWriter writer;
-
-       public StormBoltFileSink(final String path) {
-               this(path, new SimpleOutputFormatter());
-       }
-
-       public StormBoltFileSink(final String path, final OutputFormatter 
formatter) {
-               super(formatter);
-               this.path = path;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepareSimple(final Map stormConf, final TopologyContext 
context) {
-               try {
-                       this.writer = new BufferedWriter(new 
FileWriter(this.path));
-               } catch (final IOException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @Override
-       public void writeExternal(final String line) {
-               try {
-                       this.writer.write(line + "\n");
-               } catch (final IOException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @Override
-       public void cleanup() {
-               if (this.writer != null) {
-                       try {
-                               this.writer.close();
-                       } catch (final IOException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
deleted file mode 100644
index 3bf49d0..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormBoltPrintSink.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.task.TopologyContext;
-
-import java.util.Map;
-
-/**
- * Implements a sink that prints the received data to {@code stdout}.
- */
-public final class StormBoltPrintSink extends AbstractStormBoltSink {
-       private static final long serialVersionUID = -6650011223001009519L;
-
-       public StormBoltPrintSink(OutputFormatter formatter) {
-               super(formatter);
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepareSimple(final Map stormConf, final TopologyContext 
context) {
-               /* nothing to do */
-       }
-
-       @Override
-       public void writeExternal(final String line) {
-               System.out.println(line);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
deleted file mode 100644
index 0611e37..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormFileSpout.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Implements a Storm Spout that reads data from a given local file.
- */
-public class StormFileSpout extends AbstractStormSpout {
-       private static final long serialVersionUID = -6996907090003590436L;
-
-       public final static String INPUT_FILE_PATH = "input.path";
-
-       protected String path = null;
-       protected BufferedReader reader;
-
-       public StormFileSpout() {}
-
-       public StormFileSpout(final String path) {
-               this.path = path;
-       }
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void open(final Map conf, final TopologyContext context, final 
SpoutOutputCollector collector) {
-               super.open(conf, context, collector);
-
-               Object configuredPath = conf.get(INPUT_FILE_PATH);
-               if(configuredPath != null) {
-                       this.path = (String)configuredPath;
-               }
-
-               try {
-                       this.reader = new BufferedReader(new 
FileReader(this.path));
-               } catch (final FileNotFoundException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @Override
-       public void close() {
-               if (this.reader != null) {
-                       try {
-                               this.reader.close();
-                       } catch (final IOException e) {
-                               throw new RuntimeException(e);
-                       }
-               }
-       }
-
-       @Override
-       public void nextTuple() {
-               String line;
-               try {
-                       line = this.reader.readLine();
-                       if (line != null) {
-                               this.collector.emit(new Values(line));
-                       }
-               } catch (final IOException e) {
-                       throw new RuntimeException(e);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
deleted file mode 100644
index f6ae622..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/StormInMemorySpout.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Values;
-
-/**
- * Implements a Storm Spout that reads data from an in-memory array.
- */
-public class StormInMemorySpout<T> extends AbstractStormSpout {
-       private static final long serialVersionUID = -4008858647468647019L;
-
-       protected T[] source;
-       protected int counter = 0;
-
-       public StormInMemorySpout(T[] source) {
-               this.source = source;
-       }
-
-       @Override
-       public void nextTuple() {
-               if (this.counter < source.length) {
-                       this.collector.emit(new Values(source[this.counter++]));
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
deleted file mode 100644
index 6419ee3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/util/TupleOutputFormatter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Tuple;
-
-public class TupleOutputFormatter implements OutputFormatter {
-       private static final long serialVersionUID = -599665757723851761L;
-
-       @Override
-       public String format(final Tuple input) {
-               final StringBuilder stringBuilder = new StringBuilder();
-               stringBuilder.append("(");
-               for (final Object attribute : input.getValues()) {
-                       stringBuilder.append(attribute);
-                       stringBuilder.append(",");
-               }
-               stringBuilder.replace(stringBuilder.length() - 1, 
stringBuilder.length(), ")");
-               return stringBuilder.toString();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
deleted file mode 100644
index 6f7b6fb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCount.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichBolt;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text files in a streaming
- * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from {@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm bolt within a Flink Streaming program.</li>
- * </ul>
- */
-public class BoltTokenizerWordCount {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               final DataStream<String> text = getTextDataStream(env);
-
-               final DataStream<Tuple2<String, Integer>> counts = text
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // this is done by a Storm bolt that is wrapped 
accordingly
-                               .transform("StormBoltTokenizer",
-                                               TypeExtractor.getForObject(new 
Tuple2<String, Integer>("", 0)),
-                                               new StormBoltWrapper<String, 
Tuple2<String, Integer>>(new StormBoltTokenizer()))
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .keyBy(0).sum(1);
-
-               // emit result
-               if (fileOutput) {
-                       counts.writeAsText(outputPath);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount with Storm bolt tokenizer");
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: 
BoltTokenizerWordCount <text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing BoltTokenizerWordCount 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: BoltTokenizerWordCount 
<text path> <result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       return env.readTextFile(textPath);
-               }
-
-               return env.fromElements(WordCountData.WORDS);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
deleted file mode 100644
index 300f5bc..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichBolt;
-
-import org.apache.flink.api.java.io.CsvInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataPojos.Sentence;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. 
In contrast to
- * {@link BoltTokenizerWordCount} the tokenizer's input is a POJO type and the 
single field is accessed by name.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>how to access attributes by name for POJO type input streams
- * </ul>
- */
-public class BoltTokenizerWordCountPojo {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               final DataStream<Sentence> text = getTextDataStream(env);
-
-               final DataStream<Tuple2<String, Integer>> counts = text
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // this is done by a Storm bolt that is wrapped 
accordingly
-                               .transform("StormBoltTokenizer",
-                                               TypeExtractor.getForObject(new 
Tuple2<String, Integer>("", 0)),
-                                               new StormBoltWrapper<Sentence, 
Tuple2<String, Integer>>(
-                                                               new 
StormBoltTokenizerByName()))
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .keyBy(0).sum(1);
-
-               // emit result
-               if (fileOutput) {
-                       counts.writeAsText(outputPath);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount with Storm bolt tokenizer");
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: 
BoltTokenizerWordCount <text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing BoltTokenizerWordCount 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: BoltTokenizerWordCount 
<text path> <result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<Sentence> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       PojoTypeInfo<Sentence> sourceType = 
(PojoTypeInfo)TypeExtractor
-                                       .getForObject(new Sentence(""));
-                       return env.createInput(new CsvInputFormat<Sentence>(new 
Path(
-                                       textPath), 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
-                                       CsvInputFormat.DEFAULT_LINE_DELIMITER, 
sourceType),
-                                       sourceType);
-               }
-
-               return env.fromElements(WordCountDataPojos.SENTENCES);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
deleted file mode 100644
index ed01181..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.java.io.CsvInputFormat;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.WordCountDataTuple;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The tokenizer step is performed by a Storm {@link IRichBolt bolt}. 
In contrast to
- * {@link BoltTokenizerWordCount} the tokenizer's input is a {@link Tuple} 
type and the single field is accessed by
- * name.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>how to access attributes by name for {@link Tuple} type input streams
- * </ul>
- */
-public class BoltTokenizerWordCountWithNames {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               final DataStream<Tuple1<String>> text = getTextDataStream(env);
-
-               final DataStream<Tuple2<String, Integer>> counts = text
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // this is done by a Storm bolt that is wrapped 
accordingly
-                               .transform("StormBoltTokenizer",
-                                               TypeExtractor.getForObject(new 
Tuple2<String, Integer>("", 0)),
-                                               new 
StormBoltWrapper<Tuple1<String>, Tuple2<String, Integer>>(
-                                                               new 
StormBoltTokenizerByName(), new Fields("sentence")))
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .keyBy(0).sum(1);
-
-               // emit result
-               if (fileOutput) {
-                       counts.writeAsText(outputPath);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount with Storm bolt tokenizer");
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: 
BoltTokenizerWordCount <text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing BoltTokenizerWordCount 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: BoltTokenizerWordCount 
<text path> <result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<Tuple1<String>> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       TupleTypeInfo<Tuple1<String>> sourceType = 
(TupleTypeInfo<Tuple1<String>>)TypeExtractor
-                                       .getForObject(new Tuple1<String>(""));
-                       return env.createInput(new 
CsvInputFormat<Tuple1<String>>(new Path(
-                                       textPath), 
CsvInputFormat.DEFAULT_LINE_DELIMITER,
-                                       CsvInputFormat.DEFAULT_LINE_DELIMITER, 
sourceType),
-                                       sourceType);
-               }
-
-               return env.fromElements(WordCountDataTuple.TUPLES);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
deleted file mode 100644
index 21d7811..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout;
-import org.apache.flink.stormcompatibility.wrappers.StormFiniteSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.util.Collector;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The used data source is a Storm {@link IRichSpout bolt}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>use a Storm spout within a Flink Streaming program.</li>
- * </ul>
- */
-public class SpoutSourceWordCount {
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!parseParameters(args)) {
-                       return;
-               }
-
-               // set up the execution environment
-               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               final DataStream<String> text = getTextDataStream(env);
-
-               final DataStream<Tuple2<String, Integer>> counts =
-                               // split up the lines in pairs (2-tuples) 
containing: (word,1)
-                               text.flatMap(new Tokenizer())
-                               // group by the tuple field "0" and sum up 
tuple field "1"
-                               .keyBy(0).sum(1);
-
-               // emit result
-               if (fileOutput) {
-                       counts.writeAsText(outputPath);
-               } else {
-                       counts.print();
-               }
-
-               // execute program
-               env.execute("Streaming WordCount with Storm spout source");
-       }
-
-       // 
*************************************************************************
-       // USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * Implements the string tokenizer that splits sentences into words as 
a user-defined FlatMapFunction. The function
-        * takes a line (String) and splits it into multiple pairs in the form 
of "(word,1)" (Tuple2<String, Integer>).
-        */
-       public static final class Tokenizer implements FlatMapFunction<String, 
Tuple2<String, Integer>> {
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void flatMap(final String value, final 
Collector<Tuple2<String, Integer>> out) throws Exception {
-                       // normalize and split the line
-                       final String[] tokens = 
value.toLowerCase().split("\\W+");
-
-                       // emit the pairs
-                       for (final String token : tokens) {
-                               if (token.length() > 0) {
-                                       out.collect(new Tuple2<String, 
Integer>(token, 1));
-                               }
-                       }
-               }
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: SpoutSourceWordCount 
<text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing SpoutSourceWordCount 
example with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: SpoutSourceWordCount <text 
path> <result path>");
-               }
-               return true;
-       }
-
-       private static DataStream<String> getTextDataStream(final 
StreamExecutionEnvironment env) {
-               if (fileOutput) {
-                       // read the text file from given input path
-                       final String[] tokens = textPath.split(":");
-                       final String localFile = tokens[tokens.length - 1];
-                       return env.addSource(
-                                       new StormFiniteSpoutWrapper<String>(new 
StormWordCountFileSpout(localFile),
-                                                       new String[] { 
Utils.DEFAULT_STREAM_ID }),
-                                       
TypeExtractor.getForClass(String.class)).setParallelism(1);
-               }
-
-               return env.addSource(
-                               new StormFiniteSpoutWrapper<String>(new 
StormWordCountInMemorySpout(),
-                                               new String[] { 
Utils.DEFAULT_STREAM_ID }),
-                               
TypeExtractor.getForClass(String.class)).setParallelism(1);
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
deleted file mode 100644
index 836c8e9..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
- * same way as to a Storm {@link LocalCluster}.
- * <p/>
- * This example shows how to run program directly within Java, thus it cannot 
be used to submit a {@link StormTopology}
- * via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink</li>
- * </ul>
- */
-public class StormWordCountLocal {
-       public final static String topologyId = "Streaming WordCount";
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!WordCountTopology.parseParameters(args)) {
-                       return;
-               }
-
-               // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
-
-               // execute program locally
-               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
-               cluster.submitTopology(topologyId, null, 
builder.createTopology());
-
-               Utils.sleep(10 * 1000);
-
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
-               cluster.shutdown();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
deleted file mode 100644
index f51afab..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountNamedLocal.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkLocalCluster;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
- * same way as to a Storm {@link LocalCluster}. In contrast to {@link 
StormWordCountLocal} all bolts access the field of
- * input tuples by name instead of index.
- * <p/>
- * This example shows how to run program directly within Java, thus it cannot 
be used to submit a {@link StormTopology}
- * via Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>run a regular Storm program locally on Flink
- * </ul>
- */
-public class StormWordCountNamedLocal {
-       public final static String topologyId = "Streaming WordCountName";
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!WordCountTopology.parseParameters(args)) {
-                       return;
-               }
-
-               // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology(false);
-
-               // execute program locally
-               final FlinkLocalCluster cluster = 
FlinkLocalCluster.getLocalCluster();
-               cluster.submitTopology(topologyId, null, 
builder.createTopology());
-
-               Utils.sleep(10 * 1000);
-
-               // TODO kill does no do anything so far
-               cluster.killTopology(topologyId);
-               cluster.shutdown();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
deleted file mode 100644
index 3c79eda..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteByClient.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link NimbusClient}. The Flink 
cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java, thus it cannot be 
used to submit a {@link StormTopology} via
- * Flink command line clients (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormWordCountRemoteByClient {
-       public final static String topologyId = "Streaming WordCount";
-       private final static String uploadedJarLocation = 
"target/WordCount-StormTopology.jar";
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws 
AlreadyAliveException, InvalidTopologyException,
-                       NotAliveException {
-
-               if (!WordCountTopology.parseParameters(args)) {
-                       return;
-               }
-
-               // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
-
-               // execute program on Flink cluster
-               final Config conf = new Config();
-               // can be changed to remote address
-               conf.put(Config.NIMBUS_HOST, "localhost");
-               // use default flink jobmanger.rpc.port
-               conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
-
-               final FlinkClient cluster = 
FlinkClient.getConfiguredClient(conf);
-               cluster.submitTopology(topologyId, uploadedJarLocation, 
builder.createTopology());
-
-               Utils.sleep(5 * 1000);
-
-               cluster.killTopology(topologyId);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
deleted file mode 100644
index de84f55..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkClient;
-import org.apache.flink.stormcompatibility.api.FlinkSubmitter;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology} and 
submitted to Flink for execution in the
- * same way as to a Storm cluster similar to {@link StormSubmitter}. The Flink 
cluster can be local or remote.
- * <p/>
- * This example shows how to submit the program via Java as well as Flink's 
command line client (ie, bin/flink).
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>submit a regular Storm program to a local or remote Flink cluster.</li>
- * </ul>
- */
-public class StormWordCountRemoteBySubmitter {
-       public final static String topologyId = "Streaming WordCount";
-
-       // 
*************************************************************************
-       // PROGRAM
-       // 
*************************************************************************
-
-       public static void main(final String[] args) throws Exception {
-
-               if (!WordCountTopology.parseParameters(args)) {
-                       return;
-               }
-
-               // build Topology the Storm way
-               final FlinkTopologyBuilder builder = 
WordCountTopology.buildTopology();
-
-               // execute program on Flink cluster
-               final Config conf = new Config();
-               // We can set Jobmanager host/port values manually or leave 
them blank
-               // if not set and
-               // - executed within Java, default values "localhost" and 
"6123" are set by FlinkSubmitter
-               // - executed via bin/flink values from flink-conf.yaml are set 
by FlinkSubmitter.
-               // conf.put(Config.NIMBUS_HOST, "localhost");
-               // conf.put(Config.NIMBUS_THRIFT_PORT, new Integer(6123));
-
-               // The user jar file must be specified via JVM argument if 
executed via Java.
-               // => -Dstorm.jar=target/WordCount-StormTopology.jar
-               // If bin/flink is used, the jar file is detected automatically.
-               FlinkSubmitter.submitTopology(topologyId, conf, 
builder.createTopology());
-
-               Thread.sleep(5 * 1000);
-
-               FlinkClient.getConfiguredClient(conf).killTopology(topologyId);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
deleted file mode 100644
index 45be821..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/WordCountTopology.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.stormcompatibility.util.OutputFormatter;
-import org.apache.flink.stormcompatibility.util.StormBoltFileSink;
-import org.apache.flink.stormcompatibility.util.StormBoltPrintSink;
-import org.apache.flink.stormcompatibility.util.TupleOutputFormatter;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounter;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltCounterByName;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizer;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormBoltTokenizerByName;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountFileSpout;
-import 
org.apache.flink.stormcompatibility.wordcount.stormoperators.StormWordCountInMemorySpout;
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence 
histogram over text files in a streaming
- * fashion. The program is constructed as a regular {@link StormTopology}.
- * <p/>
- * <p/>
- * The input is a plain text file with lines separated by newline characters.
- * <p/>
- * <p/>
- * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from 
{@link WordCountData}.
- * <p/>
- * <p/>
- * This example shows how to:
- * <ul>
- * <li>how to construct a regular Storm topology as Flink program</li>
- * </ul>
- */
-public class WordCountTopology {
-       public final static String spoutId = "source";
-       public final static String tokenierzerId = "tokenizer";
-       public final static String counterId = "counter";
-       public final static String sinkId = "sink";
-       private final static OutputFormatter formatter = new 
TupleOutputFormatter();
-
-       public static FlinkTopologyBuilder buildTopology() {
-               return buildTopology(true);
-       }
-
-       public static FlinkTopologyBuilder buildTopology(boolean indexOrName) {
-
-               final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-
-               // get input data
-               if (fileInputOutput) {
-                       // read the text file from given input path
-                       final String[] tokens = textPath.split(":");
-                       final String inputFile = tokens[tokens.length - 1];
-                       builder.setSpout(spoutId, new 
StormWordCountFileSpout(inputFile));
-               } else {
-                       builder.setSpout(spoutId, new 
StormWordCountInMemorySpout());
-               }
-
-               if (indexOrName) {
-                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
-                       builder.setBolt(tokenierzerId, new 
StormBoltTokenizer(), 4).shuffleGrouping(spoutId);
-                       // group by the tuple field "0" and sum up tuple field 
"1"
-                       builder.setBolt(counterId, new StormBoltCounter(), 
4).fieldsGrouping(tokenierzerId,
-                                       new 
Fields(StormBoltTokenizer.ATTRIBUTE_WORD));
-               } else {
-                       // split up the lines in pairs (2-tuples) containing: 
(word,1)
-                       builder.setBolt(tokenierzerId, new 
StormBoltTokenizerByName(), 4).shuffleGrouping(
-                                       spoutId);
-                       // group by the tuple field "0" and sum up tuple field 
"1"
-                       builder.setBolt(counterId, new 
StormBoltCounterByName(), 4).fieldsGrouping(
-                                       tokenierzerId, new 
Fields(StormBoltTokenizerByName.ATTRIBUTE_WORD));
-               }
-
-               // emit result
-               if (fileInputOutput) {
-                       // read the text file from given input path
-                       final String[] tokens = outputPath.split(":");
-                       final String outputFile = tokens[tokens.length - 1];
-                       builder.setBolt(sinkId, new 
StormBoltFileSink(outputFile, formatter)).shuffleGrouping(counterId);
-               } else {
-                       builder.setBolt(sinkId, new 
StormBoltPrintSink(formatter), 4).shuffleGrouping(counterId);
-               }
-
-               return builder;
-       }
-
-       // 
*************************************************************************
-       // UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileInputOutput = false;
-       private static String textPath;
-       private static String outputPath;
-
-       static boolean parseParameters(final String[] args) {
-
-               if (args.length > 0) {
-                       // parse input arguments
-                       fileInputOutput = true;
-                       if (args.length == 2) {
-                               textPath = args[0];
-                               outputPath = args[1];
-                       } else {
-                               System.err.println("Usage: StormWordCount* 
<text path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing StormWordCount* example 
with built-in default data");
-                       System.out.println("  Provide parameters to read input 
data from a file");
-                       System.out.println("  Usage: StormWordCount* <text 
path> <result path>");
-               }
-
-               return true;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
deleted file mode 100644
index 1544c19..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements the word counter that the occurrence of each unique word. The 
bolt takes a pair (input tuple schema:
- * {@code <String,Integer>}) and sums the given word count for each unique 
word (output tuple schema:
- * {@code <String,Integer>} ).
- */
-public class StormBoltCounter implements IRichBolt {
-       private static final long serialVersionUID = 399619605462625934L;
-
-       public static final String ATTRIBUTE_WORD = "word";
-       public static final String ATTRIBUTE_COUNT = "count";
-
-       private final HashMap<String, Count> counts = new HashMap<String, 
Count>();
-       private OutputCollector collector;
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
-               this.collector = collector;
-       }
-
-       @Override
-       public void execute(final Tuple input) {
-               final String word = 
input.getString(StormBoltTokenizer.ATTRIBUTE_WORD_INDEX);
-
-               Count currentCount = this.counts.get(word);
-               if (currentCount == null) {
-                       currentCount = new Count();
-                       this.counts.put(word, currentCount);
-               }
-               currentCount.count += 
input.getInteger(StormBoltTokenizer.ATTRIBUTE_COUNT_INDEX);
-
-               this.collector.emit(new Values(word, currentCount.count));
-       }
-
-       @Override
-       public void cleanup() {/* nothing to do */}
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-
-       /**
-        * A counter helper to emit immutable tuples to the given 
stormCollector and avoid unnecessary object
-        * creating/deletion.
-        */
-       private static final class Count {
-               public int count;
-
-               public Count() {/* nothing to do */}
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
deleted file mode 100644
index bf940c3..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltCounterByName.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Implements the word counter that the occurrence of each unique word. The 
bolt takes a pair (input tuple schema:
- * {@code <String,Integer>}) and sums the given word count for each unique 
word (output tuple schema:
- * {@code <String,Integer>} ).
- */
-public class StormBoltCounterByName implements IRichBolt {
-       private static final long serialVersionUID = 399619605462625934L;
-
-       public static final String ATTRIBUTE_WORD = "word";
-       public static final String ATTRIBUTE_COUNT = "count";
-
-       private final HashMap<String, Count> counts = new HashMap<String, 
Count>();
-       private OutputCollector collector;
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
-               this.collector = collector;
-       }
-
-       @Override
-       public void execute(final Tuple input) {
-               final String word = 
input.getStringByField(StormBoltTokenizer.ATTRIBUTE_WORD);
-
-               Count currentCount = this.counts.get(word);
-               if (currentCount == null) {
-                       currentCount = new Count();
-                       this.counts.put(word, currentCount);
-               }
-               currentCount.count += 
input.getIntegerByField(StormBoltTokenizer.ATTRIBUTE_COUNT);
-
-               this.collector.emit(new Values(word, currentCount.count));
-       }
-
-       @Override
-       public void cleanup() {/* nothing to do */}
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-
-       /**
-        * A counter helper to emit immutable tuples to the given 
stormCollector and avoid unnecessary object
-        * creating/deletion.
-        */
-       private static final class Count {
-               public int count;
-
-               public Count() {/* nothing to do */}
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
deleted file mode 100644
index dfb3e37..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * Implements the string tokenizer that splits sentences into words as a Storm 
bolt. The bolt takes a line (input tuple
- * schema: {@code <String>}) and splits it into multiple pairs in the form of 
"(word,1)" (output tuple schema:
- * {@code <String,Integer>}).
- * <p>
- * Same as {@link StormBoltTokenizerByName}, but accesses input attribute by 
index (instead of name).
- */
-public final class StormBoltTokenizer implements IRichBolt {
-       private static final long serialVersionUID = -8589620297208175149L;
-
-       public static final String ATTRIBUTE_WORD = "word";
-       public static final String ATTRIBUTE_COUNT = "count";
-
-       public static final int ATTRIBUTE_WORD_INDEX = 0;
-       public static final int ATTRIBUTE_COUNT_INDEX = 1;
-
-       private OutputCollector collector;
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
-               this.collector = collector;
-       }
-
-       @Override
-       public void execute(final Tuple input) {
-               final String[] tokens = 
input.getString(0).toLowerCase().split("\\W+");
-
-               for (final String token : tokens) {
-                       if (token.length() > 0) {
-                               this.collector.emit(new Values(token, 1));
-                       }
-               }
-       }
-
-       @Override
-       public void cleanup() {/* nothing to do */}
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
deleted file mode 100644
index 8796b95..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormBoltTokenizerByName.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-/**
- * Implements the string tokenizer that splits sentences into words as a Storm 
bolt. The bolt takes a line (input tuple
- * schema: {@code <String>}) and splits it into multiple pairs in the form of 
"(word,1)" (output tuple schema:
- * {@code <String,Integer>}).
- * <p>
- * Same as {@link StormBoltTokenizer}, but accesses input attribute by name 
(instead of index).
- */
-public final class StormBoltTokenizerByName implements IRichBolt {
-       private static final long serialVersionUID = -8589620297208175149L;
-
-       public static final String ATTRIBUTE_WORD = "word";
-       public static final String ATTRIBUTE_COUNT = "count";
-
-       public static final int ATTRIBUTE_WORD_INDEX = 0;
-       public static final int ATTRIBUTE_COUNT_INDEX = 1;
-
-       private OutputCollector collector;
-
-       @SuppressWarnings("rawtypes")
-       @Override
-       public void prepare(final Map stormConf, final TopologyContext context, 
final OutputCollector collector) {
-               this.collector = collector;
-       }
-
-       @Override
-       public void execute(final Tuple input) {
-               final String[] tokens = 
input.getStringByField("sentence").toLowerCase().split("\\W+");
-
-               for (final String token : tokens) {
-                       if (token.length() > 0) {
-                               this.collector.emit(new Values(token, 1));
-                       }
-               }
-       }
-
-       @Override
-       public void cleanup() {/* nothing to do */}
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields(ATTRIBUTE_WORD, ATTRIBUTE_COUNT));
-       }
-
-       @Override
-       public Map<String, Object> getComponentConfiguration() {
-               return null;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
 
b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
deleted file mode 100644
index e994760..0000000
--- 
a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import org.apache.flink.stormcompatibility.util.StormFileSpout;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-/**
- * Implements a Storm Spout that reads data from a given local file.
- */
-public final class StormWordCountFileSpout extends StormFileSpout {
-       private static final long serialVersionUID = 2372251989250954503L;
-
-       public StormWordCountFileSpout(String path) {
-               super(path);
-       }
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields("sentence"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
deleted file mode 100644
index 372f66f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wordcount.stormoperators;
-
-import org.apache.flink.examples.java.wordcount.util.WordCountData;
-import org.apache.flink.stormcompatibility.util.StormInMemorySpout;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-
-/**
- * Implements a Storm Spout that reads data from {@link WordCountData#WORDS}.
- */
-public final class StormWordCountInMemorySpout extends StormInMemorySpout<String> {
-       private static final long serialVersionUID = 8832143302409465843L;
-
-       public StormWordCountInMemorySpout() {
-               super(WordCountData.WORDS);
-       }
-
-       @Override
-       public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-               declarer.declare(new Fields("sentence"));
-       }
-}

Reply via email to