[ 
https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263798#comment-15263798
 ] 

ASF GitHub Bot commented on FLINK-3771:
---------------------------------------

Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552498
  
    --- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
 ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static 
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +   // 
--------------------------------------------------------------------------------------------
    +   //  Translate vertex labels
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code 
NEW}
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    * @return translated vertices
    +    */
    +   public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> 
translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> 
translator) {
    +           return translateVertexLabels(vertices, translator, 
PARALLELISM_UNKNOWN);
    +   }
    +
    +   /**
    +    * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +    *
    +    * @param vertices input vertices
    +    * @param translator implements conversion from {@code OLD} to {@code 
NEW}
    +    * @param parallelism operator parallelism
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    * @return translated vertices
    +    */
    +   @SuppressWarnings("unchecked")
    +   public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> 
translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> 
translator, int parallelism) {
    +           Preconditions.checkNotNull(vertices);
    +           Preconditions.checkNotNull(translator);
    +           Preconditions.checkArgument(parallelism > 0 || parallelism == 
PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +                   "The parallelism must be greater than zero.");
    +
    +           Class<Vertex<NEW,VV>> vertexClass = 
(Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +           TypeInformation<NEW> newType = 
TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, 
null);
    +           TypeInformation<VV> vertexValueType = 
((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +           TupleTypeInfo<Vertex<NEW,VV>> returnType = new 
TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +           return vertices
    +                   .map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +                   .returns(returnType)
    +                           .setParallelism(parallelism)
    +                           .name("Translate vertex labels");
    +   }
    +
    +   /**
    +    * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +    *
    +    * @param <OLD> old vertex label type
    +    * @param <NEW> new vertex label type
    +    * @param <VV> vertex value type
    +    */
    +   @ForwardedFields("1")
    +   private static class TranslateVertexLabel<OLD,NEW,VV>
    +   extends WrappingFunction<MapFunction<OLD,NEW>>
    +   implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +           private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +           public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
    +                   super(translator);
    +           }
    +
    +           @Override
    +           public Vertex<NEW,VV> map(Vertex<OLD,VV> value)
    +                           throws Exception {
    +                   vertex.f0 = wrappedFunction.map(value.f0);
    +                   vertex.f1 = value.f1;
    +
    +                   return vertex;
    +           }
    +   }
    +
    +   // 
--------------------------------------------------------------------------------------------
    +   //  Translate edge labels
    +   // 
--------------------------------------------------------------------------------------------
    +
    +   /**
    +    * Translate {@link Edge} labels using the given {@link MapFunction}.
    +    *
    +    * @param edges input edges
    +    * @param translator implements conversion from {@code OLD} to {@code 
NEW}
    +    * @param <OLD> old edge label type
    +    * @param <NEW> new edge label type
    +    * @param <EV> edge value type
    +    * @return translated edges
    +    */
    +   public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> 
translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, MapFunction<OLD,NEW> 
translator) {
    +           return translateEdgeLabels(edges, translator, 
PARALLELISM_UNKNOWN);
    +   }
    +
    +   /**
    +    * Translate {@link Edge} labels using the given {@link MapFunction}.
    +    *
    +    * @param edges input edges
    +    * @param translator implements conversion from {@code OLD} to {@code 
NEW}
    +    * @param parallelism operator parallelism
    +    * @param <OLD> old edge label type
    +    * @param <NEW> new edge label type
    +    * @param <EV> edge value type
    +    * @return translated edges
    +    */
    +   @SuppressWarnings("unchecked")
    +   public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> 
translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, MapFunction<OLD,NEW> 
translator, int parallelism) {
    +           Preconditions.checkNotNull(edges);
    +           Preconditions.checkNotNull(translator);
    +           Preconditions.checkArgument(parallelism > 0 || parallelism == 
PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +                   "The parallelism must be greater than zero.");
    +
    +           Class<Edge<NEW,EV>> edgeClass = (Class<Edge<NEW,EV>>)(Class<? 
extends Edge>) Edge.class;
    +           TypeInformation<NEW> newType = 
TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, 
null);
    +           TypeInformation<EV> edgeValueType = 
((TupleTypeInfo<Edge<OLD,EV>>) edges.getType()).getTypeAt(2);
    +
    +           TupleTypeInfo<Edge<NEW,EV>> returnType = new 
TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
    +
    +           return edges
    +                   .map(new TranslateEdgeLabel<OLD,NEW,EV>(translator))
    +                   .returns(returnType)
    +                           .setParallelism(parallelism)
    --- End diff --
    
    indention is off


> Methods for translating Graphs
> ------------------------------
>
>                 Key: FLINK-3771
>                 URL: https://issues.apache.org/jira/browse/FLINK-3771
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 1.1.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>             Fix For: 1.1.0
>
>
> Provide methods for translation of the type or value of graph labels, vertex 
> values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read 
> from multiple sources
> * downsizing labels or values since algorithms prefer to generate wide types 
> which may be expensive for further computation
> * changing label type for testing or benchmarking alternative code paths



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to