[
https://issues.apache.org/jira/browse/FLINK-3771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263796#comment-15263796
]
ASF GitHub Bot commented on FLINK-3771:
---------------------------------------
Github user vasia commented on a diff in the pull request:
https://github.com/apache/flink/pull/1900#discussion_r61552437
--- Diff:
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
---
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Preconditions;
+
+import static
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static
org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
+
+/**
+ * Methods for translation of the type or modification of the data of graph
+ * labels, vertex values, and edge values.
+ */
+public class Translate {
+
+ //
--------------------------------------------------------------------------------------------
+ // Translate vertex labels
+ //
--------------------------------------------------------------------------------------------
+
+ /**
+ * Translate {@link Vertex} labels using the given {@link MapFunction}.
+ *
+ * @param vertices input vertices
+ * @param translator implements conversion from {@code OLD} to {@code
NEW}
+ * @param <OLD> old vertex label type
+ * @param <NEW> new vertex label type
+ * @param <VV> vertex value type
+ * @return translated vertices
+ */
+ public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>>
translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW>
translator) {
+ return translateVertexLabels(vertices, translator,
PARALLELISM_UNKNOWN);
+ }
+
+ /**
+ * Translate {@link Vertex} labels using the given {@link MapFunction}.
+ *
+ * @param vertices input vertices
+ * @param translator implements conversion from {@code OLD} to {@code
NEW}
+ * @param parallelism operator parallelism
+ * @param <OLD> old vertex label type
+ * @param <NEW> new vertex label type
+ * @param <VV> vertex value type
+ * @return translated vertices
+ */
+ @SuppressWarnings("unchecked")
+ public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>>
translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW>
translator, int parallelism) {
+ Preconditions.checkNotNull(vertices);
+ Preconditions.checkNotNull(translator);
+ Preconditions.checkArgument(parallelism > 0 || parallelism ==
PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+ "The parallelism must be greater than zero.");
+
+ Class<Vertex<NEW,VV>> vertexClass =
(Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
+ TypeInformation<NEW> newType =
TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null,
null);
+ TypeInformation<VV> vertexValueType =
((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
+
+ TupleTypeInfo<Vertex<NEW,VV>> returnType = new
TupleTypeInfo<>(vertexClass, newType, vertexValueType);
+
+ return vertices
+ .map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
+ .returns(returnType)
+ .setParallelism(parallelism)
+ .name("Translate vertex labels");
+ }
+
+ /**
+ * Translate {@link Vertex} labels using the given {@link MapFunction}.
+ *
+ * @param <OLD> old vertex label type
+ * @param <NEW> new vertex label type
+ * @param <VV> vertex value type
+ */
+ @ForwardedFields("1")
+ private static class TranslateVertexLabel<OLD,NEW,VV>
+ extends WrappingFunction<MapFunction<OLD,NEW>>
+ implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
+ private Vertex<NEW,VV> vertex = new Vertex<>();
+
+ public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
+ super(translator);
+ }
+
+ @Override
+ public Vertex<NEW,VV> map(Vertex<OLD,VV> value)
+ throws Exception {
--- End diff --
can be moved to the line above
> Methods for translating Graphs
> ------------------------------
>
> Key: FLINK-3771
> URL: https://issues.apache.org/jira/browse/FLINK-3771
> Project: Flink
> Issue Type: New Feature
> Components: Gelly
> Affects Versions: 1.1.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Provide methods for translation of the type or value of graph labels, vertex
> values, and edge values.
> Sample use cases:
> * shifting graph labels in order to union generated graphs or graphs read
> from multiple sources
> * downsizing labels or values since algorithms prefer to generate wide types
> which may be expensive for further computation
> * changing label type for testing or benchmarking alternative code paths
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)