Hi Everyone,

I had an epiphany while reading through the Flink 0.6 API documentation and 
decided to try a new approach for my iterative algorithm, but it only produces 
a stranger error. I've also included the error I was getting with the 
suggestion that was posted earlier.


I'm sorry for not being able to provide the full source code. If it helps, all 
of my functions now produce Tuple2<String, String>, and the initial dataset 
is also Tuple2<String, String>. The goal is to write out the union of the 
results from all iterations where the intersection of the sets of keys for 
iteration i and iteration i - 1 is the empty set.
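
For concreteness, one possible reading of that goal can be sketched in plain 
Java, without Flink. This is only an illustration under stated assumptions: the 
class and method names are hypothetical, each pair is modelled as a String[] of 
{key, value}, and "union" is taken to mean keeping the results of every 
iteration i >= 1 whose key set shares nothing with that of iteration i - 1.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Flink: illustrates the intended output only.
final class DisjointIterationUnion {

    // Collects the key (first element) of every {key, value} pair.
    static Set<String> keysOf(List<String[]> pairs) {
        Set<String> keys = new HashSet<>();
        for (String[] pair : pairs) {
            keys.add(pair[0]);
        }
        return keys;
    }

    // Assumed reading: keep the results of iteration i (i >= 1) only when its
    // keys are disjoint from the keys of iteration i - 1.
    static List<String[]> unionOfDisjointIterations(List<List<String[]>> perIteration) {
        List<String[]> out = new ArrayList<>();
        for (int i = 1; i < perIteration.size(); i++) {
            Set<String> common = keysOf(perIteration.get(i));
            common.retainAll(keysOf(perIteration.get(i - 1)));
            if (common.isEmpty()) {
                out.addAll(perIteration.get(i));
            }
        }
        return out;
    }
}
```

If the intended meaning is instead "iterate until the key sets become disjoint", 
the condition would act as a termination test rather than a filter.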


        DeltaIteration<Tuple2<String, String>, Tuple2<String, String>> iteration =
                transactions.iterateDelta(initial, maxIterations, 0);


        DataSet<Tuple2<String, String>> ... = ....
                flatMap(new ...()).withBroadcastSet(iteration.getWorkset(), "..."). // <-- Referencing the working set
                groupBy(0).
                reduceGroup(new ...()).
                withParameters(intValue).
                join(iteration.getSolutionSet()).where(0).equalTo(0).getInput1(); // <-- Referencing the solution set
//              projectFirst(1).projectSecond(1).types(String.class, String.class);


This raises the exception below. If I change it to getInput2(), I get the same 
error, but for the working set, which is referenced through the broadcast.


Exception in thread "main" org.apache.flink.compiler.CompilerException: Error: The step function does not reference the solution set.
    at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:856)
    at org.apache.flink.compiler.PactCompiler$GraphCreatingVisitor.postVisit(PactCompiler.java:616)
    at org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:283)
    at org.apache.flink.api.common.operators.base.GenericDataSinkBase.accept(GenericDataSinkBase.java:289)


If I remove the getInput1() call and uncomment the last line, it yields the 
exception below. I was concerned that I was accidentally writing out a null 
value somewhere, but I can't find where.


Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NullPointerException
    at org.apache.flink.api.java.operators.JoinOperator$ProjectFlatJoinFunction.join(JoinOperator.java:935)
    at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:143)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:510)
    at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:137)
    at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
    at java.lang.Thread.run(Thread.java:744)


After more investigation, it appears that the null pointer arises somewhere 
between the reduceGroup operator and the next mapOperator, as the next 
mapOperator does not run after the reduceGroup.



Thanks,

Jack
