Hi Jack! You are doing something very interesting there. I am not sure I am getting everything, but here are some issues I can see:

- In the line where you join with the solution set:

    join(iteration.getSolutionSet()).where(0).equalTo(0).getInput1(); // <-- Referencing the solution set

  what happens is that after constructing the join, you grab that join's first input data set. This means that you actually ignore the join and simply use its "input1", in this case the result of the reduceGroup operation. That explains why the program does not depend on the solution set. If you change the code to "getInput2()", then you construct the join and take only its second input (the solution set). The other inputs are then "dangling", meaning they are not really consumed and are thus regarded as unused.

- The second error you get is a bug in Flink. The projection in the join cannot handle null inputs, which may occur when joining with the solution set (for example, when a key has no matching entry in the solution set). Let us fix that!

Greetings,
Stephan

On Wed, Aug 27, 2014 at 11:10 AM, Jack David Galilee wrote:
> Hi Everyone,
>
> I had an epiphany while reading through the Flink 0.6 API documentation and decided to try a new method for my iterative algorithm, but it just results in a weirder error. I've also included the error I was getting for the suggestion that was posted earlier.
>
> I'm sorry for not being able to provide full source code. If it is any help, all of my functions now produce Tuple2<String, String>, and the initial data set is also Tuple2<String, String>. The goal is to write out the union of the results from all iterations where the intersection of the set of keys for iteration i and iteration i - 1 is the empty set.
>
> DeltaIteration<Tuple2<String, String>, Tuple2<String, String>> iteration =
>     transactions.iterateDelta(initial, maxIterations, 0);
>
> DataSet<Tuple2<String, String>> ... = ....
>     flatMap(new ...()).withBroadcastSet(iteration.getWorkset(), "..."). // <-- Referencing the working set
>     groupBy(0).
>     reduceGroup(new ...()).
>     withParameters(intValue).
>     join(iteration.getSolutionSet()).where(0).equalTo(0).getInput1(); // <-- Referencing the solution set
>     // projectFirst(1).projectSecond(1).types(String.class, String.class);
>
> This raises an exception. If I change it to getInput2(), I get the same error, but for the working set, which is referenced through the broadcast.
>
> Exception in thread "main" org.apache.flink.compiler.CompilerException: Error: The step function does not reference the solution set.
>     at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:856)
>     at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:616)
>     at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
>     at org.apache.flink.api.common.operators.base.GenericDataSinkBase.accept(GenericDataSinkBase.java:289)
>
> If I remove the getInput1() call and uncomment that last line, it yields the exception below. I was concerned that I was accidentally writing out a null value somewhere, but I can't find where.
>
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NullPointerException
>     at org.apache.flink.api.java.operators.JoinOperator$ProjectFlatJoinFunction.join(JoinOperator.java:935)
>     at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:143)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:510)
>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:137)
>     at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
>     at java.lang.Thread.run(Thread.java:744)
>
> After more investigation, it appears that the NullPointerException occurs somewhere between the reduceGroup operator and the next map operator, as the next map operator does not run after the reduceGroup.
>
> Thanks,
>
> Jack
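
Stephan's first point, that getInput1() bypasses the join, can be illustrated with a small, simplified model. This is NOT Flink's API: the Node class, getInput1()/getInput2() on it, and reachable() are hypothetical, and the sketch assumes (as a simplification) that the compiler's "does not reference the solution set" check amounts to walking back through operator inputs from the step function's result. The broadcast workset is modeled as an ordinary input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of a dataflow plan; not Flink code.
public class PlanReachability {

    // A plan node that only knows its name and its input nodes.
    static final class Node {
        final String name;
        final List<Node> inputs;

        Node(String name, Node... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }

        // Returning an input hands back that upstream node; the join node itself is dropped.
        Node getInput1() { return inputs.get(0); }
        Node getInput2() { return inputs.get(1); }
    }

    // Collects the names of all nodes reachable from `result` by following input edges.
    static Set<String> reachable(Node result) {
        Set<String> seen = new HashSet<>();
        List<Node> stack = new ArrayList<>();
        stack.add(result);
        while (!stack.isEmpty()) {
            Node n = stack.remove(stack.size() - 1);
            if (seen.add(n.name)) {
                stack.addAll(n.inputs);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        Node workset = new Node("workset");
        Node solutionSet = new Node("solutionSet");
        Node flatMap = new Node("flatMap", workset); // broadcast workset modeled as an input
        Node reduce = new Node("reduceGroup", flatMap);
        Node join = new Node("join", reduce, solutionSet);

        // The join itself as the result: the solution set is referenced.
        System.out.println(reachable(join).contains("solutionSet"));             // true
        // join.getInput1(): the join is bypassed, so the solution set is unreferenced.
        System.out.println(reachable(join.getInput1()).contains("solutionSet")); // false
        // join.getInput2(): only the solution set remains; the workset is unreferenced.
        System.out.println(reachable(join.getInput2()).contains("workset"));     // false
    }
}
```

In both getInput variants, one side of the join ends up "dangling", which matches the two compiler errors Jack reported.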

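Stephan's second point, that the projection cannot handle null inputs coming from the solution-set side, can be sketched as a plain-Java simulation. This is NOT Flink code: Pair stands in for Tuple2<String, String>, and joinWithSolutionSet, naiveProjection, and nullSafeJoin are hypothetical names. The sketch assumes that the solution-set driver passes null to the join function when a key has no entry in the solution set, and it makes the (also assumed) choice of dropping unmatched keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified simulation of joining a data set with a solution set keyed on field 0.
public class SolutionSetJoinSketch {

    // Hypothetical stand-in for Tuple2<String, String>.
    static final class Pair {
        final String f0;
        final String f1;
        Pair(String f0, String f1) { this.f0 = f0; this.f1 = f1; }
        @Override public String toString() { return "(" + f0 + "," + f1 + ")"; }
    }

    // Mirrors projectFirst(1).projectSecond(1): dereferences `second` unconditionally.
    static Pair naiveProjection(Pair first, Pair second) {
        return new Pair(first.f1, second.f1); // NullPointerException if second == null
    }

    // Null-tolerant variant: returns null (meaning "emit nothing") for unmatched keys.
    static Pair nullSafeJoin(Pair first, Pair second) {
        if (second == null) {
            return null;
        }
        return new Pair(first.f1, second.f1);
    }

    // For each record, look up the solution-set entry by key; Map.get yields null on no match.
    static List<Pair> joinWithSolutionSet(List<Pair> probeSide, Map<String, Pair> solutionSet) {
        List<Pair> out = new ArrayList<>();
        for (Pair probe : probeSide) {
            Pair match = solutionSet.get(probe.f0); // may be null
            Pair joined = nullSafeJoin(probe, match);
            if (joined != null) {
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Pair> probeSide = Arrays.asList(new Pair("a", "x"), new Pair("b", "y"));
        Map<String, Pair> solutionSet = new HashMap<>();
        solutionSet.put("a", new Pair("a", "s"));

        // Key "b" has no solution-set entry, so the naive projection receives null.
        try {
            naiveProjection(probeSide.get(1), solutionSet.get("b"));
        } catch (NullPointerException e) {
            System.out.println("naive projection: NullPointerException");
        }
        // The null-safe version drops the unmatched key.
        System.out.println(joinWithSolutionSet(probeSide, solutionSet)); // [(x,s)]
    }
}
```

In the actual program, a comparable workaround until the bug is fixed (untested here) would presumably be to replace the projection with a user-defined join function, passed via `.with(...)`, that checks its solution-set argument for null before using it.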