[ 
https://issues.apache.org/jira/browse/FLINK-4823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15571746#comment-15571746
 ] 

Sajeev Ramakrishnan commented on FLINK-4823:
--------------------------------------------

                // Vertex set: the group and user inputs are each mapped with MapToVertex and then merged.
                DataSet<Tuple2<String, VertexDTO>> groupVertices = group.flatMap(new MapToVertex());
                DataSet<Tuple2<String, VertexDTO>> userVertices = user.flatMap(new MapToVertex());
                DataSet<Tuple2<String, VertexDTO>> vertices = groupVertices.union(userVertices);

                // Edge set: one Tuple3 per member record, produced by MapToEdge.
                DataSet<Tuple3<String, String, String>> edges = member.flatMap(new MapToEdge());

                Graph<String, VertexDTO, String> graph = Graph.fromTupleDataSet(vertices, edges, env);

                // Run the traversal (no message combiner is supplied) and print the vertex count of the result.
                Graph<String, VertexDTO, String> traversed =
                                graph.runVertexCentricIteration(new ComputeTraversal(expFile), null, iterations);
                System.out.println("Vertices " + traversed.numberOfVertices());
                

        static final class ComputeTraversal extends ComputeFunction<String, 
VertexDTO, String, String> {

                private static final long serialVersionUID = 
4883900514255635602L;
                String fileExpd;
                FileWriter fw;
                String DD="|";
                public ComputeTraversal(String file) { fileExpd = file; }
                public void preSuperstep()  throws Exception { fw = new 
FileWriter(fileExpd,true); System.out.println("iteration 
"+getSuperstepNumber());}
                public void postSuperstep() throws Exception { fw.close(); }
                
                
                public void compute(Vertex<String, VertexDTO> v, 
MessageIterator<String> msgIter) throws IOException {
         
                        List<String> targets = new ArrayList<String>();

                        if (getSuperstepNumber() == 1){

                                if (v.f1.visit && v.f1.type.equals("GROUP")){
                                        
                                        if(v.f1.childList!=null && 
!v.f1.childList.isEmpty()){
                                                for(String 
child:v.f1.childList){
                                                        targets.add(child);
                                                }
                                        }
                                        else
                                        for (Edge<String, String> e : 
getEdges())
                                                targets.add(e.getTarget());

                                        sendMessageToMembers(v, targets, null);
                                }
                }
                else {

                        if(v.f1.childList!=null && !v.f1.childList.isEmpty()){
                                        for(String child:v.f1.childList){
                                                targets.add(child);
                                        }
                                }
                                else
                                for (Edge<String, String> e : getEdges())
                                        targets.add(e.getTarget());
                        
                    for (String msgIn : msgIter) {
                        if (!msgIn.contains(v.getId()+"\t")) { // prevent 
cyclic graph
                                String msgOut = 
sendMessageToMembers(v,targets,msgIn);
                                
                        }
                    }
                        }
          
                }
                
                private String sendMessageToMembers(Vertex<String,VertexDTO> v, 
List<String> targets, String msgIn) {
                        String msgOut = (msgIn==null ? "" : msgIn+">") + 
v.getId()+"\t"+v.getValue().domain+"/"+v.getValue().name;
                        for (String t : targets) sendMessageTo(t,msgOut);
                        return msgOut;
                }


        }

        
        /**
         * Flat-map function that emits exactly one edge tuple per input record.
         * The three tuple fields are the string forms of the record's
         * {@code gKey}, {@code mKey} and {@code rKey}, in that order.
         */
        static final class MapToEdge implements FlatMapFunction<InputT, Tuple3<String, String, String>> {

                private static final long serialVersionUID = 1930826776772888046L;

                @Override
                public void flatMap(InputT mem, Collector<Tuple3<String, String, String>> out) throws Exception {
                        // Concatenation with the empty string yields each key's string form.
                        String first = "" + mem.gKey;
                        String second = "" + mem.mKey;
                        String third = "" + mem.rKey;
                        out.collect(new Tuple3<String, String, String>(first, second, third));
                }
        }

        /**
         * Flat-map function that maps a combined user/group input record to a
         * {@code (id, VertexDTO)} tuple. Exactly one tuple is emitted per record and
         * the created {@link VertexDTO} always has {@code visit} set to {@code true}.
         */
        static final class MapToVertex implements FlatMapFunction<InputT, Tuple2<String, VertexDTO>> {

                private static final long serialVersionUID = 170886364461479813L;

                @Override
                public void flatMap(InputT vertex, Collector<Tuple2<String, VertexDTO>> out) throws Exception {
                        // vertex.key is a long, so it is turned into a string for use as the id.
                        String id = vertex.key + "";
                        VertexDTO dto = new VertexDTO(
                                        id, vertex.domain, vertex.accountName, vertex.memberType, vertex.childList, true);
                        out.collect(new Tuple2<String, VertexDTO>(dto.id, dto));
                }
        }
        
        /**
         * Serializable value object describing one vertex: its id, domain, name, type,
         * an optional set of child ids and a {@code visit} flag.
         *
         * <p>Two instances are equal when their {@code id}s are equal; all other fields
         * are ignored by {@link #equals(Object)} and {@link #hashCode()}.
         */
        static final class VertexDTO implements Serializable {

                private static final long serialVersionUID = -7095619486072713817L;

                public String id     = "";
                public String domain = "";
                public String name   = "";
                public String type   = "";
                public boolean visit;
                /** Ids of child vertices; may be {@code null}. */
                Set<String> childList;

                /** Creates an instance with empty strings, {@code visit == false} and no child list. */
                public VertexDTO() {}

                /**
                 * @param id        vertex id
                 * @param domain    domain of the vertex
                 * @param account   stored as {@link #name}
                 * @param type      vertex type
                 * @param childList ids of child vertices, may be {@code null}
                 * @param visit     value of the {@link #visit} flag
                 */
                public VertexDTO(String id, String domain, String account, String type, Set<String> childList, boolean visit) {
                        this.id        = id;
                        this.domain    = domain;
                        this.name      = account;
                        this.type      = type;
                        this.childList = childList;
                        this.visit     = visit;
                }

                /**
                 * Compares by {@link #id} only.
                 *
                 * @return {@code true} if {@code obj} is a {@code VertexDTO} with an equal id;
                 *         {@code false} for {@code null} or any other type
                 */
                @Override
                public boolean equals(Object obj) {
                        if (this == obj) {
                                return true;
                        }
                        // Type check instead of a blind cast: null and foreign types are simply unequal.
                        if (!(obj instanceof VertexDTO)) {
                                return false;
                        }
                        VertexDTO other = (VertexDTO) obj;
                        return id == null ? other.id == null : id.equals(other.id);
                }

                /** Hash code derived from {@link #id} so that it stays consistent with {@link #equals(Object)}. */
                @Override
                public int hashCode() {
                        return id == null ? 0 : id.hashCode();
                }
        }


> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to 
> hold a value
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-4823
>                 URL: https://issues.apache.org/jira/browse/FLINK-4823
>             Project: Flink
>          Issue Type: Bug
>          Components: Gelly
>    Affects Versions: 1.1.0
>         Environment: RHEL 6.6
>            Reporter: Sajeev Ramakrishnan
>            Priority: Blocker
>
> Team,
>   We are getting a null pointer exception while running the vertex-centric graph 
> traversal.
> org.apache.flink.types.NullFieldException: Field 0 is null, but expected to 
> hold a value.
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:126)
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>         at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>         at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
>         at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
>         at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>         at 
> org.apache.flink.api.java.operators.JoinOperator$DefaultJoin$WrappingFlatJoinFunction.join(JoinOperator.java:572)
>         at 
> org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver.run(JoinWithSolutionSetFirstDriver.java:196)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>         at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
>         at 
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:122)
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>         at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>         ... 13 more
> None of the parameters that I am passing for the vertices and edges are null. 
> I am not able to find the root cause.
> Thanks & Regards,
> Sajeev Ramakrishnan



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to