[ 
https://issues.apache.org/jira/browse/FLINK-2837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15027083#comment-15027083
 ] 

ASF GitHub Bot commented on FLINK-2837:
---------------------------------------

Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1398#discussion_r45889142
  
    --- Diff: 
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 ---
    @@ -15,75 +16,474 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
     package org.apache.flink.storm.api;
     
    +import backtype.storm.generated.ComponentCommon;
    +import backtype.storm.generated.GlobalStreamId;
    +import backtype.storm.generated.Grouping;
     import backtype.storm.generated.StormTopology;
    +import backtype.storm.topology.IRichBolt;
    +import backtype.storm.topology.IRichSpout;
    +import backtype.storm.topology.IRichStateSpout;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.base.Preconditions;
     import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.tuple.Tuple;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.storm.util.SplitStreamMapper;
    +import org.apache.flink.storm.util.SplitStreamType;
    +import org.apache.flink.storm.util.StormStreamSelector;
    +import org.apache.flink.storm.wrappers.BoltWrapper;
    +import org.apache.flink.storm.wrappers.BoltWrapperTwoInput;
    +import org.apache.flink.storm.wrappers.SpoutWrapper;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.datastream.DataStreamSource;
    +import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    +import org.apache.flink.streaming.api.datastream.SplitStream;
     import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Set;
     
     /**
    - * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented 
in terms of a {@link
    - * StreamExecutionEnvironment} . In contrast to a regular {@link 
StreamExecutionEnvironment}, a {@link FlinkTopology}
    - * cannot be executed directly, but must be handed over to a {@link 
FlinkLocalCluster}, {@link FlinkSubmitter}, or
    - * {@link FlinkClient}.
    + * {@link FlinkTopology} translates a {@link TopologyBuilder} to a Flink 
program.
    + * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not 
supported.</strong>
      */
    -public class FlinkTopology extends StreamExecutionEnvironment {
    +public class FlinkTopology {
    +
    +   /** All declared streams and output schemas by operator ID */
    +   private final HashMap<String, HashMap<String, Fields>> outputStreams = 
new HashMap<String, HashMap<String, Fields>>();
    +   /** All spouts&bolts declarers by their ID */
    +   private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = 
new HashMap<String, FlinkOutputFieldsDeclarer>();
    +
    +   private final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> 
unprocessdInputsPerBolt =
    +                   new HashMap<String, Set<Entry<GlobalStreamId, 
Grouping>>>();
    +
    +   final HashMap<String, HashMap<String, DataStream<Tuple>>> 
availableInputs = new HashMap<>();
     
    -   /** The number of declared tasks for the whole program (ie, sum over 
all dops) */
    -   private int numberOfTasks = 0;
    +   private final TopologyBuilder builder;
     
    -   public FlinkTopology() {
    -           // Set default parallelism to 1, to mirror Storm default 
behavior
    -           super.setParallelism(1);
    +   // needs to be a class member for internal testing purpose
    +   private final StormTopology stormTopology;
    +
    +   private final Map<String, IRichSpout> spouts;
    +   private final Map<String, IRichBolt> bolts;
    +
    +   private final StreamExecutionEnvironment env;
    +
    +   private FlinkTopology(TopologyBuilder builder) {
    +           this.builder = builder;
    +           this.stormTopology = builder.createTopology();
    +           // extract the spouts and bolts
    +           this.spouts = getPrivateField("_spouts");
    +           this.bolts = getPrivateField("_bolts");
    +
    +           this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           // Kick off the translation immediately
    +           translateTopology();
        }
     
        /**
    -    * Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter}, or {@link
    -    * FlinkClient}.
         *
    -    * @throws UnsupportedOperationException
    -    *              at every invocation
    +    * Creates a Flink program that uses the specified spouts and bolts.
    +    * @param stormBuilder The storm topology builder to use for creating 
the Flink topology.
    +    * @return A Flink Topology which may be executed.
         */
    -   @Override
    -   public JobExecutionResult execute() throws Exception {
    -           throw new UnsupportedOperationException(
    -                           "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
    -                           "instead.");
    +   public static FlinkTopology createTopology(TopologyBuilder 
stormBuilder) {
    +           return new FlinkTopology(stormBuilder);
        }
     
        /**
    -    * Is not supported. In order to execute use {@link FlinkLocalCluster}, 
{@link FlinkSubmitter} or {@link
    -    * FlinkClient}.
    -    *
    -    * @throws UnsupportedOperationException
    -    *              at every invocation
    +    * Returns the underlying Flink ExecutionEnvironment for the Storm 
topology.
    +    * @return The contextual environment.
         */
    -   @Override
    -   public JobExecutionResult execute(final String jobName) throws 
Exception {
    -           throw new UnsupportedOperationException(
    -                           "A FlinkTopology cannot be executed directly. 
Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
    -                           "instead.");
    +   public StreamExecutionEnvironment getExecutionEnvironment() {
    +           return this.env;
        }
     
        /**
    -    * Increased the number of declared tasks of this program by the given 
value.
    -    *
    -    * @param dop
    -    *              The dop of a new operator that increases the number of 
overall tasks.
    +    * Directly executes the Storm topology based on the current context 
(local when in IDE and
    +    * remote when executed thorugh ./bin/flink).
    +    * @return The execution result
    +    * @throws Exception
         */
    --- End diff --
    
    Can we describe the exception in a meaningful way? Or are there too many 
reasons to list here? What does the JavaDoc of `env.execute()` say?


> FlinkTopologyBuilder cannot handle multiple input streams
> ---------------------------------------------------------
>
>                 Key: FLINK-2837
>                 URL: https://issues.apache.org/jira/browse/FLINK-2837
>             Project: Flink
>          Issue Type: Bug
>          Components: Storm Compatibility
>            Reporter: Matthias J. Sax
>            Assignee: Maximilian Michels
>
> FlinkTopologyBuilder cannot handle multiple input streams correctly. Instead 
> of unioning the incoming streams, it replicates the consuming bolt, and each 
> (logical) instance processes only one of the input streams.
> For example:
> {noformat}
> final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
> builder.setSpout(spoutId1, new FiniteRandomSpout(0, 10));
> builder.setSpout(spoutId2, new FiniteRandomSpout(1, 8));
> builder.setSpout(spoutId3, new FiniteRandomSpout(2, 13));
> builder.setBolt(boltId, new MergerBolt())
>       .shuffleGrouping(spoutId1)
>       .shuffleGrouping(spoutId2)
>       .shuffleGrouping(spoutId3);
> builder.setBolt("sink", new BoltPrintSink(new SimpleOutputFormatter()))
>       .shuffleGrouping(boltId);
> {noformat}
> will only print the data from a single source instead of all sources.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to