What is the compile error? Or could you already resolve it? there should not be a difference from 1.0 to 1.1-SNAPSHOT for both classes...
And yes, prefix _2.11 is only available for released but not for current development branch. But it is the same code (no need to worry about it). -Matthias On 04/14/2016 07:38 PM, star jlong wrote: > Hi Matthias, > > I change the version as per your requirement but when I do that I have a > compilation error at the level of the classes > org.apache.flink.storm.util.BoltFileSink and > org.apache.flink.storm.util.OutputFormatter > > Btw, the dependency > <dependency> <groupId>org.apache.flink</groupId> > <artifactId>flink-storm-examples_2.11</artifactId> > <version>1.1-SNAPSHOT</version> </dependency> > > Could not be satisfy, but this one could be > <dependency> <groupId>org.apache.flink</groupId> > <artifactId>flink-storm_2.10</artifactId> <version>1.1-SNAPSHOT</version> > </dependency> > > Le Jeudi 14 avril 2016 13h05, Matthias J. Sax <mj...@apache.org> a écrit : > > > change the version to 1.1-SNAPSHOT > > On 04/14/2016 11:52 AM, star jlong wrote: >> One question which dependency of flink are you using because I'm using >> <dependency> <groupId>org.apache.flink</groupId> >> <artifactId>flink-storm-examples_2.11</artifactId> >> <version>1.0.0</version></dependency> >> And once I change the version to SNAPSHOT version, the pom.xml complains >> that it could not satisfy the given dependency. >> >> Le Jeudi 14 avril 2016 10h45, star jlong <jlongs...@yahoo.fr.INVALID> a >> écrit : >> >> >> Yes it is. >> >> Le Jeudi 14 avril 2016 10h39, Matthias J. Sax <mj...@apache.org> a écrit >> : >> >> >> For the fix, you need to use the current development version of Flink, >> ie, change your maven dependency from <version>1.0</version> to >> <version>1.1-SNAPSHOT</version> >> >> One question: what is FlinkGitService.class? It does only show up when >> you get the ClassLoader: >> >>> ClassLoader loader = URLClassLoader.newInstance(new URL[] { new URL(path) >>> }, FlinkGitService.class.getClassLoader()); >> >> It is the class that contains methods deploy() and getFlinkTopology() ? >> >> -Matthias >> >> On 04/14/2016 05:20 AM, star jlong wrote: >>> What I'm trying to say is that to get submit the flink topology to flink, >>> I had to do an invocation of the mainMethod(which contain the actaul >>> topology) of my topology with the class java.lang.reflect.Method.That is if >>> you a take look at the following the topology the mainMethod is >>> buildTopologypublic class WordCountTopology { >>> public static void main(String[] args) throws Exception { >>> >>> Config conf = new Config(); >>> conf.setDebug(true); >>> if (args != null && args.length > 0) { >>> >>> conf.setNumWorkers(1); >>> conf.setMaxTaskParallelism(1); >>> FlinkSubmitter.submitTopology(args[0], conf, buildTopology()); >>> >>> } >>> // Otherwise, we are running locally >>> else { >>> conf.setMaxTaskParallelism(1); >>> FlinkLocalCluster cluster = new FlinkLocalCluster(); >>> cluster.submitTopology("word-count", conf, buildTopology()); >>> Thread.sleep(10000); >>> } >>> } >>> >>> public static FlinkTopology buildTopology() { >>> >>> TopologyBuilder builder = new TopologyBuilder(); >>> >>> builder.setSpout("spout", new RandomSentenceSpout(), 1); >>> builder.setBolt("split", new SplitSentence(), >>> 1).shuffleGrouping("spout"); >>> builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", >>> new Fields("word")); >>> >>> builder.setBolt("writeIntoFile", new >>> BoltFileSink("/home/username/wordcount.txt", new OutputFormatter() { >>> private static final long serialVersionUID = 1L; >>> >>> @Override >>> public String format(Tuple tuple) { >>> return tuple.toString(); >>> } >>> }), 1).shuffleGrouping("count"); >>> >>> return FlinkTopology.createTopology(builder); >>> >>> } >>> }That is the method that I want to invoke from my jar so that I will be >>> able to do the submitting of the topology without any problem ie >>> >>> final FlinkClient cluster = >>> FlinkClient.getConfiguredClient(conf);cluster.submitTopology(topologyId, >>> uploadedJarLocation, getFlinkTopogy(String.format("file://%s", >>> jarPath),properties.getProperty("topologyMainClass"), >>> properties.getProperty("methodName"))); >>> Where getFlinkTopology() return the contains actually topology >>> >>> But while doing that reflection I had an exception. >>> >>> Another question please. How do I make used of the hotflix of Till. >>> >>> Le Jeudi 14 avril 2016 0h19, Matthias J. Sax <mj...@apache.org> a écrit >>> : >>> >>> >>> I cannot follow completely in your last step when you fail. What do you >>> mean by "I'm stuck at the level when I want to copy that from the jar to >>> submit it to flink"? >>> >>> Btw: I copied the code from the SO question and it works for me on the >>> current master (which includes Till's hotfix). >>> >>> -Matthias >>> >>> >>> On 04/13/2016 09:39 PM, star jlong wrote: >>>> Thanks Matthias for the reply. >>>> Maybe I should explain what I want to do better.My objective is to deploy >>>> a flink topology on flink using java but in the production mode. For that >>>> here are the step that I have taken. >>>> 1-Convert a sample wordcount storm topology to a flink topology as >>>> indicated here >>>> https://flink.apache.org/news/2015/12/11/storm-compatibility.html2-Run the >>>> topology on local mode (with my IDE eclipse) and on production mode by >>>> assembling everything with a mvn clean install then submitting the jar to >>>> flink on the command line with >>>> ./bin/flink run -c stormWorldCount.WordCountTopology >>>> /home/raymond/testFlink/target/storm_example-0.0.1-SNAPSHOT-jar-with-dependencies.jar >>>> myFlinkTopology >>>> At this level everything went well. >>>> >>>> Then I wanted to submit the same jar to flink on the production mode by >>>> using a java program. Then I decided to create a mainMethod in my topology >>>> that returns the flinkTopology which I wanted to submit to flink using the >>>> FlinkClient. But I'm stuck at the level when I want to copy that from the >>>> jar to submit it to flink. >>>> >>>> I know that is possible because I have used the same procedure with a >>>> storm topology that it works perfectly well. >>>> What I'm missing please? >>>> jstar >>>> >>>> Le Mercredi 13 avril 2016 19h23, Matthias J. Sax <mj...@apache.org> a >>>> écrit : >>>> >>>> >>>> Hi jstar, >>>> >>>> I need to have a close look. But I am wondering why you use reflection >>>> in the first place? Is there any specific reason for that? >>>> >>>> Furthermore, the example provided in project maven-example also covers >>>> the case to submit a topology to Flink via Java. Have a look at >>>> org.apache.flink.storm.wordcount.WordCountRemoteBySubmitter >>>> >>>> It contains a main() method and you can just run it as a regular Java >>>> program in your IDE. >>>> >>>> The SO question example should also work; it also contains a main() >>>> method, so you should be able to run it. >>>> >>>> Btw: If you use Storm-Compatiblitly-API there is no reason the get an >>>> ExecutuionEnvironment in you code. This happen automatically with >>>> FlinkClient/FlinkSubmitter. >>>> >>>> Furthermore, I would recommend to use FlinkSubmitter instead of >>>> FlinkClient as it is somewhat simpler to use. >>>> >>>> About SO question: I guess the problem is the jar assembling. The user says >>>> >>>> "Since I'using maven to handle my dependencies, I do a Mvn clean install >>>> to obtain the jar." >>>> >>>> I guess this is not sufficient to bundle a correct jar. Have a look into >>>> pom.xml from storm-examples. It uses maven plug-ins in assemble the jar >>>> correctly. (Regular maven artifact do not work for job submission...) >>>> >>>> Will have a close look and follow up... Hope this helps already. >>>> >>>> -Matthias >>>> >>>> On 04/13/2016 06:23 PM, star jlong wrote: >>>>> Thanks for the reply. >>>>> @Stephen, I try using RemoteEnvironment to submit my topology to flink. >>>>> Here is the try that I did RemoteEnvironment remote = new >>>>> RemoteEnvironment(ipJobManager, 6123, jarPath); remote.execute(); >>>>> While running the program, this is the exception that I got. >>>>> java.lang.RuntimeException: No data sinks have been created yet. A >>>>> program needs at least one sink that consumes data. Examples are writing >>>>> the data set or printing it. >>>>> >>>>> >>>>> Le Mercredi 13 avril 2016 16h54, Till Rohrmann <trohrm...@apache.org> >>>>> a écrit : >>>>> >>>>> >>>>> I think this is not the problem here since the problem is still >>>>> happening >>>>> on the client side when the FlinkTopology tries to copy the registered >>>>> spouts. This happens before the job is submitted to the cluster. Maybe >>>>> Mathias could chime in here. >>>>> >>>>> Cheers, >>>>> Till >>>>> >>>>> On Wed, Apr 13, 2016 at 5:39 PM, Stephan Ewen <se...@apache.org> wrote: >>>>> >>>>>> Hi! >>>>>> >>>>>> For flink standalone programs, you would use a "RemoteEnvironment" >>>>>> >>>>>> For Storm, I would use the "FlinkClient" in "org.apache.flink.storm.api". >>>>>> That one should deal with jars, classloaders, etc for you. >>>>>> >>>>>> Stephan >>>>>> >>>>>> >>>>>> On Wed, Apr 13, 2016 at 3:43 PM, star jlong <jlongs...@yahoo.fr.invalid> >>>>>> wrote: >>>>>> >>>>>>> Thanks for the suggestion. Sure those examples are interesting and I >>>>>>> have >>>>>>> deploy them successfully on flink. The deployment is done the command >>>>>> line >>>>>>> that is doing something like >>>>>>> bin/flink run example.jarBut what I want is to submit the topology to >>>>>>> flink using a java program. >>>>>>> >>>>>>> Thanks. >>>>>>> >>>>>>> Le Mercredi 13 avril 2016 14h12, Chesnay Schepler < >>>>>> ches...@apache.org> >>>>>>> a écrit : >>>>>>> >>>>>>> >>>>>>> you can find examples here: >>>>>>> >>>>>>> >>>>>> https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-examples >>>>>>> >>>>>>> we haven't established yet that it is an API issue; it could very well >>>>>>> be caused by the reflection magic you're using... >>>>>>> >>>>>>> On 13.04.2016 14:57, star jlong wrote: >>>>>>>> Ok, it seems like there an issue with the api. So please does anybody >>>>>>> has a working example for deploying a topology using the flink >>>>>>> dependency >>>>>>> flink-storm_2.11 or any other will be welcoming. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> jstar >>>>>>>> >>>>>>>> Le Mercredi 13 avril 2016 13h44, star jlong >>>>>>> <jlongs...@yahoo.fr.INVALID> a écrit : >>>>>>>> >>>>>>>> >>>>>>>> Hi Schepler, >>>>>>>> >>>>>>>> Thanks for the concerned. Yes I'm actaully having the same issue as >>>>>>> indicated on that post because I'm the one that posted that issue. >>>>>>>> >>>>>>>> Le Mercredi 13 avril 2016 13h35, Chesnay Schepler < >>>>>>> ches...@apache.org> a écrit : >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> http://stackoverflow.com/questions/36584784/issues-while-submitting-a-topology-to-apache-flink-using-the-flink-api >>>>>>>> >>>>>>>> On 13.04.2016 14:28, Till Rohrmann wrote: >>>>>>>>> Hi jstar, >>>>>>>>> >>>>>>>>> what's exactly the problem you're observing? >>>>>>>>> >>>>>>>>> Cheers, >>>>>>>>> Till >>>>>>>>> >>>>>>>>> On Wed, Apr 13, 2016 at 2:23 PM, star jlong >>>>>> <jlongs...@yahoo.fr.invalid >>>>>>>> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Hi there, >>>>>>>>>> >>>>>>>>>> I'm jstar. I have been playing around with flink. I'm very much >>>>>>> interested >>>>>>>>>> in submitting a topoloy to flink using its api. As indicated >>>>>>>>>> on stackoverflow, that is the try that I have given. But I was stuck >>>>>>> with >>>>>>>>>> some exception. Please any help will be welcoming. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Thanks. >>>>>>>>>> jstar >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>> >>>>> >>>>> >>>> >>>> >>>> >>>> >>> >>> >>> >>> >> >> >> >> >> >> > > > >
signature.asc
Description: OpenPGP digital signature