On 04/14/2016 11:52 AM, star jlong wrote:
> One question: which Flink dependency are you using? I'm using
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-storm-examples_2.11</artifactId>
>   <version>1.0.0</version>
> </dependency>
> Once I change the version to the SNAPSHOT version, the pom.xml complains that
> it cannot satisfy the given dependency.
>     On Thursday, 14 April 2016 at 10:45, star jlong <> wrote:
>  Yes it is.
>     On Thursday, 14 April 2016 at 10:39, Matthias J. Sax <> wrote:
>  For the fix, you need to use the current development version of Flink,
> i.e., change your Maven dependency from <version>1.0</version> to
> <version>1.1-SNAPSHOT</version>
> One question: what is FlinkGitService.class? It only shows up when
> you get the ClassLoader:
>> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) },
>> FlinkGitService.class.getClassLoader());
> Is it the class that contains the methods deploy() and getFlinkTopology()?
> -Matthias
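For anyone following the reflection part of this thread: below is a minimal sketch, using only the JDK, of what the quoted ClassLoader snippet appears to be aiming at, namely loading a class by name through a URLClassLoader whose parent is the caller's class loader and invoking a static no-argument method on it. The names ReflectiveInvoker, DemoTopology and invokeStaticNoArg are made up for illustration and are not part of Flink or of the code discussed here. In the real case the URL array would point at the uploaded jar and the method would return a FlinkTopology; here a nested class stands in so the example runs without any jar.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;

final class ReflectiveInvoker {

    /** Hypothetical stand-in for a class such as WordCountTopology inside the uploaded jar. */
    public static class DemoTopology {
        public static String buildTopology() {
            return "demo-topology";
        }
    }

    /**
     * Loads className through a URLClassLoader and invokes the public static
     * no-argument method methodName on it, returning whatever that method returns.
     */
    public static Object invokeStaticNoArg(URL[] jarUrls, String className, String methodName)
            throws Exception {
        // Use the caller's class loader as parent so that classes shared between
        // the caller and the jar are resolved by the same loader on both sides.
        ClassLoader parent = ReflectiveInvoker.class.getClassLoader();

        // The loader is deliberately left open: the returned object may still
        // need to load further classes from the jar after this method returns.
        URLClassLoader loader = URLClassLoader.newInstance(jarUrls, parent);

        Class<?> clazz = Class.forName(className, true, loader);
        Method method = clazz.getMethod(methodName);
        if (!Modifier.isStatic(method.getModifiers())) {
            throw new IllegalArgumentException(methodName + " must be static");
        }
        // A static method is invoked with a null receiver.
        return method.invoke(null);
    }

    public static void main(String[] args) throws Exception {
        // No jar here: the demo class is already on the classpath, so it is
        // found through the parent loader.
        Object result = invokeStaticNoArg(
                new URL[0], DemoTopology.class.getName(), "buildTopology");
        System.out.println(result); // prints "demo-topology"
    }
}
```

Whether the parent loader is the right one depends on where the shared types live. If the returned type is loaded by two different class loaders, a cast on the caller side fails with a ClassCastException even though the class names match.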
> On 04/14/2016 05:20 AM, star jlong wrote:
>> What I'm trying to say is that, to submit the Flink topology to Flink, I
>> had to invoke the mainMethod (the one that contains the actual topology)
>> of my topology class through java.lang.reflect.Method. That is, if you take
>> a look at the following topology, the mainMethod is buildTopology:
>> public class WordCountTopology {
>>     public static void main(String[] args) throws Exception {
>>         Config conf = new Config();
>>         conf.setDebug(true);
>>         if (args != null && args.length > 0) {
>>             conf.setNumWorkers(1);
>>             conf.setMaxTaskParallelism(1);
>>             FlinkSubmitter.submitTopology(args[0], conf, buildTopology());
>>         }
>>         // Otherwise, we are running locally
>>         else {
>>             conf.setMaxTaskParallelism(1);
>>             FlinkLocalCluster cluster = new FlinkLocalCluster();
>>             cluster.submitTopology("word-count", conf, buildTopology());
>>             Thread.sleep(10000);
>>         }
>>     }
>>
>>     public static FlinkTopology buildTopology() {
>>         TopologyBuilder builder = new TopologyBuilder();
>>         builder.setSpout("spout", new RandomSentenceSpout(), 1);
>>         builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
>>         builder.setBolt("count", new WordCount(), 1)
>>                 .fieldsGrouping("split", new Fields("word"));
>>         builder.setBolt("writeIntoFile",
>>                 new BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() {
>>                     private static final long serialVersionUID = 1L;
>>
>>                     @Override
>>                     public String format(Tuple tuple) {
>>                         return tuple.toString();
>>                     }
>>                 }), 1).shuffleGrouping("count");
>>         return FlinkTopology.createTopology(builder);
>>     }
>> }
>> That is the method that I want to invoke from my jar, so that I can
>> submit the topology without any problem, i.e.
>> final FlinkClient cluster = FlinkClient.getConfiguredClient(conf);
>> cluster.submitTopology(topologyId, uploadedJarLocation,
>>     getFlinkTopology(String.format("file://%s", jarPath),
>>         properties.getProperty("topologyMainClass"),
>>         properties.getProperty("methodName")));
>> where getFlinkTopology() returns the actual topology.
>> But while doing that reflection I got an exception.
>> Another question, please: how do I make use of Till's hotfix?
>>     On Thursday, 14 April 2016 at 0:19, Matthias J. Sax <> wrote:
>>   I cannot completely follow your last step, the one where you fail. What do you
>> mean by "I'm stuck at the level when I want to copy that from the jar to
>> submit it to flink"?
>> Btw: I copied the code from the SO question and it works for me on the
>> current master (which includes Till's hotfix).
>> -Matthias
>> On 04/13/2016 09:39 PM, star jlong wrote:
>>> Thanks Matthias for the reply.
>>> Maybe I should explain better what I want to do. My objective is to deploy a
>>> Flink topology on Flink using Java, but in production mode. For that,
>>> here are the steps that I have taken.
>>> 1-Convert a sample wordcount Storm topology to a Flink topology as
>>> indicated here 
>>> the 
>>> topology in local mode (with my IDE, Eclipse) and in production mode by
>>> assembling everything with mvn clean install, then submitting the jar to
>>> Flink on the command line with
>>> ./bin/flink run -c stormWorldCount.WordCountTopology 
>>> /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar
>>>  myFlinkTopology
>>> Up to this point everything went well.
>>> Then I wanted to submit the same jar to Flink in production mode by
>>> using a Java program. So I decided to create a mainMethod in my topology
>>> that returns the FlinkTopology, which I wanted to submit to Flink using the
>>> FlinkClient. But I'm stuck at the level when I want to copy that from the
>>> jar to submit it to flink.
>>> I know that this is possible because I have used the same procedure with a
>>> Storm topology and it works perfectly well.
>>> What am I missing, please?
>>>   jstar
>>>     On Wednesday, 13 April 2016 at 19:23, Matthias J. Sax <> wrote:
>>>   Hi jstar,
>>> I need to have a close look. But I am wondering why you use reflection
>>> in the first place? Is there any specific reason for that?
>>> Furthermore, the example provided in project maven-example also covers
>>> the case to submit a topology to Flink via Java. Have a look at
>>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter
>>> It contains a main() method and you can just run it as a regular Java
>>> program in your IDE.
>>> The SO question example should also work; it also contains a main()
>>> method, so you should be able to run it.
>>> Btw: If you use the Storm-Compatibility-API there is no reason to get an
>>> ExecutionEnvironment in your code. This happens automatically with
>>> FlinkClient/FlinkSubmitter.
>>> Furthermore, I would recommend using FlinkSubmitter instead of
>>> FlinkClient as it is somewhat simpler to use.
>>> About SO question: I guess the problem is the jar assembling. The user says
>>> "Since I'using maven to handle my dependencies, I do a Mvn clean install
>>> to obtain the jar."
>>> I guess this is not sufficient to bundle a correct jar. Have a look into
>>> the pom.xml from storm-examples. It uses Maven plug-ins to assemble the jar
>>> correctly. (Regular Maven artifacts do not work for job submission...)
>>> I will have a closer look and follow up... Hope this helps already.
>>> -Matthias
>>> On 04/13/2016 06:23 PM, star jlong wrote:
>>>> Thanks for the reply.
>>>> @Stephan, I tried using RemoteEnvironment to submit my topology to Flink.
>>>> Here is what I tried:
>>>> RemoteEnvironment remote = new RemoteEnvironment(ipJobManager, 6123, jarPath);
>>>> remote.execute();
>>>> While running the program, this is the exception that I got:
>>>> java.lang.RuntimeException: No data sinks have been created yet. A program 
>>>> needs at least one sink that consumes data. Examples are writing the data 
>>>> set or printing it.
>>>>     On Wednesday, 13 April 2016 at 16:54, Till Rohrmann <> wrote:
>>>>   I think this is not the problem here since the problem is still happening
>>>> on the client side when the FlinkTopology tries to copy the registered
>>>> spouts. This happens before the job is submitted to the cluster. Maybe
>>>> Matthias could chime in here.
>>>> Cheers,
>>>> Till
>>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <> wrote:
>>>>> Hi!
>>>>> For flink standalone programs, you would use a "RemoteEnvironment"
>>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api".
>>>>> That one should deal with jars, classloaders, etc for you.
>>>>> Stephan
>>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <>
>>>>> wrote:
>>>>>> Thanks for the suggestion. Sure, those examples are interesting and I have
>>>>>> deployed them successfully on Flink. The deployment is done from the command
>>>>>> line, that is, by doing something like
>>>>>> bin/flink run example.jar
>>>>>> But what I want is to submit the topology to
>>>>>> Flink using a Java program.
>>>>>> Thanks.
>>>>>>     On Wednesday, 13 April 2016 at 14:12, Chesnay Schepler <> wrote:
>>>>>>   you can find examples here:
>>>>>> we haven't established yet that it is an API issue; it could very well
>>>>>> be caused by the reflection magic you're using...
>>>>>> On 13.04.2016 14:57, star jlong wrote:
>>>>>>> OK, it seems like there is an issue with the API. So please, does anybody
>>>>>>> have a working example for deploying a topology using the Flink dependency
>>>>>>> flink-storm_2.11? Any other would be welcome too.
>>>>>>> Thanks,
>>>>>>> jstar
>>>>>>>       On Wednesday, 13 April 2016 at 13:44, star jlong <> wrote:
>>>>>>>   Hi Schepler,
>>>>>>> Thanks for your concern. Yes, I'm actually having the same issue as
>>>>>>> indicated in that post, because I'm the one who posted it.
>>>>>>>       On Wednesday, 13 April 2016 at 13:35, Chesnay Schepler <> wrote:
>>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote:
>>>>>>>> Hi jstar,
>>>>>>>> what's exactly the problem you're observing?
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong <> wrote:
>>>>>>>>> Hi there,
>>>>>>>>> I'm jstar. I have been playing around with Flink. I'm very much interested
>>>>>>>>> in submitting a topology to Flink using its API. What I have tried is
>>>>>>>>> described on Stack Overflow. But I got stuck with
>>>>>>>>> an exception. Any help would be welcome.
>>>>>>>>> Thanks.
>>>>>>>>> jstar

