Repository: flink
Updated Branches:
  refs/heads/master 0975d9f11 -> e58fd6e00


[FLINK-4572] [gelly] Convert to negative in LongValueToIntValue

The Gelly drivers expect that scale 32 edges, represented by the lower
32 bits of long values, can be converted to int values. Values between
2^31 and 2^32 - 1 should be converted to negative integers.

Creates separate signed and unsigned translators for long to int. This
prevents ambiguous conversion since each translator only works on a
32-bit range of values.

This closes #2469


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e58fd6e0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e58fd6e0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e58fd6e0

Branch: refs/heads/master
Commit: e58fd6e001b319aa7325a0544735f8c0680141d8
Parents: 4f84a02
Author: Greg Hogan <c...@greghogan.com>
Authored: Fri Sep 2 12:01:29 2016 -0400
Committer: Greg Hogan <c...@greghogan.com>
Committed: Tue Sep 20 10:10:20 2016 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/driver/GraphMetrics.java |  6 +--
 .../graph/examples/ClusteringCoefficient.java   |  6 +--
 .../org/apache/flink/graph/examples/HITS.java   |  4 +-
 .../flink/graph/examples/JaccardIndex.java      |  4 +-
 .../flink/graph/examples/TriangleListing.java   |  6 +--
 .../asm/translate/LongValueToIntValue.java      | 43 -----------------
 .../translate/LongValueToSignedIntValue.java    | 44 ++++++++++++++++++
 .../translate/LongValueToUnsignedIntValue.java  | 49 ++++++++++++++++++++
 .../LongValueToSignedIntValueTest.java          | 49 ++++++++++++++++++++
 .../LongValueToUnsignedIntValueTest.java        | 49 ++++++++++++++++++++
 10 files changed, 204 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
index cc265bb..44b64ee 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -180,7 +180,7 @@ public class GraphMetrics {
                                                        .run(new 
org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, 
NullValue, NullValue>());
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
                                                        .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>());
 
                                                vm = newGraph
@@ -201,7 +201,7 @@ public class GraphMetrics {
                                                        .run(new 
org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, 
NullValue, NullValue>());
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
                                                        .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip));
 
                                                vm = newGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index f4b1ecf..9961fff 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -195,7 +195,7 @@ public class ClusteringCoefficient {
                                                                
.setLittleParallelism(little_parallelism));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
                                                                
.setParallelism(little_parallelism))
                                                        .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>()
                                                                
.setParallelism(little_parallelism));
@@ -225,7 +225,7 @@ public class ClusteringCoefficient {
                                                                
.setLittleParallelism(little_parallelism));
                                        } else {
                                                Graph<IntValue, NullValue, 
NullValue> newGraph = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
                                                                
.setParallelism(little_parallelism))
                                                        .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip)
                                                                
.setParallelism(little_parallelism));

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index 59612d9..b2ce726 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -137,7 +137,7 @@ public class HITS {
                                                .run(new 
org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, 
NullValue>(iterations));
                                } else {
                                        hits = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
                                                .run(new Simplify<IntValue, 
NullValue, NullValue>())
                                                .run(new 
org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, 
NullValue>(iterations));
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 96f66ab..fa69ef0 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -151,7 +151,7 @@ public class JaccardIndex {
                                                        
.setLittleParallelism(little_parallelism));
                                } else {
                                        ji = graph
-                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue())
+                                               .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue())
                                                        
.setParallelism(little_parallelism))
                                                .run(new Simplify<IntValue, 
NullValue, NullValue>(clipAndFlip)
                                                        
.setParallelism(little_parallelism))

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index f3ce708..d819d19 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -30,7 +30,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
@@ -159,7 +159,7 @@ public class TriangleListing {
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, 
NullValue, NullValue>());
                                        } else {
                                                tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
                                                        .run(new 
org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, 
NullValue>())
                                                        .run(new 
org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, 
NullValue, NullValue>());
                                        }
@@ -175,7 +175,7 @@ public class TriangleListing {
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, 
NullValue, NullValue>());
                                        } else {
                                                tl = graph
-                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToIntValue()))
+                                                       .run(new 
TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new 
LongValueToUnsignedIntValue()))
                                                        .run(new 
org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, 
NullValue>(clipAndFlip))
                                                        .run(new 
org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, 
NullValue, NullValue>());
                                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
