[
https://issues.apache.org/jira/browse/FLINK-2150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14579615#comment-14579615
]
ASF GitHub Bot commented on FLINK-2150:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/801#discussion_r32067943
--- Diff: flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/UniqueLabelAssignmentAlgorithm.java ---
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapPartitionFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Unique Label Assignment Algorithm.
+ *
+ * This library method initializes the vertex values with unique Long labels by making use of the
+ * following abstractions:
+ * <ul>
+ * <li> a map function generates an n-bit(n - number of parallel tasks) ID based on its own index
+ * <li> with each record, a counter c is increased
+ * <li> the unique label is then produced by shifting the counter c by the n-bit mapper ID
+ * </ul>
+ */
+public class UniqueLabelAssignmentAlgorithm implements GraphAlgorithm<String, Long, NullValue> {
+
+ /**
+ * @param vertices the data set of graph vertices
+ * @return a data set of Tuple2 containing the vertex ID and its unique Long label
+ */
+ private DataSet<Tuple2<String, Long>> assignLabels(DataSet<Vertex<String, Long>> vertices) {
+
+ return vertices.mapPartition(new RichMapPartitionFunction<Vertex<String, Long>,
+ Tuple2<String, Long>>() {
+
+ long shifter = 0;
+ long start = 0;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ shifter = getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ @Override
+ public void mapPartition(Iterable<Vertex<String, Long>> vertices,
+ Collector<Tuple2<String, Long>> out) throws Exception {
+
+ for(Vertex<String, Long> v: vertices) {
+ out.collect(new Tuple2<String, Long>(v.getId(), start << shifter));
--- End diff --
After shifting the count by n bits, you need to set the lowest n bits to the
task ID (which is n bits long).
You might also want to add a check that the result stays within long's value range.
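A minimal sketch of what this suggestion could look like, written as plain Java without any Flink dependency. The class name UniqueLabelSketch and the helpers bitsForTaskIndex and label are hypothetical and not part of the pull request. The sketch assumes that n is the number of bits needed to represent the largest subtask index, that the counter is non-negative, and that the subtask index and parallelism would come from the runtime context in the actual map function.

```java
final class UniqueLabelSketch {

    /** Number of bits needed to represent subtask indices 0 .. parallelism - 1 (assumed meaning of n). */
    static int bitsForTaskIndex(int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        // numberOfLeadingZeros(0L) is 64, so a single task needs 0 bits.
        return 64 - Long.numberOfLeadingZeros(parallelism - 1L);
    }

    /** Shifts the counter left by n bits and stores the subtask index in the freed low n bits. */
    static long label(long counter, int taskIndex, int parallelism) {
        int n = bitsForTaskIndex(parallelism);
        if (taskIndex < 0 || taskIndex >= parallelism) {
            throw new IllegalArgumentException("taskIndex out of range: " + taskIndex);
        }
        // Range check: the shifted counter must still fit into a non-negative long.
        if (counter < 0 || counter > (Long.MAX_VALUE >>> n)) {
            throw new ArithmeticException("label would exceed the range of long");
        }
        return (counter << n) | taskIndex;
    }

    public static void main(String[] args) {
        int parallelism = 4; // illustrative value
        for (int task = 0; task < parallelism; task++) {
            for (long counter = 0; counter < 3; counter++) {
                System.out.println("task " + task + ", counter " + counter
                        + " -> label " + label(counter, task, parallelism));
            }
        }
    }
}
```

Because each subtask writes its own index into the low n bits, two subtasks can never produce the same label, and within one subtask the increasing counter keeps the labels distinct.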
> Add a library method that assigns unique Long values to vertices
> ----------------------------------------------------------------
>
> Key: FLINK-2150
> URL: https://issues.apache.org/jira/browse/FLINK-2150
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Reporter: Vasia Kalavri
> Assignee: Andra Lungu
> Priority: Minor
> Labels: starter
>
> In some graph algorithms, it is required to initialize the vertex values with
> unique values (e.g. label propagation).
> This issue proposes adding a Gelly library method that receives an input
> graph and initializes its vertex values with unique Long values.
> This method can then also be used to improve the MusicProfiles example.