javeme commented on code in PR #285: URL: https://github.com/apache/incubator-hugegraph-computer/pull/285#discussion_r1437839434
########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class 
SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. + * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { Review Comment: prefer `this.targetId.contains(",")` ########## 
computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class 
SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target Review Comment: prefer `single target: one vertex id` ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. 
+ * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { + // remove spaces + this.targetId = Arrays.stream(this.targetId.split(",")) + .map(e -> e.trim()) + .collect(Collectors.joining(",")); + this.targetQuantityType = QuantityType.MULTIPLE; + } else { + this.targetQuantityType = QuantityType.SINGLE; + } + + this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, ""); + + this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1); + if (this.defaultWeight <= 0) { + throw new ComputerException("The param '%s' must be greater than 0, " + + "actual got '%s'", + OPTION_DEFAULT_WEIGHT, this.defaultWeight); + } + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + SingleSourceShortestPathValue value = new SingleSourceShortestPathValue(); + value.unreachable(); + vertex.value(value); + + // start from source vertex + if (!this.idEquals(vertex, this.sourceId)) { + vertex.inactivate(); + return; + } + value.zeroDistance(); // source vertex + + // single target && source vertex == target vertex + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.sourceId.equals(this.targetId)) { + LOG.debug("source 
vertex {} equals target vertex {}", this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + if (vertex.numEdges() <= 0) { + // isolated vertex + LOG.debug("source vertex {} can not reach target vertex {}", + this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue message = new SingleSourceShortestPathValue(); + message.addToPath(vertex, this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), message); + }); + + vertex.inactivate(); + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator<SingleSourceShortestPathValue> messages) { + if (this.isTarget(vertex) && !this.reachTarget.contains(vertex.id())) { + // reach target + this.reachTarget.add(vertex.id()); + } + + while (messages.hasNext()) { + SingleSourceShortestPathValue message = messages.next(); + SingleSourceShortestPathValue value = vertex.value(); + + if (message.totalWeight() < value.totalWeight()) { + // find a shorter path + value.shorterPath(vertex, message.path(), message.totalWeight()); + } else { + continue; + } + + // reach all target or nowhere to go + if (this.isAllTargetReach(vertex) || vertex.numEdges() <= 0) { + continue; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue(); + forwardMessage.addToPath(value.path(), + value.totalWeight() + this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), forwardMessage); + }); + } + + vertex.inactivate(); + } + + @Override + public void beforeSuperstep(WorkerContext context) { + this.reachTarget = context.aggregatedValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET, + this.reachTarget); + } + + /** + * determine whether 
vertex.id and id are equal + */ + private boolean idEquals(Vertex vertex, String id) { + return vertex.id().value().toString().equals(id); + } + + /** + * get the weight of an edge by its weight property + */ + private double getEdgeWeight(Edge edge) { + double weight = this.defaultWeight; + + Value property = edge.property(this.weightProperty); + if (property != null) { + if (!property.isNumber()) { + throw new ComputerException("The value of %s must be a numeric value, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + + weight = ((DoubleValue) property).doubleValue(); + if (weight <= 0) { + throw new ComputerException("The value of %s must be greater than 0, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + } + return weight; + } + + /** + * determine whether vertex is one of the target + */ + private boolean isTarget(Vertex vertex) { + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.idEquals(vertex, this.targetId)) { + return true; + } + + if (this.targetQuantityType.equals(QuantityType.MULTIPLE)) { Review Comment: ditto ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. 
multiple target: comma separated Review Comment: multiple vertex ids separated by comma ########## computer-api/src/main/java/org/apache/hugegraph/computer/core/combiner/IdListMergeCombiner.java: ########## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.core.combiner; + +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.IdList; + +public class IdListMergeCombiner implements Combiner<IdList> { + + @Override + public void combine(IdList v1, IdList v2, IdList result) { + // merge + for (Id id : v1.values()) { + if (!result.contains(id)) { Review Comment: TODO: use IdSet ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + 
+ /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. + * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; Review Comment: do you mean reachedTargets? ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathMaster.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import org.apache.hugegraph.computer.core.combiner.IdListMergeCombiner; +import org.apache.hugegraph.computer.core.graph.value.ValueType; +import org.apache.hugegraph.computer.core.master.MasterComputation; +import org.apache.hugegraph.computer.core.master.MasterComputationContext; +import org.apache.hugegraph.computer.core.master.MasterContext; + +public class SingleSourceShortestPathMaster implements MasterComputation { + + public static final String SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET = + "single_source_shortest_path.reach_target"; Review Comment: REACHED_TARGETS / reached_targets ? ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPathOutput.java: ########## @@ -36,13 +37,12 @@ protected void prepareSchema() { } @Override - protected List<String> value(Vertex vertex) { - ShortestPathValue value = vertex.value(); + protected String value(Vertex vertex) { + SingleSourceShortestPathValue value = vertex.value(); - List<String> path = new ArrayList<>(); - for (int i = 0; i < value.path().size(); i++) { - path.add(value.path().get(i).toString()); - } - return path; + Map map = new HashMap(); + map.put("path", value.path().toString()); + map.put("totalWeight", value.totalWeight()); Review Comment: expect style of json key: totalWeight => total_weight ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + 
+ /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. + * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { + // remove spaces + this.targetId = Arrays.stream(this.targetId.split(",")) + .map(e -> e.trim()) + .collect(Collectors.joining(",")); + this.targetQuantityType = QuantityType.MULTIPLE; + } else { + this.targetQuantityType = QuantityType.SINGLE; + } + + this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, ""); + + this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1); + if (this.defaultWeight <= 0) { + throw new ComputerException("The param '%s' must be greater than 0, " + + "actual got '%s'", + OPTION_DEFAULT_WEIGHT, this.defaultWeight); + } + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + SingleSourceShortestPathValue value = new 
SingleSourceShortestPathValue(); + value.unreachable(); + vertex.value(value); + + // start from source vertex + if (!this.idEquals(vertex, this.sourceId)) { + vertex.inactivate(); + return; + } + value.zeroDistance(); // source vertex + + // single target && source vertex == target vertex + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.sourceId.equals(this.targetId)) { + LOG.debug("source vertex {} equals target vertex {}", this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + if (vertex.numEdges() <= 0) { + // isolated vertex + LOG.debug("source vertex {} can not reach target vertex {}", + this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue message = new SingleSourceShortestPathValue(); + message.addToPath(vertex, this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), message); + }); + + vertex.inactivate(); + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator<SingleSourceShortestPathValue> messages) { + if (this.isTarget(vertex) && !this.reachTarget.contains(vertex.id())) { + // reach target + this.reachTarget.add(vertex.id()); + } + + while (messages.hasNext()) { + SingleSourceShortestPathValue message = messages.next(); + SingleSourceShortestPathValue value = vertex.value(); + + if (message.totalWeight() < value.totalWeight()) { + // find a shorter path + value.shorterPath(vertex, message.path(), message.totalWeight()); + } else { + continue; + } + + // reach all target or nowhere to go + if (this.isAllTargetReach(vertex) || vertex.numEdges() <= 0) { + continue; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue(); + forwardMessage.addToPath(value.path(), + value.totalWeight() + this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), forwardMessage); + }); + } + + vertex.inactivate(); + } + + @Override 
+ public void beforeSuperstep(WorkerContext context) { + this.reachTarget = context.aggregatedValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET, + this.reachTarget); + } + + /** + * determine whether vertex.id and id are equal + */ + private boolean idEquals(Vertex vertex, String id) { + return vertex.id().value().toString().equals(id); Review Comment: please note that two ids of string type and number type may not be equal, like `123` and `"123"` ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. 
+ * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { + // remove spaces + this.targetId = Arrays.stream(this.targetId.split(",")) + .map(e -> e.trim()) + .collect(Collectors.joining(",")); + this.targetQuantityType = QuantityType.MULTIPLE; + } else { + this.targetQuantityType = QuantityType.SINGLE; + } + + this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, ""); + + this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1); + if (this.defaultWeight <= 0) { + throw new ComputerException("The param '%s' must be greater than 0, " + + "actual got '%s'", + OPTION_DEFAULT_WEIGHT, this.defaultWeight); + } + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + SingleSourceShortestPathValue value = new SingleSourceShortestPathValue(); + value.unreachable(); + vertex.value(value); + + // start from source vertex + if (!this.idEquals(vertex, this.sourceId)) { + vertex.inactivate(); + return; + } + value.zeroDistance(); // source vertex + + // single target && source vertex == target vertex + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.sourceId.equals(this.targetId)) { + LOG.debug("source 
vertex {} equals target vertex {}", this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + if (vertex.numEdges() <= 0) { + // isolated vertex + LOG.debug("source vertex {} can not reach target vertex {}", + this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue message = new SingleSourceShortestPathValue(); + message.addToPath(vertex, this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), message); + }); + + vertex.inactivate(); + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator<SingleSourceShortestPathValue> messages) { + if (this.isTarget(vertex) && !this.reachTarget.contains(vertex.id())) { + // reach target + this.reachTarget.add(vertex.id()); + } + + while (messages.hasNext()) { + SingleSourceShortestPathValue message = messages.next(); + SingleSourceShortestPathValue value = vertex.value(); + + if (message.totalWeight() < value.totalWeight()) { + // find a shorter path + value.shorterPath(vertex, message.path(), message.totalWeight()); + } else { + continue; + } + + // reach all target or nowhere to go + if (this.isAllTargetReach(vertex) || vertex.numEdges() <= 0) { + continue; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue(); + forwardMessage.addToPath(value.path(), + value.totalWeight() + this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), forwardMessage); + }); + } + + vertex.inactivate(); + } + + @Override + public void beforeSuperstep(WorkerContext context) { + this.reachTarget = context.aggregatedValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET, + this.reachTarget); + } + + /** + * determine whether 
vertex.id and id are equal + */ + private boolean idEquals(Vertex vertex, String id) { + return vertex.id().value().toString().equals(id); + } + + /** + * get the weight of an edge by its weight property + */ + private double getEdgeWeight(Edge edge) { + double weight = this.defaultWeight; + + Value property = edge.property(this.weightProperty); + if (property != null) { + if (!property.isNumber()) { + throw new ComputerException("The value of %s must be a numeric value, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + + weight = ((DoubleValue) property).doubleValue(); + if (weight <= 0) { + throw new ComputerException("The value of %s must be greater than 0, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + } + return weight; + } + + /** + * determine whether vertex is one of the target + */ + private boolean isTarget(Vertex vertex) { + if (this.targetQuantityType.equals(QuantityType.SINGLE) && Review Comment: `this.targetQuantityType == QuantityType.SINGLE` is ok ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + + /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. 
+ * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { + // remove spaces + this.targetId = Arrays.stream(this.targetId.split(",")) + .map(e -> e.trim()) + .collect(Collectors.joining(",")); + this.targetQuantityType = QuantityType.MULTIPLE; + } else { + this.targetQuantityType = QuantityType.SINGLE; + } + + this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, ""); + + this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1); + if (this.defaultWeight <= 0) { + throw new ComputerException("The param '%s' must be greater than 0, " + + "actual got '%s'", + OPTION_DEFAULT_WEIGHT, this.defaultWeight); + } + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + SingleSourceShortestPathValue value = new SingleSourceShortestPathValue(); + value.unreachable(); + vertex.value(value); + + // start from source vertex + if (!this.idEquals(vertex, this.sourceId)) { + vertex.inactivate(); + return; + } + value.zeroDistance(); // source vertex + + // single target && source vertex == target vertex + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.sourceId.equals(this.targetId)) { + LOG.debug("source 
vertex {} equals target vertex {}", this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + if (vertex.numEdges() <= 0) { + // isolated vertex + LOG.debug("source vertex {} can not reach target vertex {}", + this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue message = new SingleSourceShortestPathValue(); + message.addToPath(vertex, this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), message); + }); + + vertex.inactivate(); + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator<SingleSourceShortestPathValue> messages) { + if (this.isTarget(vertex) && !this.reachTarget.contains(vertex.id())) { + // reach target + this.reachTarget.add(vertex.id()); + } + + while (messages.hasNext()) { + SingleSourceShortestPathValue message = messages.next(); + SingleSourceShortestPathValue value = vertex.value(); + + if (message.totalWeight() < value.totalWeight()) { + // find a shorter path + value.shorterPath(vertex, message.path(), message.totalWeight()); + } else { + continue; + } + + // reach all target or nowhere to go + if (this.isAllTargetReach(vertex) || vertex.numEdges() <= 0) { + continue; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue(); + forwardMessage.addToPath(value.path(), + value.totalWeight() + this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), forwardMessage); + }); + } + + vertex.inactivate(); + } + + @Override + public void beforeSuperstep(WorkerContext context) { + this.reachTarget = context.aggregatedValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET, + this.reachTarget); + } + + /** + * determine whether 
vertex.id and id are equal + */ + private boolean idEquals(Vertex vertex, String id) { + return vertex.id().value().toString().equals(id); + } + + /** + * get the weight of an edge by its weight property + */ + private double getEdgeWeight(Edge edge) { + double weight = this.defaultWeight; + + Value property = edge.property(this.weightProperty); + if (property != null) { + if (!property.isNumber()) { + throw new ComputerException("The value of %s must be a numeric value, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + + weight = ((DoubleValue) property).doubleValue(); + if (weight <= 0) { + throw new ComputerException("The value of %s must be greater than 0, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + } + return weight; + } + + /** + * determine whether vertex is one of the target + */ + private boolean isTarget(Vertex vertex) { + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.idEquals(vertex, this.targetId)) { + return true; + } + + if (this.targetQuantityType.equals(QuantityType.MULTIPLE)) { + for (String targetId : this.targetId.split(",")) { + if (this.idEquals(vertex, targetId)) { + return true; + } + } + } + return false; + } + + /** + * determine whether reach all target + */ + private boolean isAllTargetReach(Vertex vertex) { Review Comment: isAllTargetsReached? ########## computer-algorithm/src/main/java/org/apache/hugegraph/computer/algorithm/path/shortest/SingleSourceShortestPath.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.computer.algorithm.path.shortest; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hugegraph.computer.core.common.exception.ComputerException; +import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.graph.edge.Edge; +import org.apache.hugegraph.computer.core.graph.id.Id; +import org.apache.hugegraph.computer.core.graph.value.DoubleValue; +import org.apache.hugegraph.computer.core.graph.value.IdList; +import org.apache.hugegraph.computer.core.graph.value.Value; +import org.apache.hugegraph.computer.core.graph.vertex.Vertex; +import org.apache.hugegraph.computer.core.worker.Computation; +import org.apache.hugegraph.computer.core.worker.ComputationContext; +import org.apache.hugegraph.computer.core.worker.WorkerContext; +import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +public class SingleSourceShortestPath implements Computation<SingleSourceShortestPathValue> { + + private static final Logger LOG = Log.logger(SingleSourceShortestPath.class); + + public static final String OPTION_SOURCE_ID = "single_source_shortest_path.source_id"; + public static final String OPTION_TARGET_ID = "single_source_shortest_path.target_id"; + public static final String OPTION_WEIGHT_PROPERTY = + "single_source_shortest_path.weight_property"; + public static final String OPTION_DEFAULT_WEIGHT = + "single_source_shortest_path.default_weight"; + 
+ /** + * source vertex id + */ + private String sourceId; + + /** + * target vertex id. + * 1. single target + * 2. multiple target: comma separated + * 3. all: * + */ + private String targetId; + + /** + * target quantity type + */ + private QuantityType targetQuantityType; + + /** + * weight property. + * weight value must be a positive number. + */ + private String weightProperty; + + /** + * default weight. + * default 1 + */ + private Double defaultWeight; + + //****************** global data ******************// + /** + * reach target + */ + private IdList reachTarget; + + @Override + public String category() { + return "path"; + } + + @Override + public String name() { + return "single_source_shortest_path"; + } + + @Override + public void init(Config config) { + this.sourceId = config.getString(OPTION_SOURCE_ID, ""); + if (StringUtils.isBlank(this.sourceId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_SOURCE_ID); + } + + this.targetId = config.getString(OPTION_TARGET_ID, ""); + if (StringUtils.isBlank(this.targetId)) { + throw new ComputerException("The param '%s' must not be blank", OPTION_TARGET_ID); + } + + if (this.targetId.equals("*")) { + this.targetQuantityType = QuantityType.ALL; + } else if (this.targetId.split(",").length > 1) { + // remove spaces + this.targetId = Arrays.stream(this.targetId.split(",")) + .map(e -> e.trim()) + .collect(Collectors.joining(",")); + this.targetQuantityType = QuantityType.MULTIPLE; + } else { + this.targetQuantityType = QuantityType.SINGLE; + } + + this.weightProperty = config.getString(OPTION_WEIGHT_PROPERTY, ""); + + this.defaultWeight = config.getDouble(OPTION_DEFAULT_WEIGHT, 1); + if (this.defaultWeight <= 0) { + throw new ComputerException("The param '%s' must be greater than 0, " + + "actual got '%s'", + OPTION_DEFAULT_WEIGHT, this.defaultWeight); + } + } + + @Override + public void compute0(ComputationContext context, Vertex vertex) { + SingleSourceShortestPathValue value = new 
SingleSourceShortestPathValue(); + value.unreachable(); + vertex.value(value); + + // start from source vertex + if (!this.idEquals(vertex, this.sourceId)) { + vertex.inactivate(); + return; + } + value.zeroDistance(); // source vertex + + // single target && source vertex == target vertex + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.sourceId.equals(this.targetId)) { + LOG.debug("source vertex {} equals target vertex {}", this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + if (vertex.numEdges() <= 0) { + // isolated vertex + LOG.debug("source vertex {} can not reach target vertex {}", + this.sourceId, this.targetId); + vertex.inactivate(); + return; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue message = new SingleSourceShortestPathValue(); + message.addToPath(vertex, this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), message); + }); + + vertex.inactivate(); + } + + @Override + public void compute(ComputationContext context, Vertex vertex, + Iterator<SingleSourceShortestPathValue> messages) { + if (this.isTarget(vertex) && !this.reachTarget.contains(vertex.id())) { + // reach target + this.reachTarget.add(vertex.id()); + } + + while (messages.hasNext()) { + SingleSourceShortestPathValue message = messages.next(); + SingleSourceShortestPathValue value = vertex.value(); + + if (message.totalWeight() < value.totalWeight()) { + // find a shorter path + value.shorterPath(vertex, message.path(), message.totalWeight()); + } else { + continue; + } + + // reach all target or nowhere to go + if (this.isAllTargetReach(vertex) || vertex.numEdges() <= 0) { + continue; + } + + vertex.edges().forEach(edge -> { + SingleSourceShortestPathValue forwardMessage = new SingleSourceShortestPathValue(); + forwardMessage.addToPath(value.path(), + value.totalWeight() + this.getEdgeWeight(edge)); + + context.sendMessage(edge.targetId(), forwardMessage); + }); + } + + vertex.inactivate(); + } + + @Override 
+ public void beforeSuperstep(WorkerContext context) { + this.reachTarget = context.aggregatedValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET); + } + + @Override + public void afterSuperstep(WorkerContext context) { + context.aggregateValue( + SingleSourceShortestPathMaster.SINGLE_SOURCE_SHORTEST_PATH_REACH_TARGET, + this.reachTarget); + } + + /** + * determine whether vertex.id and id are equal + */ + private boolean idEquals(Vertex vertex, String id) { + return vertex.id().value().toString().equals(id); + } + + /** + * get the weight of an edge by its weight property + */ + private double getEdgeWeight(Edge edge) { + double weight = this.defaultWeight; + + Value property = edge.property(this.weightProperty); + if (property != null) { + if (!property.isNumber()) { + throw new ComputerException("The value of %s must be a numeric value, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + + weight = ((DoubleValue) property).doubleValue(); + if (weight <= 0) { + throw new ComputerException("The value of %s must be greater than 0, " + + "actual got '%s'", + this.weightProperty, property.string()); + } + } + return weight; + } + + /** + * determine whether vertex is one of the target + */ + private boolean isTarget(Vertex vertex) { + if (this.targetQuantityType.equals(QuantityType.SINGLE) && + this.idEquals(vertex, this.targetId)) { + return true; + } + + if (this.targetQuantityType.equals(QuantityType.MULTIPLE)) { + for (String targetId : this.targetId.split(",")) { Review Comment: can we cache a this.targetIdSet and just return this.targetIdSet.contains(vertex-id) for both SINGLE and MULTIPLE? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