deleted file mode 100644
index adaf592..0000000
--- 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.translate;
-
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.MathUtils;
-
-/**
- * Translate {@link LongValue} to {@link IntValue}.
- *
- * Throws {@link RuntimeException} for integer overflow.
- */
-public class LongValueToIntValue
-implements TranslateFunction<LongValue, IntValue> {
-
-       @Override
-       public IntValue translate(LongValue value, IntValue reuse)
-                       throws Exception {
-               if (reuse == null) {
-                       reuse = new IntValue();
-               }
-
-               reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
-               return reuse;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
new file mode 100644
index 0000000..92a4abc
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.MathUtils;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToSignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+       @Override
+       public IntValue translate(LongValue value, IntValue reuse)
+                       throws Exception {
+               if (reuse == null) {
+                       reuse = new IntValue();
+               }
+
+               reuse.setValue(MathUtils.checkedDownCast(value.getValue()));
+
+               return reuse;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
new file mode 100644
index 0000000..943e727
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValue.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate {@link LongValue} to {@link IntValue}.
+ *
+ * Throws {@link RuntimeException} for integer overflow.
+ */
+public class LongValueToUnsignedIntValue
+implements TranslateFunction<LongValue, IntValue> {
+
+       @Override
+       public IntValue translate(LongValue value, IntValue reuse)
+                       throws Exception {
+               if (reuse == null) {
+                       reuse = new IntValue();
+               }
+
+               long l = value.getValue();
+
+               if (l < 0 || l >= (1L << 32)) {
+                       throw new IllegalArgumentException("Cannot cast long 
value " + value + " to integer.");
+               } else {
+                       reuse.setValue((int)(l & 0xffffffffL));
+               }
+
+               return reuse;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
new file mode 100644
index 0000000..9e2ec0a
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToSignedIntValueTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToSignedIntValueTest {
+
+       private TranslateFunction<LongValue, IntValue> translator = new 
LongValueToSignedIntValue();
+
+       private IntValue reuse = new IntValue();
+
+       @Test
+       public void testTranslation() throws Exception {
+               assertEquals(new IntValue(Integer.MIN_VALUE), 
translator.translate(new LongValue((long)Integer.MIN_VALUE), reuse));
+               assertEquals(new IntValue(0), translator.translate(new 
LongValue(0L), reuse));
+               assertEquals(new IntValue(Integer.MAX_VALUE), 
translator.translate(new LongValue((long)Integer.MAX_VALUE), reuse));
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testUpperOutOfRange() throws Exception {
+               assertEquals(new IntValue(), translator.translate(new 
LongValue((long)Integer.MAX_VALUE + 1), reuse));
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testLowerOutOfRange() throws Exception {
+               assertEquals(new IntValue(), translator.translate(new 
LongValue((long)Integer.MIN_VALUE - 1), reuse));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e58fd6e0/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
new file mode 100644
index 0000000..14e9685
--- /dev/null
+++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/translate/LongValueToUnsignedIntValueTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.translate;
+
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class LongValueToUnsignedIntValueTest {
+
+       private TranslateFunction<LongValue, IntValue> translator = new 
LongValueToUnsignedIntValue();
+
+       private IntValue reuse = new IntValue();
+
+       @Test
+       public void testTranslation() throws Exception {
+               assertEquals(new IntValue(0), translator.translate(new 
LongValue(0L), reuse));
+               assertEquals(new IntValue(Integer.MIN_VALUE), 
translator.translate(new LongValue((long)Integer.MAX_VALUE + 1), reuse));
+               assertEquals(new IntValue(-1), translator.translate(new 
LongValue((1L << 32) - 1), reuse));
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testUpperOutOfRange() throws Exception {
+               assertEquals(new IntValue(), translator.translate(new 
LongValue(1L << 32), reuse));
+       }
+
+       @Test(expected=IllegalArgumentException.class)
+       public void testLowerOutOfRange() throws Exception {
+               assertEquals(new IntValue(), translator.translate(new 
LongValue(-1), reuse));
+       }
+}

Reply via email to