imbajin commented on code in PR #1984:
URL: 
https://github.com/apache/incubator-hugegraph/pull/1984#discussion_r1010508058


##########
hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java:
##########
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
+import com.baidu.hugegraph.exception.ExistedException;
+import com.baidu.hugegraph.iterator.ListIterator;
+import com.baidu.hugegraph.job.UserJob;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
+import com.baidu.hugegraph.job.algorithm.Consumers;
+import com.baidu.hugegraph.schema.SchemaLabel;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.structure.HugeVertex;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.baidu.hugegraph.util.Log;
+import com.baidu.hugegraph.util.StringEncoding;
+import com.google.common.collect.ImmutableMap;
+
+public class LouvainTraverser extends AlgoTraverser {
+
+    public static final String C_PASS = "c_pass-";
+    public static final String C_KIN = "c_kin";
+    public static final String C_WEIGHT = "c_weight";
+    public static final String C_MEMBERS = "c_members";
+
+    private static final long LIMIT = AbstractAlgorithm.MAX_QUERY_LIMIT;
+    private static final int MAX_COMM_SIZE = 100000; // 10w
+
+    private static final Logger LOG = Log.logger(LouvainTraverser.class);
+
+    private final GraphTraversalSource g;
+    private final String sourceLabel;
+    private final String sourceCLabel;
+    private final long degree;
+    private final boolean skipIsolated;
+
+    private final Cache cache;
+
+    private long m;
+    private String passLabel;
+
+    public LouvainTraverser(UserJob<Object> job, int workers, long degree,
+                            String sourceLabel, String sourceCLabel,
+                            boolean skipIsolated) {
+        super(job, LouvainAlgorithm.ALGO_NAME, workers);
+        this.g = this.graph().traversal();
+        this.sourceLabel = sourceLabel;
+        this.sourceCLabel = sourceCLabel;
+        this.degree = degree;
+        this.skipIsolated = skipIsolated;
+        this.m = 1L;
+        this.passLabel = "";
+
+        this.cache = new Cache();
+    }
+
+    private void defineSchemaOfPk() {
+        String label = this.labelOfPassN(0);
+        if (this.graph().existsVertexLabel(label) ||
+            this.graph().existsEdgeLabel(label)) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding");
+        }
+
+        SchemaManager schema = this.graph().schema();
+        schema.propertyKey(C_KIN).asInt()
+              .ifNotExist().create();
+        schema.propertyKey(C_MEMBERS).valueSet().asText()
+              .ifNotExist().create();
+        schema.propertyKey(C_WEIGHT).asFloat()
+              .ifNotExist().create();
+
+        this.m = this.g.E().count().next();
+    }
+
+    private void defineSchemaOfPassN(int pass) {
+        this.passLabel = labelOfPassN(pass);
+
+        SchemaManager schema = this.graph().schema();
+        try {
+            schema.vertexLabel(this.passLabel).useCustomizeStringId()
+                  .properties(C_KIN, C_MEMBERS, C_WEIGHT)
+                  .create();
+            schema.edgeLabel(this.passLabel)
+                  .sourceLabel(this.passLabel)
+                  .targetLabel(this.passLabel)
+                  .properties(C_WEIGHT)
+                  .create();
+        } catch (ExistedException e) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding", e);
+        }
+    }
+
+    private List<String> cpassEdgeLabels() {
+        List<String> names = new ArrayList<>();
+        for (SchemaLabel label : this.graph().schema().getEdgeLabels()) {
+            String name = label.name();
+            if (name.startsWith(C_PASS)) {
+                names.add(name);
+            }
+        }
+        return names;
+    }
+
+    private List<String> cpassVertexLabels() {
+        List<String> names = new ArrayList<>();
+        for (SchemaLabel label : this.graph().schema().getVertexLabels()) {
+            String name = label.name();
+            if (name.startsWith(C_PASS)) {
+                names.add(name);
+            }
+        }
+        return names;
+    }
+
+    private String labelOfPassN(int n) {
+        return C_PASS + n;
+    }
+
+    private float weightOfEdge(Edge e) {
+        if (e.label().startsWith(C_PASS)) {
+            assert e.property(C_WEIGHT).isPresent();
+            return e.value(C_WEIGHT);
+        } else if (e.property(C_WEIGHT).isPresent()) {
+            return e.value(C_WEIGHT);
+        }
+        return 1f;
+    }
+
+    private float weightOfEdges(List<Edge> edges) {
+        float weight = 0f;
+        for (Edge edge : edges) {
+            weight += weightOfEdge(edge);
+        }
+        return weight;
+    }
+
+    private Vertex newCommunityNode(Id cid, float cweight,
+                                    int kin, List<String> members) {
+        assert !members.isEmpty() : members;
+        /*
+         * cweight: members size(all pass) of the community, just for debug
+         * kin: edges weight in the community
+         * members: members id of the community of last pass
+         */
+        return this.graph().addVertex(T.label, this.passLabel,
+                                      T.id, cid, C_WEIGHT, cweight,
+                                      C_KIN, kin, C_MEMBERS, members);
+    }
+
+    private Vertex makeCommunityNode(Id cid) {
+        VertexLabel vl = this.graph().vertexLabel(this.passLabel);
+        return new HugeVertex(this.graph(), cid, vl);
+    }
+
+    private Edge newCommunityEdge(Vertex source, Vertex target, float weight) {
+        return source.addEdge(this.passLabel, target, C_WEIGHT, weight);
+    }
+
+    private void insertNewCommunity(int pass, Id cid, float cweight,
+                                    int kin, List<String> members,
+                                    Map<Id, MutableFloat> cedges) {
+        // create backend vertex if it's the first time
+        Id vid = this.cache.genId(pass, cid);
+        Vertex node = this.newCommunityNode(vid, cweight, kin, members);
+        commitIfNeeded();
+        // update backend vertex edges
+        for (Map.Entry<Id, MutableFloat> e : cedges.entrySet()) {
+            float weight = e.getValue().floatValue();
+            vid = this.cache.genId(pass, e.getKey());
+            Vertex targetV = this.makeCommunityNode(vid);
+            this.newCommunityEdge(node, targetV, weight);
+            commitIfNeeded();
+        }
+        LOG.debug("Add new comm: {} kin={} size={}", node, kin, 
members.size());
+    }
+
+    private boolean needSkipVertex(int pass, Vertex v) {
+        // skip the old intermediate data when first pass
+        String label = v.label();
+        if (label.startsWith(C_PASS)) {
+            if (pass == 0) {
+                return true;
+            }
+            String lastPassLabel = labelOfPassN(pass - 1);
+            if (!label.equals(lastPassLabel)) {
+                return true;
+            }
+        }
+        // skip the vertex with unmatched clabel
+        if (this.sourceCLabel != null && !match(v, this.sourceCLabel)) {
+            return true;
+        }
+        return false;
+    }
+
+    private Iterator<Vertex> sourceVertices(int pass) {
+        if (pass > 0) {
+            // all vertices of merged community
+            String lastPassLabel = labelOfPassN(pass - 1);
+            return this.vertices(lastPassLabel, LIMIT);
+        } else {
+            assert pass == 0;
+            // all vertices at the first time
+            return this.vertices(this.sourceLabel, LIMIT);
+        }
+    }
+
+    private List<Edge> neighbors(Id vid) {
+        Iterator<Edge> nbs = this.edgesOfVertex(vid, Directions.BOTH,
+                                                (Id) null, this.degree);
+        @SuppressWarnings("resource")
+        ListIterator<Edge> list = new ListIterator<>(LIMIT, nbs);
+        return (List<Edge>) list.list();
+    }
+
+    private float weightOfVertex(Vertex v, List<Edge> edges) {
+        // degree/weight of vertex
+        Float value = this.cache.vertexWeight((Id) v.id());
+        if (value != null) {
+            return value;
+        }
+        if (edges == null) {
+            edges = neighbors((Id) v.id());
+        }
+        float weight = weightOfEdges(edges);
+        this.cache.vertexWeight((Id) v.id(), weight);
+        return weight;
+    }
+
+    private int kinOfVertex(Vertex v) {
+        if (v.label().startsWith(C_PASS) && v.property(C_KIN).isPresent()) {
+            return v.value(C_KIN);
+        }
+        return 0;
+    }
+
+    private float cweightOfVertex(Vertex v) {
+        if (v.label().startsWith(C_PASS) && v.property(C_WEIGHT).isPresent()) {
+            return v.value(C_WEIGHT);
+        }
+        return 1f;
+    }
+
+    private Community communityOfVertex(Vertex v, List<Edge> nbs) {
+        Id vid = (Id) v.id();
+        Community c = this.cache.vertex2Community(vid);
+        // ensure source vertex exist in cache
+        if (c == null) {
+            c = this.wrapCommunity(v, nbs);
+            assert c != null;
+        }
+        return c;
+    }
+
+    // 1: wrap original vertex as community node
+    // 2: add original vertices to community node,
+    //    and save as community vertex when merge()
+    // 3: wrap community vertex as community node,
+    //    and repeat step 2 and step 3.
+    private Community wrapCommunity(Vertex v, List<Edge> nbs) {
+        Id vid = (Id) v.id();
+        Community comm = this.cache.vertex2Community(vid);
+        if (comm != null) {
+            return comm;
+        }
+
+        comm = new Community(vid);
+        comm.add(this, v, nbs);
+        comm = this.cache.vertex2CommunityIfAbsent(vid, comm);
+        return comm;
+    }
+
+    private Collection<Pair<Community, MutableInt>> nbCommunities(
+                                                    int pass,
+                                                    List<Edge> edges) {
+        // comms is a map of cid:[community,weight]
+        Map<Id, Pair<Community, MutableInt>> comms = new HashMap<>();
+        for (Edge edge : edges) {
+            Vertex otherV = ((HugeEdge) edge).otherVertex();
+            if (needSkipVertex(pass, otherV)) {
+                // skip the old intermediate data, or filter clabel
+                continue;
+            }
+            // will traverse the neighbors of otherV
+            Community c = this.wrapCommunity(otherV, null);
+            if (!comms.containsKey(c.cid)) {
+                comms.put(c.cid, Pair.of(c, new MutableInt(0)));
+            }
+            // calc weight between source vertex and neighbor community
+            comms.get(c.cid).getRight().add(2 * weightOfEdge(edge));
+        }
+        return comms.values();
+    }
+
+    private void doMoveCommunity(Vertex v, List<Edge> nbs, Community newC) {
+        Id vid = (Id) v.id();
+
+        // update community of v (return the origin one)
+        Community oldC = this.cache.vertex2Community(vid, newC);
+
+        // remove v from old community. should synchronized (vid)?
+        if (oldC != null) {
+            oldC.remove(this, v, nbs);
+        }
+
+        // add v to new community
+        newC.add(this, v, nbs);
+        LOG.debug("Move {} to community: {}", v, newC);
+    }
+
+    private boolean moveCommunity(Vertex v, int pass) {
+        // move vertex to neighbor community if needed
+        List<Edge> nbs = neighbors((Id) v.id());
+        if (this.skipIsolated && pass == 0 && nbs.isEmpty()) {
+            return false;
+        }
+        Community c = communityOfVertex(v, nbs);
+        double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
+        // update community of v if △Q changed
+        double maxDeltaQ = 0d;
+        Community bestComm = null;
+        // list all neighbor communities of v
+        for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
+            // △Q = (Ki_in - Ki * Etot / m) / 2m
+            Community otherC = nbc.getLeft();
+            if (otherC.size() >= MAX_COMM_SIZE) {
+                LOG.info("Skip community {} for {} due to its size >= {}",
+                         otherC.cid, v, MAX_COMM_SIZE);
+                continue;
+            }
+            // weight between c and otherC
+            double kiin = nbc.getRight().floatValue();
+            // weight of otherC
+            double tot = otherC.kin() + otherC.kout();
+            if (c.equals(otherC)) {
+                assert c == otherC;
+                if (tot < ki) {
+                    /*
+                     * expect tot >= ki, but multi-threads may
+                     * cause tot < ki due to concurrent update otherC
+                     */
+                    LOG.warn("Changing vertex: {}(ki={}, kiin={}, pass={}), 
otherC: {}",
+                             v, ki, kiin, pass, otherC);
+                }
+                tot -= ki;
+                // assert tot >= 0d : otherC + ", tot=" + tot + ", ki=" + ki;
+                // expect tot >= 0, but may be something wrong?
+                if (tot < 0d) {
+                    tot = 0d;
+                }
+            }
+            double deltaQ = kiin - ki * tot / this.m;
+            if (deltaQ > maxDeltaQ) {
+                // TODO: cache otherC for neighbors the same community
+                maxDeltaQ = deltaQ;
+                bestComm = otherC;
+            }
+        }
+        if (maxDeltaQ > 0d && !c.equals(bestComm)) {
+            // move v to the community of maxQ neighbor
+            doMoveCommunity(v, nbs, bestComm);
+            return true;
+        }
+        return false;
+    }
+
+    private double moveCommunities(int pass) {
+        LOG.info("Detect community for pass {}", pass);
+        Iterator<Vertex> vertices = this.sourceVertices(pass);
+
+        // shuffle
+        //r = r.order().by(shuffle);
+
+        long total = 0L;
+        AtomicLong moved = new AtomicLong(0L);
+
+        Consumers<Vertex> consumers = new Consumers<>(this.executor, v -> {
+            // called by multi-threads
+            if (this.moveCommunity(v, pass)) {
+                moved.incrementAndGet();
+            }
+        });
+
+        consumers.start("louvain-move-pass-" + pass);
+        try {
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                Vertex v = vertices.next();
+                if (needSkipVertex(pass, v)) {
+                    // skip the old intermediate data, or filter clabel
+                    continue;
+                }
+                total++;
+                consumers.provide(v);
+            }
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
+        }
+
+        // maybe always shocking when set degree limited
+        return total == 0L ? 0d : moved.doubleValue() / total;
+    }
+
+    private void mergeCommunities(int pass) {
+        LOG.info("Merge community for pass {}", pass);
+        // merge each community as a vertex
+        Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
+        assert this.skipIsolated || this.allMembersExist(comms,  pass - 1);
+        this.cache.resetVertexWeight();
+
+        Consumers<Pair<Community, Set<Id>>> consumers = new Consumers<>(
+                                                        this.executor, pair -> 
{
+            // called by multi-threads
+            this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+        }, () -> {
+            // commit when finished
+            this.graph().tx().commit();
+        });
+
+        consumers.start("louvain-merge-pass-" + pass);
+        try {
+            for (Pair<Community, Set<Id>> pair : comms) {
+                Community c = pair.getLeft();
+                if (c.empty()) {
+                    continue;
+                }
+                this.progress += pair.getRight().size();
+                this.updateProgress(this.progress);
+                //this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+                consumers.provide(pair);
+            }
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
+        }
+
+        this.graph().tx().commit();
+        assert this.skipIsolated || this.allMembersExist(pass);
+
+        // reset communities
+        this.cache.reset();
+    }
+
+    private void mergeCommunity(int pass, Community c, Set<Id> cvertices) {
+        // update kin and edges between communities
+        int kin = c.kin();
+        int membersSize = cvertices.size();
+        assert !cvertices.isEmpty();
+        assert membersSize == c.size();
+        List<String> members = new ArrayList<>(membersSize);
+        Map<Id, MutableFloat> cedges = new HashMap<>(membersSize);
+        for (Id v : cvertices) {
+            members.add(v.toString());
+            // collect edges between this community and other communities
+            List<Edge> neighbors = neighbors(v);
+            for (Edge edge : neighbors) {
+                Vertex otherV = ((HugeEdge) edge).otherVertex();
+                if (cvertices.contains(otherV.id())) {
+                    // inner edges of this community, will be calc twice
+                    // due to both e-in and e-out are in vertices,
+                    kin += weightOfEdge(edge);

Review Comment:
   shall we drop the float (use floor)? (or use rounding number?)



##########
hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java:
##########
@@ -0,0 +1,929 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm.comm;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
+import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.backend.id.Id;
+import com.baidu.hugegraph.backend.id.IdGenerator;
+import com.baidu.hugegraph.exception.ExistedException;
+import com.baidu.hugegraph.iterator.ListIterator;
+import com.baidu.hugegraph.job.UserJob;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
+import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
+import com.baidu.hugegraph.job.algorithm.Consumers;
+import com.baidu.hugegraph.schema.SchemaLabel;
+import com.baidu.hugegraph.schema.SchemaManager;
+import com.baidu.hugegraph.schema.VertexLabel;
+import com.baidu.hugegraph.structure.HugeEdge;
+import com.baidu.hugegraph.structure.HugeVertex;
+import com.baidu.hugegraph.type.define.Directions;
+import com.baidu.hugegraph.util.InsertionOrderUtil;
+import com.baidu.hugegraph.util.Log;
+import com.baidu.hugegraph.util.StringEncoding;
+import com.google.common.collect.ImmutableMap;
+
+public class LouvainTraverser extends AlgoTraverser {
+
+    public static final String C_PASS = "c_pass-";
+    public static final String C_KIN = "c_kin";
+    public static final String C_WEIGHT = "c_weight";
+    public static final String C_MEMBERS = "c_members";
+
+    private static final long LIMIT = AbstractAlgorithm.MAX_QUERY_LIMIT;
+    private static final int MAX_COMM_SIZE = 100000; // 10w
+
+    private static final Logger LOG = Log.logger(LouvainTraverser.class);
+
+    private final GraphTraversalSource g;
+    private final String sourceLabel;
+    private final String sourceCLabel;
+    private final long degree;
+    private final boolean skipIsolated;
+
+    private final Cache cache;
+
+    private long m;
+    private String passLabel;
+
+    public LouvainTraverser(UserJob<Object> job, int workers, long degree,
+                            String sourceLabel, String sourceCLabel,
+                            boolean skipIsolated) {
+        super(job, LouvainAlgorithm.ALGO_NAME, workers);
+        this.g = this.graph().traversal();
+        this.sourceLabel = sourceLabel;
+        this.sourceCLabel = sourceCLabel;
+        this.degree = degree;
+        this.skipIsolated = skipIsolated;
+        this.m = 1L;
+        this.passLabel = "";
+
+        this.cache = new Cache();
+    }
+
+    private void defineSchemaOfPk() {
+        String label = this.labelOfPassN(0);
+        if (this.graph().existsVertexLabel(label) ||
+            this.graph().existsEdgeLabel(label)) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding");
+        }
+
+        SchemaManager schema = this.graph().schema();
+        schema.propertyKey(C_KIN).asInt()
+              .ifNotExist().create();
+        schema.propertyKey(C_MEMBERS).valueSet().asText()
+              .ifNotExist().create();
+        schema.propertyKey(C_WEIGHT).asFloat()
+              .ifNotExist().create();
+
+        this.m = this.g.E().count().next();
+    }
+
+    private void defineSchemaOfPassN(int pass) {
+        this.passLabel = labelOfPassN(pass);
+
+        SchemaManager schema = this.graph().schema();
+        try {
+            schema.vertexLabel(this.passLabel).useCustomizeStringId()
+                  .properties(C_KIN, C_MEMBERS, C_WEIGHT)
+                  .create();
+            schema.edgeLabel(this.passLabel)
+                  .sourceLabel(this.passLabel)
+                  .targetLabel(this.passLabel)
+                  .properties(C_WEIGHT)
+                  .create();
+        } catch (ExistedException e) {
+            throw new IllegalArgumentException(
+                      "Please clear historical results before proceeding", e);
+        }
+    }
+
+    private List<String> cpassEdgeLabels() {
+        List<String> names = new ArrayList<>();
+        for (SchemaLabel label : this.graph().schema().getEdgeLabels()) {
+            String name = label.name();
+            if (name.startsWith(C_PASS)) {
+                names.add(name);
+            }
+        }
+        return names;
+    }
+
+    private List<String> cpassVertexLabels() {
+        List<String> names = new ArrayList<>();
+        for (SchemaLabel label : this.graph().schema().getVertexLabels()) {
+            String name = label.name();
+            if (name.startsWith(C_PASS)) {
+                names.add(name);
+            }
+        }
+        return names;
+    }
+
+    private String labelOfPassN(int n) {
+        return C_PASS + n;
+    }
+
+    private float weightOfEdge(Edge e) {
+        if (e.label().startsWith(C_PASS)) {
+            assert e.property(C_WEIGHT).isPresent();
+            return e.value(C_WEIGHT);
+        } else if (e.property(C_WEIGHT).isPresent()) {
+            return e.value(C_WEIGHT);
+        }
+        return 1f;
+    }
+
+    private float weightOfEdges(List<Edge> edges) {
+        float weight = 0f;
+        for (Edge edge : edges) {
+            weight += weightOfEdge(edge);
+        }
+        return weight;
+    }
+
+    private Vertex newCommunityNode(Id cid, float cweight,
+                                    int kin, List<String> members) {
+        assert !members.isEmpty() : members;
+        /*
+         * cweight: members size(all pass) of the community, just for debug
+         * kin: edges weight in the community
+         * members: members id of the community of last pass
+         */
+        return this.graph().addVertex(T.label, this.passLabel,
+                                      T.id, cid, C_WEIGHT, cweight,
+                                      C_KIN, kin, C_MEMBERS, members);
+    }
+
+    private Vertex makeCommunityNode(Id cid) {
+        VertexLabel vl = this.graph().vertexLabel(this.passLabel);
+        return new HugeVertex(this.graph(), cid, vl);
+    }
+
+    private Edge newCommunityEdge(Vertex source, Vertex target, float weight) {
+        return source.addEdge(this.passLabel, target, C_WEIGHT, weight);
+    }
+
+    private void insertNewCommunity(int pass, Id cid, float cweight,
+                                    int kin, List<String> members,
+                                    Map<Id, MutableFloat> cedges) {
+        // create backend vertex if it's the first time
+        Id vid = this.cache.genId(pass, cid);
+        Vertex node = this.newCommunityNode(vid, cweight, kin, members);
+        commitIfNeeded();
+        // update backend vertex edges
+        for (Map.Entry<Id, MutableFloat> e : cedges.entrySet()) {
+            float weight = e.getValue().floatValue();
+            vid = this.cache.genId(pass, e.getKey());
+            Vertex targetV = this.makeCommunityNode(vid);
+            this.newCommunityEdge(node, targetV, weight);
+            commitIfNeeded();
+        }
+        LOG.debug("Add new comm: {} kin={} size={}", node, kin, 
members.size());
+    }
+
+    private boolean needSkipVertex(int pass, Vertex v) {
+        // skip the old intermediate data when first pass
+        String label = v.label();
+        if (label.startsWith(C_PASS)) {
+            if (pass == 0) {
+                return true;
+            }
+            String lastPassLabel = labelOfPassN(pass - 1);
+            if (!label.equals(lastPassLabel)) {
+                return true;
+            }
+        }
+        // skip the vertex with unmatched clabel
+        if (this.sourceCLabel != null && !match(v, this.sourceCLabel)) {
+            return true;
+        }
+        return false;
+    }
+
+    private Iterator<Vertex> sourceVertices(int pass) {
+        if (pass > 0) {
+            // all vertices of merged community
+            String lastPassLabel = labelOfPassN(pass - 1);
+            return this.vertices(lastPassLabel, LIMIT);
+        } else {
+            assert pass == 0;
+            // all vertices at the first time
+            return this.vertices(this.sourceLabel, LIMIT);
+        }
+    }
+
+    private List<Edge> neighbors(Id vid) {
+        Iterator<Edge> nbs = this.edgesOfVertex(vid, Directions.BOTH,
+                                                (Id) null, this.degree);
+        @SuppressWarnings("resource")
+        ListIterator<Edge> list = new ListIterator<>(LIMIT, nbs);
+        return (List<Edge>) list.list();
+    }
+
+    private float weightOfVertex(Vertex v, List<Edge> edges) {
+        // degree/weight of vertex
+        Float value = this.cache.vertexWeight((Id) v.id());
+        if (value != null) {
+            return value;
+        }
+        if (edges == null) {
+            edges = neighbors((Id) v.id());
+        }
+        float weight = weightOfEdges(edges);
+        this.cache.vertexWeight((Id) v.id(), weight);
+        return weight;
+    }
+
+    private int kinOfVertex(Vertex v) {
+        if (v.label().startsWith(C_PASS) && v.property(C_KIN).isPresent()) {
+            return v.value(C_KIN);
+        }
+        return 0;
+    }
+
+    private float cweightOfVertex(Vertex v) {
+        if (v.label().startsWith(C_PASS) && v.property(C_WEIGHT).isPresent()) {
+            return v.value(C_WEIGHT);
+        }
+        return 1f;
+    }
+
+    private Community communityOfVertex(Vertex v, List<Edge> nbs) {
+        Id vid = (Id) v.id();
+        Community c = this.cache.vertex2Community(vid);
+        // ensure source vertex exist in cache
+        if (c == null) {
+            c = this.wrapCommunity(v, nbs);
+            assert c != null;
+        }
+        return c;
+    }
+
+    // 1: wrap original vertex as community node
+    // 2: add original vertices to community node,
+    //    and save as community vertex when merge()
+    // 3: wrap community vertex as community node,
+    //    and repeat step 2 and step 3.
+    private Community wrapCommunity(Vertex v, List<Edge> nbs) {
+        Id vid = (Id) v.id();
+        Community comm = this.cache.vertex2Community(vid);
+        if (comm != null) {
+            return comm;
+        }
+
+        comm = new Community(vid);
+        comm.add(this, v, nbs);
+        comm = this.cache.vertex2CommunityIfAbsent(vid, comm);
+        return comm;
+    }
+
+    private Collection<Pair<Community, MutableInt>> nbCommunities(
+                                                    int pass,
+                                                    List<Edge> edges) {
+        // comms is a map of cid:[community,weight]
+        Map<Id, Pair<Community, MutableInt>> comms = new HashMap<>();
+        for (Edge edge : edges) {
+            Vertex otherV = ((HugeEdge) edge).otherVertex();
+            if (needSkipVertex(pass, otherV)) {
+                // skip the old intermediate data, or filter clabel
+                continue;
+            }
+            // will traverse the neighbors of otherV
+            Community c = this.wrapCommunity(otherV, null);
+            if (!comms.containsKey(c.cid)) {
+                comms.put(c.cid, Pair.of(c, new MutableInt(0)));
+            }
+            // calc weight between source vertex and neighbor community
+            comms.get(c.cid).getRight().add(2 * weightOfEdge(edge));
+        }
+        return comms.values();
+    }
+
+    private void doMoveCommunity(Vertex v, List<Edge> nbs, Community newC) {
+        Id vid = (Id) v.id();
+
+        // update community of v (return the origin one)
+        Community oldC = this.cache.vertex2Community(vid, newC);
+
+        // remove v from old community. should synchronized (vid)?
+        if (oldC != null) {
+            oldC.remove(this, v, nbs);
+        }
+
+        // add v to new community
+        newC.add(this, v, nbs);
+        LOG.debug("Move {} to community: {}", v, newC);
+    }
+
+    private boolean moveCommunity(Vertex v, int pass) {
+        // move vertex to neighbor community if needed
+        List<Edge> nbs = neighbors((Id) v.id());
+        if (this.skipIsolated && pass == 0 && nbs.isEmpty()) {
+            return false;
+        }
+        Community c = communityOfVertex(v, nbs);
+        double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
+        // update community of v if △Q changed
+        double maxDeltaQ = 0d;
+        Community bestComm = null;
+        // list all neighbor communities of v
+        for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
+            // △Q = (Ki_in - Ki * Etot / m) / 2m
+            Community otherC = nbc.getLeft();
+            if (otherC.size() >= MAX_COMM_SIZE) {
+                LOG.info("Skip community {} for {} due to its size >= {}",
+                         otherC.cid, v, MAX_COMM_SIZE);
+                continue;
+            }
+            // weight between c and otherC
+            double kiin = nbc.getRight().floatValue();
+            // weight of otherC
+            double tot = otherC.kin() + otherC.kout();
+            if (c.equals(otherC)) {
+                assert c == otherC;
+                if (tot < ki) {
+                    /*
+                     * expect tot >= ki, but multi-threads may
+                     * cause tot < ki due to concurrent update otherC
+                     */
+                    LOG.warn("Changing vertex: {}(ki={}, kiin={}, pass={}), 
otherC: {}",
+                             v, ki, kiin, pass, otherC);
+                }
+                tot -= ki;
+                // assert tot >= 0d : otherC + ", tot=" + tot + ", ki=" + ki;
+                // expect tot >= 0, but may be something wrong?
+                if (tot < 0d) {
+                    tot = 0d;
+                }
+            }
+            double deltaQ = kiin - ki * tot / this.m;
+            if (deltaQ > maxDeltaQ) {
+                // TODO: cache otherC for neighbors the same community
+                maxDeltaQ = deltaQ;
+                bestComm = otherC;
+            }
+        }
+        if (maxDeltaQ > 0d && !c.equals(bestComm)) {
+            // move v to the community of maxQ neighbor
+            doMoveCommunity(v, nbs, bestComm);
+            return true;
+        }
+        return false;
+    }
+
+    private double moveCommunities(int pass) {
+        LOG.info("Detect community for pass {}", pass);
+        Iterator<Vertex> vertices = this.sourceVertices(pass);
+
+        // shuffle
+        //r = r.order().by(shuffle);
+
+        long total = 0L;
+        AtomicLong moved = new AtomicLong(0L);
+
+        Consumers<Vertex> consumers = new Consumers<>(this.executor, v -> {
+            // called by multi-threads
+            if (this.moveCommunity(v, pass)) {
+                moved.incrementAndGet();
+            }
+        });
+
+        consumers.start("louvain-move-pass-" + pass);
+        try {
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                Vertex v = vertices.next();
+                if (needSkipVertex(pass, v)) {
+                    // skip the old intermediate data, or filter clabel
+                    continue;
+                }
+                total++;
+                consumers.provide(v);
+            }
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
+        }
+
+        // maybe always shocking when set degree limited
+        return total == 0L ? 0d : moved.doubleValue() / total;
+    }
+
+    private void mergeCommunities(int pass) {
+        LOG.info("Merge community for pass {}", pass);
+        // merge each community as a vertex
+        Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
+        assert this.skipIsolated || this.allMembersExist(comms,  pass - 1);
+        this.cache.resetVertexWeight();
+
+        Consumers<Pair<Community, Set<Id>>> consumers = new Consumers<>(
+                                                        this.executor, pair -> 
{
+            // called by multi-threads
+            this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+        }, () -> {
+            // commit when finished
+            this.graph().tx().commit();
+        });
+
+        consumers.start("louvain-merge-pass-" + pass);
+        try {
+            for (Pair<Community, Set<Id>> pair : comms) {
+                Community c = pair.getLeft();
+                if (c.empty()) {
+                    continue;
+                }
+                this.progress += pair.getRight().size();
+                this.updateProgress(this.progress);
+                //this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+                consumers.provide(pair);
+            }
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
+        }
+
+        this.graph().tx().commit();
+        assert this.skipIsolated || this.allMembersExist(pass);
+
+        // reset communities
+        this.cache.reset();
+    }
+
+    private void mergeCommunity(int pass, Community c, Set<Id> cvertices) {
+        // update kin and edges between communities
+        int kin = c.kin();
+        int membersSize = cvertices.size();
+        assert !cvertices.isEmpty();
+        assert membersSize == c.size();
+        List<String> members = new ArrayList<>(membersSize);
+        Map<Id, MutableFloat> cedges = new HashMap<>(membersSize);
+        for (Id v : cvertices) {
+            members.add(v.toString());
+            // collect edges between this community and other communities
+            List<Edge> neighbors = neighbors(v);
+            for (Edge edge : neighbors) {
+                Vertex otherV = ((HugeEdge) edge).otherVertex();
+                if (cvertices.contains(otherV.id())) {
+                    // inner edges of this community, will be calc twice
+                    // due to both e-in and e-out are in vertices,
+                    kin += weightOfEdge(edge);

Review Comment:
   @javeme shall we drop the float (use floor)? (or use rounding number?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@hugegraph.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to