Add display data for MapElements transform

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0dab643a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0dab643a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0dab643a

Branch: refs/heads/master
Commit: 0dab643ae3c7d275016f8b11e406af5d15e7f388
Parents: 9f6c27c
Author: Scott Wegner <sweg...@google.com>
Authored: Tue May 3 11:16:51 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue May 10 16:21:27 2016 -0700

----------------------------------------------------------------------
 .../transforms/DataflowMapElementsTest.java     | 55 ++++++++++++++++++++
 .../apache/beam/sdk/transforms/MapElements.java | 12 +++++
 .../beam/sdk/transforms/MapElementsTest.java    | 36 ++++++++++++-
 3 files changed, 101 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0dab643a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
new file mode 100644
index 0000000..8a5e67d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowMapElementsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.transforms;
+
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Unit tests for Dataflow usage of {@link MapElements} transforms.
+ */
+public class DataflowMapElementsTest implements Serializable {
+  @Test
+  public void testPrimitiveDisplayData() {
+    SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
+      @Override
+      public Integer apply(Integer input) {
+        return input;
+      }
+    };
+
+    MapElements<?, ?> map = MapElements.via(mapFn);
+    DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
+
+    Set<DisplayData> displayData = 
evaluator.displayDataForPrimitiveTransforms(map);
+    assertThat("MapElements should include the mapFn in its primitive display 
data",
+        displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0dab643a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 47c2f5d..29d1dde 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 
@@ -108,6 +109,17 @@ extends PTransform<PCollection<InputT>, 
PCollection<OutputT>> {
       public void processElement(ProcessContext c) {
         c.output(fn.apply(c.element()));
       }
+
+      @Override
+      public void populateDisplayData(DisplayData.Builder builder) {
+        MapElements.this.populateDisplayData(builder);
+      }
     })).setTypeDescriptorInternal(outputType);
   }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder.add(DisplayData.item("mapFn", fn.getClass()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0dab643a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 5446512..1e2c826 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -17,12 +17,15 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -103,7 +106,7 @@ public class MapElementsTest implements Serializable {
     
assertThat(pipeline.getCoderRegistry().getDefaultCoder(output.getTypeDescriptor()),
         equalTo(pipeline.getCoderRegistry().getDefaultCoder(new 
TypeDescriptor<String>() {})));
 
-    // Make sure the pipelien runs too
+    // Make sure the pipeline runs too
     pipeline.run();
   }
 
@@ -118,12 +121,41 @@ public class MapElementsTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  public void testSerializableFunctionDisplayData() {
+    SerializableFunction<Integer, Integer> serializableFn =
+        new SerializableFunction<Integer, Integer>() {
+          @Override
+          public Integer apply(Integer input) {
+            return input;
+          }
+        };
+
+    MapElements<?, ?> serializableMap = MapElements.via(serializableFn)
+        .withOutputType(TypeDescriptor.of(Integer.class));
+    assertThat(DisplayData.from(serializableMap),
+        hasDisplayItem("mapFn", serializableFn.getClass()));
+  }
+
+  @Test
+  public void testSimpleFunctionDisplayData() {
+    SimpleFunction<?, ?> simpleFn = new SimpleFunction<Integer, Integer>() {
+      @Override
+      public Integer apply(Integer input) {
+        return input;
+      }
+    };
+
+    MapElements<?, ?> simpleMap = MapElements.via(simpleFn);
+    assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", 
simpleFn.getClass()));
+  }
+
   static class VoidValues<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Void>>> {
 
     @Override
     public PCollection<KV<K, Void>> apply(PCollection<KV<K, V>> input) {
-      return input.apply(MapElements.<KV<K, V>, KV<K, Void>>via(
+      return input.apply(MapElements.via(
           new SimpleFunction<KV<K, V>, KV<K, Void>>() {
             @Override
             public KV<K, Void> apply(KV<K, V> input) {

Reply via email to