[ https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258076#comment-15258076 ]

ASF GitHub Bot commented on FLINK-3771:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61083553
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/translate/Translate.java ---
    @@ -0,0 +1,362 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.translate;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +   // --------------------------------------------------------------------------------------------
    +   //  Translate graph labels
    +   // --------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Relabels the {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} with the given
    +    * {@link Translator}, passing {@link ExecutionConfig#PARALLELISM_UNKNOWN} as the operator
    +    * parallelism.
    +    *
    +    * @param graph input graph
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param <OLD> old graph label type
    +    * @param <NEW> new graph label type
    +    * @param <VV> vertex value type
    +    * @param <EV> edge value type
    +    * @return translated graph
    +    */
    +   public static <OLD, NEW, VV, EV> Graph<NEW, VV, EV> translateGraphLabels(Graph<OLD, VV, EV> graph, Translator<OLD, NEW> translator) {
    +           final int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
    +           return translateGraphLabels(graph, translator, parallelism);
    +   }
    +
    +   /**
    +    * Relabels the {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} with the given
    +    * {@link Translator}, running the translation operators at the given parallelism.
    +    *
    +    * <p>The translated vertex and edge sets are combined into a new graph via
    +    * {@link Graph#fromDataSet} using the input graph's execution context.
    +    *
    +    * @param graph input graph
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param parallelism operator parallelism
    +    * @param <OLD> old graph label type
    +    * @param <NEW> new graph label type
    +    * @param <VV> vertex value type
    +    * @param <EV> edge value type
    +    * @return translated graph
    +    */
    +   public static <OLD, NEW, VV, EV> Graph<NEW, VV, EV> translateGraphLabels(Graph<OLD, VV, EV> graph, Translator<OLD, NEW> translator, int parallelism) {
    +           return Graph.fromDataSet(
    +                   translateVertexLabels(graph.getVertices(), translator, parallelism),
    +                   translateEdgeLabels(graph.getEdges(), translator, parallelism),
    +                   graph.getContext());
    +   }
    +
    +   // --------------------------------------------------------------------------------------------
    +   //  Translate vertex labels
    +   // --------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Translates the labels of the given {@link Vertex} data set with a {@link Translator},
    +    * passing {@link ExecutionConfig#PARALLELISM_UNKNOWN} as the operator parallelism.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    * @return translated vertices
    +    */
    +   public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, Translator<OLD, NEW> translator) {
    +           final int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
    +           return translateVertexLabels(vertices, translator, parallelism);
    +   }
    +
    +   /**
    +    * Translates the labels of the given {@link Vertex} data set with a {@link Translator}.
    +    *
    +    * <p>The output type is built explicitly: field 0 takes the type information from the
    +    * translator's type hint, and field 1 keeps the vertex value type of the input.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param parallelism operator parallelism
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    * @return translated vertices
    +    */
    +   @SuppressWarnings("unchecked")
    +   public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexLabels(DataSet<Vertex<OLD, VV>> vertices, Translator<OLD, NEW> translator, int parallelism) {
    +           // Vertex is generic, so its class literal has to be cast through a wildcard type
    +           Class<Vertex<NEW, VV>> outputClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class;
    +
    +           TypeInformation<NEW> labelType = translator.getTypeHint().getTypeInfo();
    +           TupleTypeInfo<Vertex<OLD, VV>> inputType = (TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType();
    +           TypeInformation<VV> valueType = inputType.getTypeAt(1);
    +
    +           TupleTypeInfo<Vertex<NEW, VV>> outputType = new TupleTypeInfo<>(outputClass, labelType, valueType);
    +
    +           return vertices
    +                   .map(new TranslateVertexLabel<OLD, NEW, VV>(translator))
    +                   .returns(outputType)
    +                   .setParallelism(parallelism)
    +                   .name("Translate vertex labels");
    +   }
    +
    +   /**
    +    * {@link MapFunction} that replaces the label (field 0) of each {@link Vertex} using a
    +    * {@link Translator} and copies the vertex value (field 1) through unchanged.
    +    *
    +    * <p>A single output {@link Vertex} is reused across calls; its previous label is passed as
    +    * the second argument of {@code translate} (presumably a reuse hint; confirm against
    +    * {@link Translator}).
    +    *
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    */
    +   @ForwardedFields("1")
    +   private static class TranslateVertexLabel<OLD, NEW, VV>
    +   implements MapFunction<Vertex<OLD, VV>, Vertex<NEW, VV>> {
    +           private final Translator<OLD, NEW> translator;
    +
    +           private final Vertex<NEW, VV> reuse = new Vertex<>();
    +
    +           public TranslateVertexLabel(Translator<OLD, NEW> translator) {
    +                   this.translator = translator;
    +           }
    +
    +           @Override
    +           public Vertex<NEW, VV> map(Vertex<OLD, VV> input) throws Exception {
    +                   NEW translatedLabel = translator.translate(input.f0, reuse.f0);
    +                   reuse.f0 = translatedLabel;
    +                   reuse.f1 = input.f1;
    +                   return reuse;
    +           }
    +   }
    +
    +   // --------------------------------------------------------------------------------------------
    +   //  Translate edge labels
    +   // --------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Translates the source and target labels of the given {@link Edge} data set with a
    +    * {@link Translator}, passing {@link ExecutionConfig#PARALLELISM_UNKNOWN} as the operator
    +    * parallelism.
    +    *
    +    * @param edges input edges
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param <OLD> old edge label type
    +    * @param <NEW> new edge label type
    +    * @param <EV> edge value type
    +    * @return translated edges
    +    */
    +   public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeLabels(DataSet<Edge<OLD, EV>> edges, Translator<OLD, NEW> translator) {
    +           final int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
    +           return translateEdgeLabels(edges, translator, parallelism);
    +   }
    +
    +   /**
    +    * Translates the source and target labels of the given {@link Edge} data set with a
    +    * {@link Translator}.
    +    *
    +    * <p>The output type is built explicitly: fields 0 and 1 both take the type information from
    +    * the translator's type hint, and field 2 keeps the edge value type of the input.
    +    *
    +    * @param edges input edges
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param parallelism operator parallelism
    +    * @param <OLD> old edge label type
    +    * @param <NEW> new edge label type
    +    * @param <EV> edge value type
    +    * @return translated edges
    +    */
    +   @SuppressWarnings("unchecked")
    +   public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeLabels(DataSet<Edge<OLD, EV>> edges, Translator<OLD, NEW> translator, int parallelism) {
    +           // Edge is generic, so its class literal has to be cast through a wildcard type
    +           Class<Edge<NEW, EV>> outputClass = (Class<Edge<NEW, EV>>) (Class<? extends Edge>) Edge.class;
    +
    +           TypeInformation<NEW> labelType = translator.getTypeHint().getTypeInfo();
    +           TupleTypeInfo<Edge<OLD, EV>> inputType = (TupleTypeInfo<Edge<OLD, EV>>) edges.getType();
    +           TypeInformation<EV> valueType = inputType.getTypeAt(2);
    +
    +           TupleTypeInfo<Edge<NEW, EV>> outputType = new TupleTypeInfo<>(outputClass, labelType, labelType, valueType);
    +
    +           return edges
    +                   .map(new TranslateEdgeLabel<OLD, NEW, EV>(translator))
    +                   .returns(outputType)
    +                   .setParallelism(parallelism)
    +                   .name("Translate edge labels");
    +   }
    +
    +   /**
    +    * {@link MapFunction} that translates the source and target labels (fields 0 and 1) of each
    +    * {@link Edge} using a {@link Translator} and copies the edge value (field 2) through
    +    * unchanged.
    +    *
    +    * @param <OLD> old edge label type
    +    * @param <NEW> new edge label type
    +    * @param <EV> edge value type
    +    */
    +   @ForwardedFields("2")
    +   private static class TranslateEdgeLabel<OLD, NEW, EV>
    +   implements MapFunction<Edge<OLD, EV>, Edge<NEW, EV>> {
    +           private final Translator<OLD, NEW> translator;
    +
    +           // output record reused across calls; its previous labels are passed as the second
    +           // argument of translate() (presumably a reuse hint; confirm against Translator)
    +           private final Edge<NEW, EV> edge = new Edge<>();
    +
    +           public TranslateEdgeLabel(Translator<OLD, NEW> translator) {
    +                   this.translator = translator;
    +           }
    +
    +           @Override
    +           public Edge<NEW, EV> map(Edge<OLD, EV> value) throws Exception {
    +                   edge.f0 = translator.translate(value.f0, edge.f0);
    +                   edge.f1 = translator.translate(value.f1, edge.f1);
    +                   edge.f2 = value.f2;
    +
    +                   return edge;
    +           }
    +   }
    +
    +   // --------------------------------------------------------------------------------------------
    +   //  Translate vertex values
    +   // --------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Translate {@link Vertex} values using the given {@link Translator}, passing
    +    * {@link ExecutionConfig#PARALLELISM_UNKNOWN} as the operator parallelism.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param <K> vertex label type
    +    * @param <OLD> old vertex value type
    +    * @param <NEW> new vertex value type
    +    * @return translated vertices
    +    */
    +   public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, Translator<OLD, NEW> translator) {
    +           return translateVertexValues(vertices, translator, ExecutionConfig.PARALLELISM_UNKNOWN);
    +   }
    +
    +   /**
    +    * Translate {@link Vertex} values using the given {@link Translator}.
    +    *
    +    * <p>The output type is built explicitly: field 0 keeps the vertex label type of the input,
    +    * and field 1 takes the type information from the translator's type hint.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code NEW}
    +    * @param parallelism operator parallelism
    +    * @param <K> vertex label type
    +    * @param <OLD> old vertex value type
    +    * @param <NEW> new vertex value type
    +    * @return translated vertices
    +    */
    +   @SuppressWarnings("unchecked")
    +   public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, Translator<OLD, NEW> translator, int parallelism) {
    +           // Vertex is generic, so its class literal has to be cast through a wildcard type
    +           Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class;
    +           TypeInformation<K> labelType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
    +           TypeInformation<NEW> newType = translator.getTypeHint().getTypeInfo();
    +
    +           TupleTypeInfo<Vertex<K, NEW>> returnType = new TupleTypeInfo<>(vertexClass, labelType, newType);
    +
    +           return vertices
    +                   .map(new TranslateVertexValue<K, OLD, NEW>(translator))
    +                   .returns(returnType)
    +                   .setParallelism(parallelism)
    +                   .name("Translate vertex values");
    +   }
    +
    +   /**
    +    * Translate {@link Vertex} values using the given {@link Translator}.
    +    *
    +    * @param <K> vertex label type
    +    * @param <OLD> old vertex value type
    +    * @param <NEW> new vertex value type
    +    */
    +   @ForwardedFields("0")
    +   private static class TranslateVertexValue<K,OLD,NEW>
    +   implements MapFunction<Vertex<K,OLD>, Vertex<K,NEW>> {
    +           private final Translator<OLD,NEW> translator;
    +
    +           private Vertex<K,NEW> vertex = new Vertex<>();
    +
    +           public TranslateVertexValue(Translator<OLD,NEW> translator) {
    +                   this.translator = translator;
    +           }
    +
    +           @Override
    +           public Vertex<K,NEW> map(Vertex<K,OLD> value)
    +                           throws Exception {
    +                   vertex.f0 = value.f0;
    +                   vertex.f1 = translator.translate(value.f1, vertex.f1);;
    --- End diff --
    
    Noting the double semi-colons here and below.


> Methods for translating Graphs
> ------------------------------
>
>                 Key: FLINK-3771
>                 URL: https://issues.apache.org/jira/browse/FLINK-3771
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 1.1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.1.0
>
>
> Provide methods for translation of the type or value of graph labels, vertex values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read from multiple sources
> * downsizing labels or values, since algorithms prefer to generate wide types, which may be expensive in further computation
> * changing label type for testing or benchmarking alternative code paths



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to